@@ -24,7 +24,7 @@ import {
2424} from '../sdam/server_selection' ;
2525import type { Topology } from '../sdam/topology' ;
2626import type { ClientSession } from '../sessions' ;
27- import { squashError , supportsRetryableWrites } from '../utils' ;
27+ import { supportsRetryableWrites } from '../utils' ;
2828import { AbstractOperation , Aspect } from './operation' ;
2929
3030const MMAPv1_RETRY_WRITES_ERROR_CODE = MONGODB_ERROR_CODES . IllegalOperation ;
@@ -45,10 +45,9 @@ type ResultTypeFromOperation<TOperation> = TOperation extends AbstractOperation<
4545 * not provided.
4646 *
4747 * The expectation is that this function:
48- * - Connects the MongoClient if it has not already been connected
48+ * - Connects the MongoClient if it has not already been connected, see { @link autoConnect}
4949 * - Creates a session if none is provided and cleans up the session it creates
50- * - Selects a server based on readPreference or various factors
51- * - Retries an operation if it fails for certain errors, see {@link retryOperation}
50+ * - Tries an operation and retries under certain conditions, see {@link tryOperation}
5251 *
5352 * @typeParam T - The operation's type
5453 * @typeParam TResult - The type of the operation's result, calculated from T
@@ -65,23 +64,7 @@ export async function executeOperation<
6564 throw new MongoRuntimeError ( 'This method requires a valid operation instance' ) ;
6665 }
6766
68- if ( client . topology == null ) {
69- // Auto connect on operation
70- if ( client . s . hasBeenClosed ) {
71- throw new MongoNotConnectedError ( 'Client must be connected before running operations' ) ;
72- }
73- client . s . options [ Symbol . for ( '@@mdb.skipPingOnConnect' ) ] = true ;
74- try {
75- await client . connect ( ) ;
76- } finally {
77- delete client . s . options [ Symbol . for ( '@@mdb.skipPingOnConnect' ) ] ;
78- }
79- }
80-
81- const { topology } = client ;
82- if ( topology == null ) {
83- throw new MongoRuntimeError ( 'client.connect did not create a topology but also did not throw' ) ;
84- }
67+ const topology = await autoConnect ( client ) ;
8568
8669 // The driver sessions spec mandates that we implicitly create sessions for operations
8770 // that are not explicitly provided with a session.
@@ -108,7 +91,6 @@ export async function executeOperation<
10891 const inTransaction = ! ! session ?. inTransaction ( ) ;
10992
11093 const hasReadAspect = operation . hasAspect ( Aspect . READ_OPERATION ) ;
111- const hasWriteAspect = operation . hasAspect ( Aspect . WRITE_OPERATION ) ;
11294
11395 if (
11496 inTransaction &&
@@ -124,6 +106,73 @@ export async function executeOperation<
124106 session . unpin ( ) ;
125107 }
126108
109+ try {
110+ return await tryOperation ( operation , {
111+ topology,
112+ session,
113+ readPreference
114+ } ) ;
115+ } finally {
116+ if ( session ?. owner != null && session . owner === owner ) {
117+ await session . endSession ( ) ;
118+ }
119+ }
120+ }
121+
122+ /**
123+ * Connects a client if it has not yet been connected
124+ * @internal
125+ */
126+ async function autoConnect ( client : MongoClient ) : Promise < Topology > {
127+ if ( client . topology == null ) {
128+ if ( client . s . hasBeenClosed ) {
129+ throw new MongoNotConnectedError ( 'Client must be connected before running operations' ) ;
130+ }
131+ client . s . options [ Symbol . for ( '@@mdb.skipPingOnConnect' ) ] = true ;
132+ try {
133+ await client . connect ( ) ;
134+ if ( client . topology == null ) {
135+ throw new MongoRuntimeError (
136+ 'client.connect did not create a topology but also did not throw'
137+ ) ;
138+ }
139+ return client . topology ;
140+ } finally {
141+ delete client . s . options [ Symbol . for ( '@@mdb.skipPingOnConnect' ) ] ;
142+ }
143+ }
144+ return client . topology ;
145+ }
146+
147+ /** @internal */
148+ type RetryOptions = {
149+ session : ClientSession | undefined ;
150+ readPreference : ReadPreference ;
151+ topology : Topology ;
152+ } ;
153+
154+ /**
155+ * Executes an operation and retries as appropriate
156+ * @internal
157+ *
158+ * @remarks
159+ * Implements behaviour described in [Retryable Reads](https://github.com/mongodb/specifications/blob/master/source/retryable-reads/retryable-reads.md) and [Retryable
160+ * Writes](https://github.com/mongodb/specifications/blob/master/source/retryable-writes/retryable-writes.md) specification
161+ *
162+ * This function:
163+ * - performs initial server selection
164+ * - attempts to execute an operation
165+ * - retries the operation if it meets the criteria for a retryable read or a retryable write
166+ *
167+ * @typeParam T - The operation's type
168+ * @typeParam TResult - The type of the operation's result, calculated from T
169+ *
170+ * @param operation - The operation to execute
171+ * */
172+ async function tryOperation <
173+ T extends AbstractOperation < TResult > ,
174+ TResult = ResultTypeFromOperation < T >
175+ > ( operation : T , { topology, session, readPreference } : RetryOptions ) : Promise < TResult > {
127176 let selector : ReadPreference | ServerSelector ;
128177
129178 if ( operation . hasAspect ( Aspect . MUST_SELECT_SAME_SERVER ) ) {
@@ -139,30 +188,14 @@ export async function executeOperation<
139188 selector = readPreference ;
140189 }
141190
142- const server = await topology . selectServer ( selector , {
191+ let server = await topology . selectServer ( selector , {
143192 session,
144193 operationName : operation . commandName
145194 } ) ;
146195
147- if ( session == null ) {
148- // No session also means it is not retryable, early exit
149- return await operation . execute ( server , undefined ) ;
150- }
151-
152- if ( ! operation . hasAspect ( Aspect . RETRYABLE ) ) {
153- // non-retryable operation, early exit
154- try {
155- return await operation . execute ( server , session ) ;
156- } finally {
157- if ( session ?. owner != null && session . owner === owner ) {
158- try {
159- await session . endSession ( ) ;
160- } catch ( error ) {
161- squashError ( error ) ;
162- }
163- }
164- }
165- }
196+ const hasReadAspect = operation . hasAspect ( Aspect . READ_OPERATION ) ;
197+ const hasWriteAspect = operation . hasAspect ( Aspect . WRITE_OPERATION ) ;
198+ const inTransaction = session ?. inTransaction ( ) ?? false ;
166199
167200 const willRetryRead = topology . s . options . retryReads && ! inTransaction && operation . canRetryRead ;
168201
@@ -172,105 +205,76 @@ export async function executeOperation<
172205 supportsRetryableWrites ( server ) &&
173206 operation . canRetryWrite ;
174207
175- const willRetry = ( hasReadAspect && willRetryRead ) || ( hasWriteAspect && willRetryWrite ) ;
208+ const willRetry =
209+ operation . hasAspect ( Aspect . RETRYABLE ) &&
210+ session != null &&
211+ ( ( hasReadAspect && willRetryRead ) || ( hasWriteAspect && willRetryWrite ) ) ;
176212
177- if ( hasWriteAspect && willRetryWrite ) {
213+ if ( hasWriteAspect && willRetryWrite && session != null ) {
178214 operation . options . willRetryWrite = true ;
179215 session . incrementTransactionNumber ( ) ;
180216 }
181217
182- try {
183- return await operation . execute ( server , session ) ;
184- } catch ( operationError ) {
185- if ( willRetry && operationError instanceof MongoError ) {
186- return await retryOperation ( operation , operationError , {
187- session,
188- topology,
189- selector,
190- previousServer : server . description
191- } ) ;
192- }
193- throw operationError ;
194- } finally {
195- if ( session ?. owner != null && session . owner === owner ) {
196- try {
197- await session . endSession ( ) ;
198- } catch ( error ) {
199- squashError ( error ) ;
200- }
201- }
202- }
203- }
218+ // TODO(NODE-6231): implement infinite retry within CSOT timeout here
219+ const maxTries = willRetry ? 2 : 1 ;
220+ let previousOperationError : MongoError | undefined ;
221+ let previousServer : ServerDescription | undefined ;
204222
205- /** @internal */
206- type RetryOptions = {
207- session : ClientSession ;
208- topology : Topology ;
209- selector : ReadPreference | ServerSelector ;
210- previousServer : ServerDescription ;
211- } ;
223+ // TODO(NODE-6231): implement infinite retry within CSOT timeout here
224+ for ( let tries = 0 ; tries < maxTries ; tries ++ ) {
225+ if ( previousOperationError ) {
226+ if ( hasWriteAspect && previousOperationError . code === MMAPv1_RETRY_WRITES_ERROR_CODE ) {
227+ throw new MongoServerError ( {
228+ message : MMAPv1_RETRY_WRITES_ERROR_MESSAGE ,
229+ errmsg : MMAPv1_RETRY_WRITES_ERROR_MESSAGE ,
230+ originalError : previousOperationError
231+ } ) ;
232+ }
212233
213- async function retryOperation <
214- T extends AbstractOperation < TResult > ,
215- TResult = ResultTypeFromOperation < T >
216- > (
217- operation : T ,
218- originalError : MongoError ,
219- { session, topology, selector, previousServer } : RetryOptions
220- ) : Promise < TResult > {
221- const isWriteOperation = operation . hasAspect ( Aspect . WRITE_OPERATION ) ;
222- const isReadOperation = operation . hasAspect ( Aspect . READ_OPERATION ) ;
234+ if ( hasWriteAspect && ! isRetryableWriteError ( previousOperationError ) )
235+ throw previousOperationError ;
223236
224- if ( isWriteOperation && originalError . code === MMAPv1_RETRY_WRITES_ERROR_CODE ) {
225- throw new MongoServerError ( {
226- message : MMAPv1_RETRY_WRITES_ERROR_MESSAGE ,
227- errmsg : MMAPv1_RETRY_WRITES_ERROR_MESSAGE ,
228- originalError
229- } ) ;
230- }
237+ if ( hasReadAspect && ! isRetryableReadError ( previousOperationError ) )
238+ throw previousOperationError ;
231239
232- if ( isWriteOperation && ! isRetryableWriteError ( originalError ) ) {
233- throw originalError ;
234- }
235-
236- if ( isReadOperation && ! isRetryableReadError ( originalError ) ) {
237- throw originalError ;
238- }
240+ if (
241+ previousOperationError instanceof MongoNetworkError &&
242+ operation . hasAspect ( Aspect . CURSOR_CREATING ) &&
243+ session != null &&
244+ session . isPinned &&
245+ ! session . inTransaction ( )
246+ ) {
247+ session . unpin ( { force : true , forceClear : true } ) ;
248+ }
239249
240- if (
241- originalError instanceof MongoNetworkError &&
242- session . isPinned &&
243- ! session . inTransaction ( ) &&
244- operation . hasAspect ( Aspect . CURSOR_CREATING )
245- ) {
246- // If we have a cursor and the initial command fails with a network error,
247- // we can retry it on another connection. So we need to check it back in, clear the
248- // pool for the service id, and retry again.
249- session . unpin ( { force : true , forceClear : true } ) ;
250- }
250+ server = await topology . selectServer ( selector , {
251+ session,
252+ operationName : operation . commandName ,
253+ previousServer
254+ } ) ;
251255
252- // select a new server, and attempt to retry the operation
253- const server = await topology . selectServer ( selector , {
254- session ,
255- operationName : operation . commandName ,
256- previousServer
257- } ) ;
256+ if ( hasWriteAspect && ! supportsRetryableWrites ( server ) ) {
257+ throw new MongoUnexpectedServerResponseError (
258+ 'Selected server does not support retryable writes'
259+ ) ;
260+ }
261+ }
258262
259- if ( isWriteOperation && ! supportsRetryableWrites ( server ) ) {
260- throw new MongoUnexpectedServerResponseError (
261- 'Selected server does not support retryable writes'
262- ) ;
263- }
263+ try {
264+ return await operation . execute ( server , session ) ;
265+ } catch ( operationError ) {
266+ if ( ! ( operationError instanceof MongoError ) ) throw operationError ;
264267
265- try {
266- return await operation . execute ( server , session ) ;
267- } catch ( retryError ) {
268- if (
269- retryError instanceof MongoError &&
270- retryError . hasErrorLabel ( MongoErrorLabel . NoWritesPerformed )
271- ) {
272- throw originalError ;
268+ if (
269+ previousOperationError != null &&
270+ operationError . hasErrorLabel ( MongoErrorLabel . NoWritesPerformed )
271+ ) {
272+ throw previousOperationError ;
273+ }
274+ previousServer = server . description ;
275+ previousOperationError = operationError ;
273276 }
274- throw retryError ;
275277 }
278+
279+ throw previousOperationError ;
276280}
0 commit comments