@@ -24,8 +24,8 @@ import {
2424} from '../sdam/server_selection' ;
2525import type { Topology } from '../sdam/topology' ;
2626import type { ClientSession } from '../sessions' ;
27- import { TimeoutContext } from '../timeout' ;
28- import { supportsRetryableWrites } from '../utils' ;
27+ import { Timeout } from '../timeout' ;
28+ import { squashError , supportsRetryableWrites } from '../utils' ;
2929import { AbstractOperation , Aspect } from './operation' ;
3030
3131const MMAPv1_RETRY_WRITES_ERROR_CODE = MONGODB_ERROR_CODES . IllegalOperation ;
@@ -200,10 +200,13 @@ async function tryOperation<
200200 selector = readPreference ;
201201 }
202202
203- let server = await topology . selectServer ( selector , {
203+ const timeout = operation . timeoutMS != null ? Timeout . expires ( operation . timeoutMS ) : undefined ;
204+ operation . timeout = timeout ;
205+
206+ const server = await topology . selectServer ( selector , {
204207 session,
205208 operationName : operation . commandName ,
206- timeoutContext
209+ timeout
207210 } ) ;
208211
209212 const hasReadAspect = operation . hasAspect ( Aspect . READ_OPERATION ) ;
@@ -278,14 +281,67 @@ async function tryOperation<
278281 } catch ( operationError ) {
279282 if ( ! ( operationError instanceof MongoError ) ) throw operationError ;
280283
281- if (
282- previousOperationError != null &&
283- operationError . hasErrorLabel ( MongoErrorLabel . NoWritesPerformed )
284- ) {
285- throw previousOperationError ;
286- }
287- previousServer = server . description ;
288- previousOperationError = operationError ;
284+ async function retryOperation <
285+ T extends AbstractOperation < TResult > ,
286+ TResult = ResultTypeFromOperation < T >
287+ > (
288+ operation : T ,
289+ originalError : MongoError ,
290+ { session, topology, selector, previousServer } : RetryOptions
291+ ) : Promise < TResult > {
292+ const isWriteOperation = operation . hasAspect ( Aspect . WRITE_OPERATION ) ;
293+ const isReadOperation = operation . hasAspect ( Aspect . READ_OPERATION ) ;
294+
295+ if ( isWriteOperation && originalError . code === MMAPv1_RETRY_WRITES_ERROR_CODE ) {
296+ throw new MongoServerError ( {
297+ message : MMAPv1_RETRY_WRITES_ERROR_MESSAGE ,
298+ errmsg : MMAPv1_RETRY_WRITES_ERROR_MESSAGE ,
299+ originalError
300+ } ) ;
301+ }
302+
303+ if ( isWriteOperation && ! isRetryableWriteError ( originalError ) ) {
304+ throw originalError ;
305+ }
306+
307+ if ( isReadOperation && ! isRetryableReadError ( originalError ) ) {
308+ throw originalError ;
309+ }
310+
311+ if (
312+ originalError instanceof MongoNetworkError &&
313+ session . isPinned &&
314+ ! session . inTransaction ( ) &&
315+ operation . hasAspect ( Aspect . CURSOR_CREATING )
316+ ) {
317+ // If we have a cursor and the initial command fails with a network error,
318+ // we can retry it on another connection. So we need to check it back in, clear the
319+ // pool for the service id, and retry again.
320+ session . unpin ( { force : true , forceClear : true } ) ;
321+ }
322+
323+ // select a new server, and attempt to retry the operation
324+ const server = await topology . selectServer ( selector , {
325+ session,
326+ timeout : operation . timeout ,
327+ operationName : operation . commandName ,
328+ previousServer
329+ } ) ;
330+
331+ if ( isWriteOperation && ! supportsRetryableWrites ( server ) ) {
332+ throw new MongoUnexpectedServerResponseError (
333+ 'Selected server does not support retryable writes'
334+ ) ;
335+ }
336+
337+ try {
338+ return await operation . execute ( server , session ) ;
339+ } catch ( retryError ) {
340+ if (
341+ retryError instanceof MongoError &&
342+ retryError . hasErrorLabel ( MongoErrorLabel . NoWritesPerformed )
343+ ) {
344+ throw originalError ;
289345 }
290346 }
291347
0 commit comments