@@ -25,7 +25,7 @@ import {
2525import type { Topology } from '../sdam/topology' ;
2626import type { ClientSession } from '../sessions' ;
2727import { TimeoutContext } from '../timeout' ;
28- import { squashError , supportsRetryableWrites } from '../utils' ;
28+ import { supportsRetryableWrites } from '../utils' ;
2929import { AbstractOperation , Aspect } from './operation' ;
3030
3131const MMAPv1_RETRY_WRITES_ERROR_CODE = MONGODB_ERROR_CODES . IllegalOperation ;
@@ -88,12 +88,6 @@ export async function executeOperation<
8888 ) ;
8989 }
9090
91- timeoutContext ??= TimeoutContext . create ( {
92- serverSelectionTimeoutMS : client . s . options . serverSelectionTimeoutMS ,
93- waitQueueTimeoutMS : client . s . options . waitQueueTimeoutMS ,
94- timeoutMS : operation . options . timeoutMS
95- } ) ;
96-
9791 const readPreference = operation . readPreference ?? ReadPreference . primary ;
9892 const inTransaction = ! ! session ?. inTransaction ( ) ;
9993
@@ -206,31 +200,15 @@ async function tryOperation<
206200 selector = readPreference ;
207201 }
208202
209- const server = await topology . selectServer ( selector , {
203+ let server = await topology . selectServer ( selector , {
210204 session,
211205 operationName : operation . commandName ,
212206 timeoutContext
213207 } ) ;
214208
215- if ( session == null ) {
216- // No session also means it is not retryable, early exit
217- return await operation . execute ( server , undefined , timeoutContext ) ;
218- }
219-
220- if ( ! operation . hasAspect ( Aspect . RETRYABLE ) ) {
221- // non-retryable operation, early exit
222- try {
223- return await operation . execute ( server , session , timeoutContext ) ;
224- } finally {
225- if ( session ?. owner != null && session . owner === owner ) {
226- try {
227- await session . endSession ( ) ;
228- } catch ( error ) {
229- squashError ( error ) ;
230- }
231- }
232- }
233- }
209+ const hasReadAspect = operation . hasAspect ( Aspect . READ_OPERATION ) ;
210+ const hasWriteAspect = operation . hasAspect ( Aspect . WRITE_OPERATION ) ;
211+ const inTransaction = session ?. inTransaction ( ) ?? false ;
234212
235213 const willRetryRead = topology . s . options . retryReads && ! inTransaction && operation . canRetryRead ;
236214
@@ -250,16 +228,42 @@ async function tryOperation<
250228 session . incrementTransactionNumber ( ) ;
251229 }
252230
253- try {
254- return await operation . execute ( server , session , timeoutContext ) ;
255- } catch ( operationError ) {
256- if ( willRetry && operationError instanceof MongoError ) {
257- return await retryOperation ( operation , operationError , {
231+ // TODO(NODE-6231): implement infinite retry within CSOT timeout here
232+ const maxTries = willRetry ? 2 : 1 ;
233+ let previousOperationError : MongoError | undefined ;
234+ let previousServer : ServerDescription | undefined ;
235+
236+ // TODO(NODE-6231): implement infinite retry within CSOT timeout here
237+ for ( let tries = 0 ; tries < maxTries ; tries ++ ) {
238+ if ( previousOperationError ) {
239+ if ( hasWriteAspect && previousOperationError . code === MMAPv1_RETRY_WRITES_ERROR_CODE ) {
240+ throw new MongoServerError ( {
241+ message : MMAPv1_RETRY_WRITES_ERROR_MESSAGE ,
242+ errmsg : MMAPv1_RETRY_WRITES_ERROR_MESSAGE ,
243+ originalError : previousOperationError
244+ } ) ;
245+ }
246+
247+ if ( hasWriteAspect && ! isRetryableWriteError ( previousOperationError ) )
248+ throw previousOperationError ;
249+
250+ if ( hasReadAspect && ! isRetryableReadError ( previousOperationError ) )
251+ throw previousOperationError ;
252+
253+ if (
254+ previousOperationError instanceof MongoNetworkError &&
255+ operation . hasAspect ( Aspect . CURSOR_CREATING ) &&
256+ session != null &&
257+ session . isPinned &&
258+ ! session . inTransaction ( )
259+ ) {
260+ session . unpin ( { force : true , forceClear : true } ) ;
261+ }
262+
263+ server = await topology . selectServer ( selector , {
258264 session,
259- topology,
260- selector,
261- previousServer : server . description ,
262- timeoutContext
265+ operationName : operation . commandName ,
266+ previousServer
263267 } ) ;
264268
265269 if ( hasWriteAspect && ! supportsRetryableWrites ( server ) ) {
@@ -269,76 +273,19 @@ async function tryOperation<
269273 }
270274 }
271275
272- /** @internal */
273- type RetryOptions = {
274- session : ClientSession ;
275- topology : Topology ;
276- selector : ReadPreference | ServerSelector ;
277- previousServer : ServerDescription ;
278- timeoutContext : TimeoutContext ;
279- } ;
280-
281- async function retryOperation <
282- T extends AbstractOperation < TResult > ,
283- TResult = ResultTypeFromOperation < T >
284- > (
285- operation : T ,
286- originalError : MongoError ,
287- { session, topology, selector, previousServer, timeoutContext } : RetryOptions
288- ) : Promise < TResult > {
289- const isWriteOperation = operation . hasAspect ( Aspect . WRITE_OPERATION ) ;
290- const isReadOperation = operation . hasAspect ( Aspect . READ_OPERATION ) ;
291-
292- if ( isWriteOperation && originalError . code === MMAPv1_RETRY_WRITES_ERROR_CODE ) {
293- throw new MongoServerError ( {
294- message : MMAPv1_RETRY_WRITES_ERROR_MESSAGE ,
295- errmsg : MMAPv1_RETRY_WRITES_ERROR_MESSAGE ,
296- originalError
297- } ) ;
298- }
299-
300- if ( isWriteOperation && ! isRetryableWriteError ( originalError ) ) {
301- throw originalError ;
302- }
303-
304- if ( isReadOperation && ! isRetryableReadError ( originalError ) ) {
305- throw originalError ;
306- }
307-
308- if (
309- originalError instanceof MongoNetworkError &&
310- session . isPinned &&
311- ! session . inTransaction ( ) &&
312- operation . hasAspect ( Aspect . CURSOR_CREATING )
313- ) {
314- // If we have a cursor and the initial command fails with a network error,
315- // we can retry it on another connection. So we need to check it back in, clear the
316- // pool for the service id, and retry again.
317- session . unpin ( { force : true , forceClear : true } ) ;
318- }
319-
320- // select a new server, and attempt to retry the operation
321- const server = await topology . selectServer ( selector , {
322- session,
323- operationName : operation . commandName ,
324- previousServer,
325- timeoutContext
326- } ) ;
327-
328- if ( isWriteOperation && ! supportsRetryableWrites ( server ) ) {
329- throw new MongoUnexpectedServerResponseError (
330- 'Selected server does not support retryable writes'
331- ) ;
332- }
333-
334- try {
335- return await operation . execute ( server , session , timeoutContext ) ;
336- } catch ( retryError ) {
337- if (
338- retryError instanceof MongoError &&
339- retryError . hasErrorLabel ( MongoErrorLabel . NoWritesPerformed )
340- ) {
341- throw originalError ;
276+ try {
277+ return await operation . execute ( server , session , timeoutContext ) ;
278+ } catch ( operationError ) {
279+ if ( ! ( operationError instanceof MongoError ) ) throw operationError ;
280+
281+ if (
282+ previousOperationError != null &&
283+ operationError . hasErrorLabel ( MongoErrorLabel . NoWritesPerformed )
284+ ) {
285+ throw previousOperationError ;
286+ }
287+ previousServer = server . description ;
288+ previousOperationError = operationError ;
342289 }
343290 }
344291
0 commit comments