@@ -18,6 +18,7 @@ import {
1818 MongoMissingDependencyError ,
1919 MongoNetworkError ,
2020 MongoNetworkTimeoutError ,
21+ MongoOperationTimeoutError ,
2122 MongoParseError ,
2223 MongoServerError ,
2324 MongoUnexpectedServerResponseError
@@ -29,7 +30,7 @@ import { type CancellationToken, TypedEventEmitter } from '../mongo_types';
2930import { ReadPreference , type ReadPreferenceLike } from '../read_preference' ;
3031import { ServerType } from '../sdam/common' ;
3132import { applySession , type ClientSession , updateSessionFromResponse } from '../sessions' ;
32- import { type TimeoutContext } from '../timeout' ;
33+ import { type TimeoutContext , TimeoutError } from '../timeout' ;
3334import {
3435 BufferPool ,
3536 calculateDurationInMs ,
@@ -416,6 +417,11 @@ export class Connection extends TypedEventEmitter<ConnectionEvents> {
416417 ...options
417418 } ;
418419
420+ if ( options . timeoutContext ?. csotEnabled ( ) ) {
421+ const { maxTimeMS } = options . timeoutContext ;
422+ if ( maxTimeMS > 0 && Number . isFinite ( maxTimeMS ) ) cmd . maxTimeMS = maxTimeMS ;
423+ }
424+
419425 const message = this . supportsOpMsg
420426 ? new OpMsgRequest ( db , cmd , commandOptions )
421427 : new OpQueryRequest ( db , cmd , commandOptions ) ;
@@ -430,7 +436,9 @@ export class Connection extends TypedEventEmitter<ConnectionEvents> {
430436 ) : AsyncGenerator < MongoDBResponse > {
431437 this . throwIfAborted ( ) ;
432438
433- if ( typeof options . socketTimeoutMS === 'number' ) {
439+ if ( options . timeoutContext ?. csotEnabled ( ) ) {
440+ this . socket . setTimeout ( 0 ) ;
441+ } else if ( typeof options . socketTimeoutMS === 'number' ) {
434442 this . socket . setTimeout ( options . socketTimeoutMS ) ;
435443 } else if ( this . socketTimeoutMS !== 0 ) {
436444 this . socket . setTimeout ( this . socketTimeoutMS ) ;
@@ -439,7 +447,8 @@ export class Connection extends TypedEventEmitter<ConnectionEvents> {
439447 try {
440448 await this . writeCommand ( message , {
441449 agreedCompressor : this . description . compressor ?? 'none' ,
442- zlibCompressionLevel : this . description . zlibCompressionLevel
450+ zlibCompressionLevel : this . description . zlibCompressionLevel ,
451+ timeoutContext : options . timeoutContext
443452 } ) ;
444453
445454 if ( options . noResponse ) {
@@ -449,7 +458,17 @@ export class Connection extends TypedEventEmitter<ConnectionEvents> {
449458
450459 this . throwIfAborted ( ) ;
451460
452- for await ( const response of this . readMany ( ) ) {
461+ if (
462+ options . timeoutContext ?. csotEnabled ( ) &&
463+ options . timeoutContext . minRoundTripTime != null &&
464+ options . timeoutContext . remainingTimeMS < options . timeoutContext . minRoundTripTime
465+ ) {
466+ throw new MongoOperationTimeoutError (
467+ 'Server roundtrip time is greater than the time remaining'
468+ ) ;
469+ }
470+
471+ for await ( const response of this . readMany ( { timeoutContext : options . timeoutContext } ) ) {
453472 this . socket . setTimeout ( 0 ) ;
454473 const bson = response . parse ( ) ;
455474
@@ -622,7 +641,11 @@ export class Connection extends TypedEventEmitter<ConnectionEvents> {
622641 */
623642 private async writeCommand (
624643 command : WriteProtocolMessageType ,
625- options : { agreedCompressor ?: CompressorName ; zlibCompressionLevel ?: number }
644+ options : {
645+ agreedCompressor ?: CompressorName ;
646+ zlibCompressionLevel ?: number ;
647+ timeoutContext ?: TimeoutContext ;
648+ }
626649 ) : Promise < void > {
627650 const finalCommand =
628651 options . agreedCompressor === 'none' || ! OpCompressedRequest . canCompress ( command )
@@ -634,8 +657,32 @@ export class Connection extends TypedEventEmitter<ConnectionEvents> {
634657
635658 const buffer = Buffer . concat ( await finalCommand . toBin ( ) ) ;
636659
660+ if ( options . timeoutContext ?. csotEnabled ( ) ) {
661+ if (
662+ options . timeoutContext . minRoundTripTime != null &&
663+ options . timeoutContext . remainingTimeMS < options . timeoutContext . minRoundTripTime
664+ ) {
665+ throw new MongoOperationTimeoutError (
666+ 'Server roundtrip time is greater than the time remaining'
667+ ) ;
668+ }
669+ }
670+
637671 if ( this . socket . write ( buffer ) ) return ;
638- return await once ( this . socket , 'drain' ) ;
672+
673+ const drainEvent = once < void > ( this . socket , 'drain' ) ;
674+ const timeout = options ?. timeoutContext ?. timeoutForSocketWrite ;
675+ if ( timeout ) {
676+ try {
677+ return await Promise . race ( [ drainEvent , timeout ] ) ;
678+ } catch ( error ) {
679+ if ( TimeoutError . is ( error ) ) {
680+ throw new MongoOperationTimeoutError ( 'Timed out at socket write' ) ;
681+ }
682+ throw error ;
683+ }
684+ }
685+ return await drainEvent ;
639686 }
640687
641688 /**
@@ -647,9 +694,12 @@ export class Connection extends TypedEventEmitter<ConnectionEvents> {
647694 *
648695 * Note that `for-await` loops call `return` automatically when the loop is exited.
649696 */
650- private async * readMany ( ) : AsyncGenerator < OpMsgResponse | OpReply > {
697+ private async * readMany ( options : {
698+ timeoutContext ?: TimeoutContext ;
699+ } ) : AsyncGenerator < OpMsgResponse | OpReply > {
651700 try {
652- this . dataEvents = onData ( this . messageStream ) ;
701+ this . dataEvents = onData ( this . messageStream , options ) ;
702+
653703 for await ( const message of this . dataEvents ) {
654704 const response = await decompressResponse ( message ) ;
655705 yield response ;
0 commit comments