@@ -21,13 +21,14 @@ import {
2121 MongoInvalidArgumentError ,
2222 MongoMissingCredentialsError ,
2323 MongoNetworkError ,
24+ MongoOperationTimeoutError ,
2425 MongoRuntimeError ,
2526 MongoServerError
2627} from '../error' ;
2728import { CancellationToken , TypedEventEmitter } from '../mongo_types' ;
2829import type { Server } from '../sdam/server' ;
2930import { Timeout , TimeoutError } from '../timeout' ;
30- import { type Callback , List , makeCounter , promiseWithResolvers } from '../utils' ;
31+ import { type Callback , csotMin , List , makeCounter , promiseWithResolvers } from '../utils' ;
3132import { connect } from './connect' ;
3233import { Connection , type ConnectionEvents , type ConnectionOptions } from './connection' ;
3334import {
@@ -102,7 +103,6 @@ export interface ConnectionPoolOptions extends Omit<ConnectionOptions, 'id' | 'g
102103export interface WaitQueueMember {
103104 resolve : ( conn : Connection ) => void ;
104105 reject : ( err : AnyError ) => void ;
105- timeout : Timeout ;
106106 [ kCancelled ] ?: boolean ;
107107}
108108
@@ -354,35 +354,57 @@ export class ConnectionPool extends TypedEventEmitter<ConnectionPoolEvents> {
354354 * will be held by the pool. This means that if a connection is checked out it MUST be checked back in or
355355 * explicitly destroyed by the new owner.
356356 */
357- async checkOut ( ) : Promise < Connection > {
357+ async checkOut ( options ?: { timeout ?: Timeout } ) : Promise < Connection > {
358358 this . emitAndLog (
359359 ConnectionPool . CONNECTION_CHECK_OUT_STARTED ,
360360 new ConnectionCheckOutStartedEvent ( this )
361361 ) ;
362362
363363 const waitQueueTimeoutMS = this . options . waitQueueTimeoutMS ;
364+ const serverSelectionTimeoutMS = this [ kServer ] . topology . s . serverSelectionTimeoutMS ;
364365
365366 const { promise, resolve, reject } = promiseWithResolvers < Connection > ( ) ;
366367
367- const timeout = Timeout . expires ( waitQueueTimeoutMS ) ;
368+ let timeout : Timeout | null = null ;
369+ if ( options ?. timeout ) {
370+ // CSOT enabled
371+ // Determine if we're using the timeout passed in or a new timeout
372+ if ( options . timeout . duration > 0 || serverSelectionTimeoutMS > 0 ) {
373+ // This check determines whether or not Topology.selectServer used the configured
374+ // `timeoutMS` or `serverSelectionTimeoutMS` value for its timeout
375+ if (
376+ options . timeout . duration === serverSelectionTimeoutMS ||
377+ csotMin ( options . timeout . duration , serverSelectionTimeoutMS ) < serverSelectionTimeoutMS
378+ ) {
379+ // server selection used `timeoutMS`, so we should use the existing timeout as the timeout
380+ // here
381+ timeout = options . timeout ;
382+ } else {
383+ // server selection used `serverSelectionTimeoutMS`, so we construct a new timeout with
384+ // the time remaining to ensure that Topology.selectServer and ConnectionPool.checkOut
385+ // cumulatively don't spend more than `serverSelectionTimeoutMS` blocking
386+ timeout = Timeout . expires ( serverSelectionTimeoutMS - options . timeout . timeElapsed ) ;
387+ }
388+ }
389+ } else {
390+ timeout = Timeout . expires ( waitQueueTimeoutMS ) ;
391+ }
368392
369393 const waitQueueMember : WaitQueueMember = {
370394 resolve,
371- reject,
372- timeout
395+ reject
373396 } ;
374397
375398 this [ kWaitQueue ] . push ( waitQueueMember ) ;
376399 process . nextTick ( ( ) => this . processWaitQueue ( ) ) ;
377400
378401 try {
379- return await Promise . race ( [ promise , waitQueueMember . timeout ] ) ;
402+ timeout ?. throwIfExpired ( ) ;
403+ return await ( timeout ? Promise . race ( [ promise , timeout ] ) : promise ) ;
380404 } catch ( error ) {
381405 if ( TimeoutError . is ( error ) ) {
382406 waitQueueMember [ kCancelled ] = true ;
383407
384- waitQueueMember . timeout . clear ( ) ;
385-
386408 this . emitAndLog (
387409 ConnectionPool . CONNECTION_CHECK_OUT_FAILED ,
388410 new ConnectionCheckOutFailedEvent ( this , 'timeout' )
@@ -393,9 +415,16 @@ export class ConnectionPool extends TypedEventEmitter<ConnectionPoolEvents> {
393415 : 'Timed out while checking out a connection from connection pool' ,
394416 this . address
395417 ) ;
418+ if ( options ?. timeout ) {
419+ throw new MongoOperationTimeoutError ( 'Timed out during connection checkout' , {
420+ cause : timeoutError
421+ } ) ;
422+ }
396423 throw timeoutError ;
397424 }
398425 throw error ;
426+ } finally {
427+ if ( timeout !== options ?. timeout ) timeout ?. clear ( ) ;
399428 }
400429 }
401430
@@ -761,7 +790,6 @@ export class ConnectionPool extends TypedEventEmitter<ConnectionPoolEvents> {
761790 ConnectionPool . CONNECTION_CHECK_OUT_FAILED ,
762791 new ConnectionCheckOutFailedEvent ( this , reason , error )
763792 ) ;
764- waitQueueMember . timeout . clear ( ) ;
765793 this [ kWaitQueue ] . shift ( ) ;
766794 waitQueueMember . reject ( error ) ;
767795 continue ;
@@ -782,7 +810,6 @@ export class ConnectionPool extends TypedEventEmitter<ConnectionPoolEvents> {
782810 ConnectionPool . CONNECTION_CHECKED_OUT ,
783811 new ConnectionCheckedOutEvent ( this , connection )
784812 ) ;
785- waitQueueMember . timeout . clear ( ) ;
786813
787814 this [ kWaitQueue ] . shift ( ) ;
788815 waitQueueMember . resolve ( connection ) ;
@@ -820,8 +847,6 @@ export class ConnectionPool extends TypedEventEmitter<ConnectionPoolEvents> {
820847 ) ;
821848 waitQueueMember . resolve ( connection ) ;
822849 }
823-
824- waitQueueMember . timeout . clear ( ) ;
825850 }
826851 process . nextTick ( ( ) => this . processWaitQueue ( ) ) ;
827852 } ) ;
0 commit comments