@@ -3,14 +3,19 @@ import type { Readable } from 'stream';
33import type { Binary , Document , Timestamp } from './bson' ;
44import { Collection } from './collection' ;
55import { CHANGE , CLOSE , END , ERROR , INIT , MORE , RESPONSE , RESUME_TOKEN_CHANGED } from './constants' ;
6- import type { AbstractCursorEvents , CursorStreamOptions } from './cursor/abstract_cursor' ;
6+ import {
7+ type AbstractCursorEvents ,
8+ type CursorStreamOptions ,
9+ CursorTimeoutContext
10+ } from './cursor/abstract_cursor' ;
711import { ChangeStreamCursor , type ChangeStreamCursorOptions } from './cursor/change_stream_cursor' ;
812import { Db } from './db' ;
913import {
1014 type AnyError ,
1115 isResumableError ,
1216 MongoAPIError ,
1317 MongoChangeStreamError ,
18+ MongoOperationTimeoutError ,
1419 MongoRuntimeError
1520} from './error' ;
1621import { MongoClient } from './mongo_client' ;
@@ -20,6 +25,7 @@ import type { CollationOptions, OperationParent } from './operations/command';
2025import type { ReadPreference } from './read_preference' ;
2126import { type AsyncDisposable , configureResourceManagement } from './resource_management' ;
2227import type { ServerSessionId } from './sessions' ;
28+ import { type TimeoutContext } from './timeout' ;
2329import { filterOptions , getTopology , type MongoDBNamespace , squashError } from './utils' ;
2430
2531/** @internal */
@@ -609,6 +615,8 @@ export class ChangeStream<
609615 */
610616 static readonly RESUME_TOKEN_CHANGED = RESUME_TOKEN_CHANGED ;
611617
618+ private timeoutContext ?: TimeoutContext ;
619+ private symbol : symbol ;
612620 /**
613621 * @internal
614622 *
@@ -638,6 +646,7 @@ export class ChangeStream<
638646 ) ;
639647 }
640648
649+ this . symbol = Symbol ( ) ;
641650 this . parent = parent ;
642651 this . namespace = parent . s . namespace ;
643652 if ( ! this . options . readPreference && parent . readPreference ) {
@@ -681,22 +690,31 @@ export class ChangeStream<
681690 // This loop continues until either a change event is received or until a resume attempt
682691 // fails.
683692
684- while ( true ) {
685- try {
686- const hasNext = await this . cursor . hasNext ( ) ;
687- return hasNext ;
688- } catch ( error ) {
693+ this . timeoutContext ?. refresh ( ) ;
694+ try {
695+ while ( true ) {
696+ const cursorInitialized = this . cursor . id != null ;
689697 try {
690- await this . _processErrorIteratorMode ( error ) ;
698+ const hasNext = await this . cursor . hasNext ( ) ;
699+ return hasNext ;
691700 } catch ( error ) {
692701 try {
693- await this . close ( ) ;
702+ await this . _processErrorIteratorMode ( error , cursorInitialized ) ;
694703 } catch ( error ) {
695- squashError ( error ) ;
704+ if ( error instanceof MongoOperationTimeoutError && cursorInitialized ) {
705+ throw error ;
706+ }
707+ try {
708+ await this . close ( ) ;
709+ } catch ( error ) {
710+ squashError ( error ) ;
711+ }
712+ throw error ;
696713 }
697- throw error ;
698714 }
699715 }
716+ } finally {
717+ this . timeoutContext ?. clear ( ) ;
700718 }
701719 }
702720
@@ -706,24 +724,33 @@ export class ChangeStream<
706724 // Change streams must resume indefinitely while each resume event succeeds.
707725 // This loop continues until either a change event is received or until a resume attempt
708726 // fails.
727+ this . timeoutContext ?. refresh ( ) ;
709728
710- while ( true ) {
711- try {
712- const change = await this . cursor . next ( ) ;
713- const processedChange = this . _processChange ( change ?? null ) ;
714- return processedChange ;
715- } catch ( error ) {
729+ try {
730+ while ( true ) {
731+ const cursorInitialized = this . cursor . id != null ;
716732 try {
717- await this . _processErrorIteratorMode ( error ) ;
733+ const change = await this . cursor . next ( ) ;
734+ const processedChange = this . _processChange ( change ?? null ) ;
735+ return processedChange ;
718736 } catch ( error ) {
719737 try {
720- await this . close ( ) ;
738+ await this . _processErrorIteratorMode ( error , cursorInitialized ) ;
721739 } catch ( error ) {
722- squashError ( error ) ;
740+ if ( error instanceof MongoOperationTimeoutError && cursorInitialized ) {
741+ throw error ;
742+ }
743+ try {
744+ await this . close ( ) ;
745+ } catch ( error ) {
746+ squashError ( error ) ;
747+ }
748+ throw error ;
723749 }
724- throw error ;
725750 }
726751 }
752+ } finally {
753+ this . timeoutContext ?. clear ( ) ;
727754 }
728755 }
729756
@@ -735,23 +762,30 @@ export class ChangeStream<
735762 // Change streams must resume indefinitely while each resume event succeeds.
736763 // This loop continues until either a change event is received or until a resume attempt
737764 // fails.
765+ this . timeoutContext ?. refresh ( ) ;
738766
739- while ( true ) {
740- try {
741- const change = await this . cursor . tryNext ( ) ;
742- return change ?? null ;
743- } catch ( error ) {
767+ try {
768+ while ( true ) {
769+ const cursorInitialized = this . cursor . id != null ;
744770 try {
745- await this . _processErrorIteratorMode ( error ) ;
771+ const change = await this . cursor . tryNext ( ) ;
772+ return change ?? null ;
746773 } catch ( error ) {
747774 try {
748- await this . close ( ) ;
775+ await this . _processErrorIteratorMode ( error , cursorInitialized ) ;
749776 } catch ( error ) {
750- squashError ( error ) ;
777+ if ( error instanceof MongoOperationTimeoutError && cursorInitialized ) throw error ;
778+ try {
779+ await this . close ( ) ;
780+ } catch ( error ) {
781+ squashError ( error ) ;
782+ }
783+ throw error ;
751784 }
752- throw error ;
753785 }
754786 }
787+ } finally {
788+ this . timeoutContext ?. clear ( ) ;
755789 }
756790 }
757791
@@ -784,6 +818,8 @@ export class ChangeStream<
784818 * Frees the internal resources used by the change stream.
785819 */
786820 async close ( ) : Promise < void > {
821+ this . timeoutContext ?. clear ( ) ;
822+ this . timeoutContext = undefined ;
787823 this [ kClosed ] = true ;
788824
789825 const cursor = this . cursor ;
@@ -866,7 +902,12 @@ export class ChangeStream<
866902 client ,
867903 this . namespace ,
868904 pipeline ,
869- options
905+ {
906+ ...options ,
907+ timeoutContext : this . timeoutContext
908+ ? new CursorTimeoutContext ( this . timeoutContext , this . symbol )
909+ : undefined
910+ }
870911 ) ;
871912
872913 for ( const event of CHANGE_STREAM_EVENTS ) {
@@ -893,14 +934,17 @@ export class ChangeStream<
893934 const stream = this [ kCursorStream ] ?? cursor . stream ( ) ;
894935 this [ kCursorStream ] = stream ;
895936 stream . on ( 'data' , change => {
937+ this . timeoutContext ?. refresh ( ) ;
896938 try {
897939 const processedChange = this . _processChange ( change ) ;
898940 this . emit ( ChangeStream . CHANGE , processedChange ) ;
899941 } catch ( error ) {
900942 this . emit ( ChangeStream . ERROR , error ) ;
943+ } finally {
944+ this . timeoutContext ?. clear ( ) ;
901945 }
902946 } ) ;
903- stream . on ( 'error' , error => this . _processErrorStreamMode ( error ) ) ;
947+ stream . on ( 'error' , error => this . _processErrorStreamMode ( error , this . cursor . id != null ) ) ;
904948 }
905949
906950 /** @internal */
@@ -942,24 +986,30 @@ export class ChangeStream<
942986 }
943987
944988 /** @internal */
945- private _processErrorStreamMode ( changeStreamError : AnyError ) {
989+ private _processErrorStreamMode ( changeStreamError : AnyError , cursorInitialized : boolean ) {
946990 // If the change stream has been closed explicitly, do not process error.
947991 if ( this [ kClosed ] ) return ;
948992
949- if ( this . cursor . id != null && isResumableError ( changeStreamError , this . cursor . maxWireVersion ) ) {
993+ if (
994+ cursorInitialized &&
995+ ( isResumableError ( changeStreamError , this . cursor . maxWireVersion ) ||
996+ changeStreamError instanceof MongoOperationTimeoutError )
997+ ) {
950998 this . _endStream ( ) ;
951999
952- this . cursor . close ( ) . then ( undefined , squashError ) ;
953-
954- const topology = getTopology ( this . parent ) ;
955- topology
956- . selectServer ( this . cursor . readPreference , {
957- operationName : 'reconnect topology in change stream'
958- } )
959-
1000+ this . cursor
1001+ . close ( )
1002+ . then (
1003+ ( ) => this . _resume ( changeStreamError ) ,
1004+ e => {
1005+ squashError ( e ) ;
1006+ return this . _resume ( changeStreamError ) ;
1007+ }
1008+ )
9601009 . then (
9611010 ( ) => {
962- this . cursor = this . _createChangeStreamCursor ( this . cursor . resumeOptions ) ;
1011+ if ( changeStreamError instanceof MongoOperationTimeoutError )
1012+ this . emit ( ChangeStream . ERROR , changeStreamError ) ;
9631013 } ,
9641014 ( ) => this . _closeEmitterModeWithError ( changeStreamError )
9651015 ) ;
@@ -969,15 +1019,16 @@ export class ChangeStream<
9691019 }
9701020
9711021 /** @internal */
972- private async _processErrorIteratorMode ( changeStreamError : AnyError ) {
1022+ private async _processErrorIteratorMode ( changeStreamError : AnyError , cursorInitialized : boolean ) {
9731023 if ( this [ kClosed ] ) {
9741024 // TODO(NODE-3485): Replace with MongoChangeStreamClosedError
9751025 throw new MongoAPIError ( CHANGESTREAM_CLOSED_ERROR ) ;
9761026 }
9771027
9781028 if (
979- this . cursor . id == null ||
980- ! isResumableError ( changeStreamError , this . cursor . maxWireVersion )
1029+ ! cursorInitialized ||
1030+ ( ! isResumableError ( changeStreamError , this . cursor . maxWireVersion ) &&
1031+ ! ( changeStreamError instanceof MongoOperationTimeoutError ) )
9811032 ) {
9821033 try {
9831034 await this . close ( ) ;
@@ -992,10 +1043,19 @@ export class ChangeStream<
9921043 } catch ( error ) {
9931044 squashError ( error ) ;
9941045 }
1046+
1047+ await this . _resume ( changeStreamError ) ;
1048+
1049+ if ( changeStreamError instanceof MongoOperationTimeoutError ) throw changeStreamError ;
1050+ }
1051+
1052+ private async _resume ( changeStreamError : AnyError ) {
1053+ this . timeoutContext ?. refresh ( ) ;
9951054 const topology = getTopology ( this . parent ) ;
9961055 try {
9971056 await topology . selectServer ( this . cursor . readPreference , {
998- operationName : 'reconnect topology in change stream'
1057+ operationName : 'reconnect topology in change stream' ,
1058+ timeoutContext : this . timeoutContext
9991059 } ) ;
10001060 this . cursor = this . _createChangeStreamCursor ( this . cursor . resumeOptions ) ;
10011061 } catch {
0 commit comments