@@ -3,14 +3,15 @@ import type { Readable } from 'stream';
33import type { Binary , Document , Timestamp } from './bson' ;
44import { Collection } from './collection' ;
55import { CHANGE , CLOSE , END , ERROR , INIT , MORE , RESPONSE , RESUME_TOKEN_CHANGED } from './constants' ;
6- import type { AbstractCursorEvents , CursorStreamOptions } from './cursor/abstract_cursor' ;
6+ import { type CursorStreamOptions , CursorTimeoutContext } from './cursor/abstract_cursor' ;
77import { ChangeStreamCursor , type ChangeStreamCursorOptions } from './cursor/change_stream_cursor' ;
88import { Db } from './db' ;
99import {
1010 type AnyError ,
1111 isResumableError ,
1212 MongoAPIError ,
1313 MongoChangeStreamError ,
14+ MongoOperationTimeoutError ,
1415 MongoRuntimeError
1516} from './error' ;
1617import { MongoClient } from './mongo_client' ;
@@ -20,6 +21,7 @@ import type { CollationOptions, OperationParent } from './operations/command';
2021import type { ReadPreference } from './read_preference' ;
2122import { type AsyncDisposable , configureResourceManagement } from './resource_management' ;
2223import type { ServerSessionId } from './sessions' ;
24+ import { CSOTTimeoutContext , type TimeoutContext } from './timeout' ;
2325import { filterOptions , getTopology , type MongoDBNamespace , squashError } from './utils' ;
2426
2527/** @internal */
@@ -538,7 +540,13 @@ export type ChangeStreamEvents<
538540 end ( ) : void ;
539541 error ( error : Error ) : void ;
540542 change ( change : TChange ) : void ;
541- } & AbstractCursorEvents ;
543+ /**
544+ * @remarks Note that the `close` event is currently emitted whenever the internal `ChangeStreamCursor`
545+ * instance is closed, which can occur multiple times for a given `ChangeStream` instance.
546+ * When this event is emitted is subject to change outside of major versions.
547+ */
548+ close ( ) : void ;
549+ } ;
542550
543551/**
544552 * Creates a new Change Stream instance. Normally created using {@link Collection#watch|Collection.watch()}.
@@ -609,6 +617,13 @@ export class ChangeStream<
609617 */
610618 static readonly RESUME_TOKEN_CHANGED = RESUME_TOKEN_CHANGED ;
611619
620+ private timeoutContext ?: TimeoutContext ;
621+ /**
622+ * Note that this property is here to uniquely identify a ChangeStream instance as the owner of
623+ * the {@link CursorTimeoutContext} instance (see {@link ChangeStream._createChangeStreamCursor}) to ensure
624+ * that {@link AbstractCursor.close} does not mutate the timeoutContext.
625+ */
626+ private contextOwner : symbol ;
612627 /**
613628 * @internal
614629 *
@@ -624,20 +639,25 @@ export class ChangeStream<
624639
625640 this . pipeline = pipeline ;
626641 this . options = { ...options } ;
642+ let serverSelectionTimeoutMS : number ;
627643 delete this . options . writeConcern ;
628644
629645 if ( parent instanceof Collection ) {
630646 this . type = CHANGE_DOMAIN_TYPES . COLLECTION ;
647+ serverSelectionTimeoutMS = parent . s . db . client . options . serverSelectionTimeoutMS ;
631648 } else if ( parent instanceof Db ) {
632649 this . type = CHANGE_DOMAIN_TYPES . DATABASE ;
650+ serverSelectionTimeoutMS = parent . client . options . serverSelectionTimeoutMS ;
633651 } else if ( parent instanceof MongoClient ) {
634652 this . type = CHANGE_DOMAIN_TYPES . CLUSTER ;
653+ serverSelectionTimeoutMS = parent . options . serverSelectionTimeoutMS ;
635654 } else {
636655 throw new MongoChangeStreamError (
637656 'Parent provided to ChangeStream constructor must be an instance of Collection, Db, or MongoClient'
638657 ) ;
639658 }
640659
660+ this . contextOwner = Symbol ( ) ;
641661 this . parent = parent ;
642662 this . namespace = parent . s . namespace ;
643663 if ( ! this . options . readPreference && parent . readPreference ) {
@@ -662,6 +682,13 @@ export class ChangeStream<
662682 this [ kCursorStream ] ?. removeAllListeners ( 'data' ) ;
663683 }
664684 } ) ;
685+
686+ if ( this . options . timeoutMS != null ) {
687+ this . timeoutContext = new CSOTTimeoutContext ( {
688+ timeoutMS : this . options . timeoutMS ,
689+ serverSelectionTimeoutMS
690+ } ) ;
691+ }
665692 }
666693
667694 /** @internal */
@@ -681,22 +708,31 @@ export class ChangeStream<
681708 // This loop continues until either a change event is received or until a resume attempt
682709 // fails.
683710
684- while ( true ) {
685- try {
686- const hasNext = await this . cursor . hasNext ( ) ;
687- return hasNext ;
688- } catch ( error ) {
711+ this . timeoutContext ?. refresh ( ) ;
712+ try {
713+ while ( true ) {
714+ const cursorInitialized = this . cursor . id != null ;
689715 try {
690- await this . _processErrorIteratorMode ( error ) ;
716+ const hasNext = await this . cursor . hasNext ( ) ;
717+ return hasNext ;
691718 } catch ( error ) {
692719 try {
693- await this . close ( ) ;
720+ await this . _processErrorIteratorMode ( error , cursorInitialized ) ;
694721 } catch ( error ) {
695- squashError ( error ) ;
722+ if ( error instanceof MongoOperationTimeoutError && cursorInitialized ) {
723+ throw error ;
724+ }
725+ try {
726+ await this . close ( ) ;
727+ } catch ( error ) {
728+ squashError ( error ) ;
729+ }
730+ throw error ;
696731 }
697- throw error ;
698732 }
699733 }
734+ } finally {
735+ this . timeoutContext ?. clear ( ) ;
700736 }
701737 }
702738
@@ -706,24 +742,33 @@ export class ChangeStream<
706742 // Change streams must resume indefinitely while each resume event succeeds.
707743 // This loop continues until either a change event is received or until a resume attempt
708744 // fails.
745+ this . timeoutContext ?. refresh ( ) ;
709746
710- while ( true ) {
711- try {
712- const change = await this . cursor . next ( ) ;
713- const processedChange = this . _processChange ( change ?? null ) ;
714- return processedChange ;
715- } catch ( error ) {
747+ try {
748+ while ( true ) {
749+ const cursorInitialized = this . cursor . id != null ;
716750 try {
717- await this . _processErrorIteratorMode ( error ) ;
751+ const change = await this . cursor . next ( ) ;
752+ const processedChange = this . _processChange ( change ?? null ) ;
753+ return processedChange ;
718754 } catch ( error ) {
719755 try {
720- await this . close ( ) ;
756+ await this . _processErrorIteratorMode ( error , cursorInitialized ) ;
721757 } catch ( error ) {
722- squashError ( error ) ;
758+ if ( error instanceof MongoOperationTimeoutError && cursorInitialized ) {
759+ throw error ;
760+ }
761+ try {
762+ await this . close ( ) ;
763+ } catch ( error ) {
764+ squashError ( error ) ;
765+ }
766+ throw error ;
723767 }
724- throw error ;
725768 }
726769 }
770+ } finally {
771+ this . timeoutContext ?. clear ( ) ;
727772 }
728773 }
729774
@@ -735,23 +780,30 @@ export class ChangeStream<
735780 // Change streams must resume indefinitely while each resume event succeeds.
736781 // This loop continues until either a change event is received or until a resume attempt
737782 // fails.
783+ this . timeoutContext ?. refresh ( ) ;
738784
739- while ( true ) {
740- try {
741- const change = await this . cursor . tryNext ( ) ;
742- return change ?? null ;
743- } catch ( error ) {
785+ try {
786+ while ( true ) {
787+ const cursorInitialized = this . cursor . id != null ;
744788 try {
745- await this . _processErrorIteratorMode ( error ) ;
789+ const change = await this . cursor . tryNext ( ) ;
790+ return change ?? null ;
746791 } catch ( error ) {
747792 try {
748- await this . close ( ) ;
793+ await this . _processErrorIteratorMode ( error , cursorInitialized ) ;
749794 } catch ( error ) {
750- squashError ( error ) ;
795+ if ( error instanceof MongoOperationTimeoutError && cursorInitialized ) throw error ;
796+ try {
797+ await this . close ( ) ;
798+ } catch ( error ) {
799+ squashError ( error ) ;
800+ }
801+ throw error ;
751802 }
752- throw error ;
753803 }
754804 }
805+ } finally {
806+ this . timeoutContext ?. clear ( ) ;
755807 }
756808 }
757809
@@ -784,6 +836,8 @@ export class ChangeStream<
784836 * Frees the internal resources used by the change stream.
785837 */
786838 async close ( ) : Promise < void > {
839+ this . timeoutContext ?. clear ( ) ;
840+ this . timeoutContext = undefined ;
787841 this [ kClosed ] = true ;
788842
789843 const cursor = this . cursor ;
@@ -866,7 +920,12 @@ export class ChangeStream<
866920 client ,
867921 this . namespace ,
868922 pipeline ,
869- options
923+ {
924+ ...options ,
925+ timeoutContext : this . timeoutContext
926+ ? new CursorTimeoutContext ( this . timeoutContext , this . contextOwner )
927+ : undefined
928+ }
870929 ) ;
871930
872931 for ( const event of CHANGE_STREAM_EVENTS ) {
@@ -900,7 +959,7 @@ export class ChangeStream<
900959 this . emit ( ChangeStream . ERROR , error ) ;
901960 }
902961 } ) ;
903- stream . on ( 'error' , error => this . _processErrorStreamMode ( error ) ) ;
962+ stream . on ( 'error' , error => this . _processErrorStreamMode ( error , this . cursor . id != null ) ) ;
904963 }
905964
906965 /** @internal */
@@ -942,24 +1001,30 @@ export class ChangeStream<
9421001 }
9431002
9441003 /** @internal */
945- private _processErrorStreamMode ( changeStreamError : AnyError ) {
1004+ private _processErrorStreamMode ( changeStreamError : AnyError , cursorInitialized : boolean ) {
9461005 // If the change stream has been closed explicitly, do not process error.
9471006 if ( this [ kClosed ] ) return ;
9481007
949- if ( this . cursor . id != null && isResumableError ( changeStreamError , this . cursor . maxWireVersion ) ) {
1008+ if (
1009+ cursorInitialized &&
1010+ ( isResumableError ( changeStreamError , this . cursor . maxWireVersion ) ||
1011+ changeStreamError instanceof MongoOperationTimeoutError )
1012+ ) {
9501013 this . _endStream ( ) ;
9511014
952- this . cursor . close ( ) . then ( undefined , squashError ) ;
953-
954- const topology = getTopology ( this . parent ) ;
955- topology
956- . selectServer ( this . cursor . readPreference , {
957- operationName : 'reconnect topology in change stream'
958- } )
959-
1015+ this . cursor
1016+ . close ( )
1017+ . then (
1018+ ( ) => this . _resume ( changeStreamError ) ,
1019+ e => {
1020+ squashError ( e ) ;
1021+ return this . _resume ( changeStreamError ) ;
1022+ }
1023+ )
9601024 . then (
9611025 ( ) => {
962- this . cursor = this . _createChangeStreamCursor ( this . cursor . resumeOptions ) ;
1026+ if ( changeStreamError instanceof MongoOperationTimeoutError )
1027+ this . emit ( ChangeStream . ERROR , changeStreamError ) ;
9631028 } ,
9641029 ( ) => this . _closeEmitterModeWithError ( changeStreamError )
9651030 ) ;
@@ -969,33 +1034,44 @@ export class ChangeStream<
9691034 }
9701035
9711036 /** @internal */
972- private async _processErrorIteratorMode ( changeStreamError : AnyError ) {
1037+ private async _processErrorIteratorMode ( changeStreamError : AnyError , cursorInitialized : boolean ) {
9731038 if ( this [ kClosed ] ) {
9741039 // TODO(NODE-3485): Replace with MongoChangeStreamClosedError
9751040 throw new MongoAPIError ( CHANGESTREAM_CLOSED_ERROR ) ;
9761041 }
9771042
9781043 if (
979- this . cursor . id == null ||
980- ! isResumableError ( changeStreamError , this . cursor . maxWireVersion )
1044+ cursorInitialized &&
1045+ ( isResumableError ( changeStreamError , this . cursor . maxWireVersion ) ||
1046+ changeStreamError instanceof MongoOperationTimeoutError )
9811047 ) {
1048+ try {
1049+ await this . cursor . close ( ) ;
1050+ } catch ( error ) {
1051+ squashError ( error ) ;
1052+ }
1053+
1054+ await this . _resume ( changeStreamError ) ;
1055+
1056+ if ( changeStreamError instanceof MongoOperationTimeoutError ) throw changeStreamError ;
1057+ } else {
9821058 try {
9831059 await this . close ( ) ;
9841060 } catch ( error ) {
9851061 squashError ( error ) ;
9861062 }
1063+
9871064 throw changeStreamError ;
9881065 }
1066+ }
9891067
990- try {
991- await this . cursor . close ( ) ;
992- } catch ( error ) {
993- squashError ( error ) ;
994- }
1068+ private async _resume ( changeStreamError : AnyError ) {
1069+ this . timeoutContext ?. refresh ( ) ;
9951070 const topology = getTopology ( this . parent ) ;
9961071 try {
9971072 await topology . selectServer ( this . cursor . readPreference , {
998- operationName : 'reconnect topology in change stream'
1073+ operationName : 'reconnect topology in change stream' ,
1074+ timeoutContext : this . timeoutContext
9991075 } ) ;
10001076 this . cursor = this . _createChangeStreamCursor ( this . cursor . resumeOptions ) ;
10011077 } catch {
0 commit comments