@@ -11,6 +11,7 @@ import {
1111 isResumableError ,
1212 MongoAPIError ,
1313 MongoChangeStreamError ,
14+ MongoOperationTimeoutError ,
1415 MongoRuntimeError
1516} from './error' ;
1617import { MongoClient } from './mongo_client' ;
@@ -20,6 +21,7 @@ import type { CollationOptions, OperationParent } from './operations/command';
2021import type { ReadPreference } from './read_preference' ;
2122import { type AsyncDisposable , configureResourceManagement } from './resource_management' ;
2223import type { ServerSessionId } from './sessions' ;
24+ import { TimeoutContext } from './timeout' ;
2325import { filterOptions , getTopology , type MongoDBNamespace , squashError } from './utils' ;
2426
2527/** @internal */
@@ -584,6 +586,8 @@ export class ChangeStream<
584586 /** @internal */
585587 [ kMode ] : false | 'iterator' | 'emitter' ;
586588
589+ private timeoutContext ?: TimeoutContext ;
590+
587591 /** @event */
588592 static readonly RESPONSE = RESPONSE ;
589593 /** @event */
@@ -689,6 +693,9 @@ export class ChangeStream<
689693 try {
690694 await this . _processErrorIteratorMode ( error ) ;
691695 } catch ( error ) {
696+ if ( error instanceof MongoOperationTimeoutError ) {
697+ throw error ;
698+ }
692699 try {
693700 await this . close ( ) ;
694701 } catch ( error ) {
@@ -705,25 +712,33 @@ export class ChangeStream<
705712 this . _setIsIterator ( ) ;
706713 // Change streams must resume indefinitely while each resume event succeeds.
707714 // This loop continues until either a change event is received or until a resume attempt
708- // fails.
715+ // fails or until a timeout error is encountered
716+ this . timeoutContext ?. refresh ( ) ;
709717
710- while ( true ) {
711- try {
712- const change = await this . cursor . next ( ) ;
713- const processedChange = this . _processChange ( change ?? null ) ;
714- return processedChange ;
715- } catch ( error ) {
718+ try {
719+ while ( true ) {
716720 try {
717- await this . _processErrorIteratorMode ( error ) ;
721+ const change = await this . cursor . next ( ) ;
722+ const processedChange = this . _processChange ( change ?? null ) ;
723+ return processedChange ;
718724 } catch ( error ) {
719725 try {
720- await this . close ( ) ;
726+ await this . _processErrorIteratorMode ( error ) ;
721727 } catch ( error ) {
722- squashError ( error ) ;
728+ if ( error instanceof MongoOperationTimeoutError ) {
729+ throw error ; // Don't close the change stream, but throw the timeout error
730+ }
731+ try {
732+ await this . close ( ) ;
733+ } catch ( error ) {
734+ squashError ( error ) ;
735+ }
736+ throw error ;
723737 }
724- throw error ;
725738 }
726739 }
740+ } finally {
741+ this . timeoutContext ?. clear ( ) ;
727742 }
728743 }
729744
@@ -744,6 +759,9 @@ export class ChangeStream<
744759 try {
745760 await this . _processErrorIteratorMode ( error ) ;
746761 } catch ( error ) {
762+ if ( error instanceof MongoOperationTimeoutError ) {
763+ throw error ; // throw the error without closing the change stream
764+ }
747765 try {
748766 await this . close ( ) ;
749767 } catch ( error ) {
@@ -862,11 +880,20 @@ export class ChangeStream<
862880 ) ;
863881 }
864882
883+ if ( this . options . timeoutMS != null ) {
884+ this . timeoutContext ??= TimeoutContext . create ( {
885+ timeoutMS : this . options . timeoutMS ,
886+ serverSelectionTimeoutMS : client . options . serverSelectionTimeoutMS
887+ } ) ;
888+ delete this . options . timeoutMS ;
889+ }
890+
865891 const changeStreamCursor = new ChangeStreamCursor < TSchema , TChange > (
866892 client ,
867893 this . namespace ,
868894 pipeline ,
869- options
895+ options ,
896+ this . timeoutContext
870897 ) ;
871898
872899 for ( const event of CHANGE_STREAM_EVENTS ) {
@@ -946,6 +973,10 @@ export class ChangeStream<
946973 // If the change stream has been closed explicitly, do not process error.
947974 if ( this [ kClosed ] ) return ;
948975
976+ if ( changeStreamError instanceof MongoOperationTimeoutError ) {
977+ return ; // FIXME: At least emit the error
978+ }
979+
949980 if ( isResumableError ( changeStreamError , this . cursor . maxWireVersion ) ) {
950981 this . _endStream ( ) ;
951982
@@ -975,7 +1006,10 @@ export class ChangeStream<
9751006 throw new MongoAPIError ( CHANGESTREAM_CLOSED_ERROR ) ;
9761007 }
9771008
978- if ( ! isResumableError ( changeStreamError , this . cursor . maxWireVersion ) ) {
1009+ if (
1010+ ! isResumableError ( changeStreamError , this . cursor . maxWireVersion ) &&
1011+ ! ( changeStreamError instanceof MongoOperationTimeoutError )
1012+ ) {
9791013 try {
9801014 await this . close ( ) ;
9811015 } catch ( error ) {
@@ -1000,6 +1034,8 @@ export class ChangeStream<
10001034 await this . close ( ) ;
10011035 throw changeStreamError ;
10021036 }
1037+
1038+ if ( changeStreamError instanceof MongoOperationTimeoutError ) throw changeStreamError ;
10031039 }
10041040}
10051041
0 commit comments