diff --git a/src/sessions.ts b/src/sessions.ts index bd10e7e07c7..1fc88b468f4 100644 --- a/src/sessions.ts +++ b/src/sessions.ts @@ -46,7 +46,7 @@ import { squashError, uuidV4 } from './utils'; -import { WriteConcern } from './write_concern'; +import { WriteConcern, type WriteConcernOptions, type WriteConcernSettings } from './write_concern'; const minWireVersionForShardedTransactions = 8; @@ -443,14 +443,167 @@ export class ClientSession * Commits the currently active transaction in this session. */ async commitTransaction(): Promise { - return await endTransaction(this, 'commitTransaction'); + if (this.transaction.state === TxnState.NO_TRANSACTION) { + throw new MongoTransactionError('No transaction started'); + } + + if ( + this.transaction.state === TxnState.STARTING_TRANSACTION || + this.transaction.state === TxnState.TRANSACTION_COMMITTED_EMPTY + ) { + // the transaction was never started, we can safely exit here + this.transaction.transition(TxnState.TRANSACTION_COMMITTED_EMPTY); + return; + } + + if (this.transaction.state === TxnState.TRANSACTION_ABORTED) { + throw new MongoTransactionError( + 'Cannot call commitTransaction after calling abortTransaction' + ); + } + + const command: { + commitTransaction: 1; + writeConcern?: WriteConcernSettings; + recoveryToken?: Document; + maxTimeMS?: number; + } = { commitTransaction: 1 }; + + const wc = this.transaction.options.writeConcern ?? this.clientOptions?.writeConcern; + if (wc != null) { + WriteConcern.apply(command, { wtimeoutMS: 10000, w: 'majority', ...wc }); + } + + if (this.transaction.state === TxnState.TRANSACTION_COMMITTED) { + WriteConcern.apply(command, { wtimeoutMS: 10000, ...wc, w: 'majority' }); + } + + if (typeof this.transaction.options.maxTimeMS === 'number') { + command.maxTimeMS = this.transaction.options.maxTimeMS; + } + + if (this.transaction.recoveryToken) { + command.recoveryToken = this.transaction.recoveryToken; + } + + const operation = new RunAdminCommandOperation(command, { + session: this, + readPreference: ReadPreference.primary, + bypassPinningCheck: true + }); + + try { + await executeOperation(this.client, operation); + return; + } catch (firstCommitError) { + if (firstCommitError instanceof MongoError && isRetryableWriteError(firstCommitError)) { + // SPEC-1185: apply majority write concern when retrying commitTransaction + WriteConcern.apply(command, { wtimeoutMS: 10000, ...wc, w: 'majority' }); + // per txns spec, must unpin session in this case + this.unpin({ force: true }); + + try { + await executeOperation(this.client, operation); + return; + } catch (retryCommitError) { + // If the retry failed, we process that error instead of the original + if (shouldAddUnknownTransactionCommitResultLabel(retryCommitError)) { + retryCommitError.addErrorLabel(MongoErrorLabel.UnknownTransactionCommitResult); + } + + if (shouldUnpinAfterCommitError(retryCommitError)) { + this.unpin({ error: retryCommitError }); + } + + throw retryCommitError; + } + } + + if (shouldAddUnknownTransactionCommitResultLabel(firstCommitError)) { + firstCommitError.addErrorLabel(MongoErrorLabel.UnknownTransactionCommitResult); + } + + if (shouldUnpinAfterCommitError(firstCommitError)) { + this.unpin({ error: firstCommitError }); + } + + throw firstCommitError; + } finally { + this.transaction.transition(TxnState.TRANSACTION_COMMITTED); + } } /** * Aborts the currently active transaction in this session. */ async abortTransaction(): Promise { - return await endTransaction(this, 'abortTransaction'); + if (this.transaction.state === TxnState.NO_TRANSACTION) { + throw new MongoTransactionError('No transaction started'); + } + + if (this.transaction.state === TxnState.STARTING_TRANSACTION) { + // the transaction was never started, we can safely exit here + this.transaction.transition(TxnState.TRANSACTION_ABORTED); + return; + } + + if (this.transaction.state === TxnState.TRANSACTION_ABORTED) { + throw new MongoTransactionError('Cannot call abortTransaction twice'); + } + + if ( + this.transaction.state === TxnState.TRANSACTION_COMMITTED || + this.transaction.state === TxnState.TRANSACTION_COMMITTED_EMPTY + ) { + throw new MongoTransactionError( + 'Cannot call abortTransaction after calling commitTransaction' + ); + } + + const command: { + abortTransaction: 1; + writeConcern?: WriteConcernOptions; + recoveryToken?: Document; + } = { abortTransaction: 1 }; + + const wc = this.transaction.options.writeConcern ?? this.clientOptions?.writeConcern; + if (wc != null) { + WriteConcern.apply(command, { wtimeoutMS: 10000, w: 'majority', ...wc }); + } + + if (this.transaction.recoveryToken) { + command.recoveryToken = this.transaction.recoveryToken; + } + + const operation = new RunAdminCommandOperation(command, { + session: this, + readPreference: ReadPreference.primary, + bypassPinningCheck: true + }); + + try { + await executeOperation(this.client, operation); + this.unpin(); + return; + } catch (firstAbortError) { + this.unpin(); + + if (firstAbortError instanceof MongoError && isRetryableWriteError(firstAbortError)) { + try { + await executeOperation(this.client, operation); + return; + } catch (secondAbortError) { + // we do not retry the retry + } + } + + // The spec indicates that if the operation times out or fails with a non-retryable error, we should ignore all errors on `abortTransaction` + } finally { + this.transaction.transition(TxnState.TRANSACTION_ABORTED); + if (this.loadBalanced) { + maybeClearPinnedConnection(this, { force: false }); + } + } } /** @@ -496,25 +649,132 @@ export class ClientSession fn: WithTransactionCallback, options?: TransactionOptions ): Promise { + const MAX_TIMEOUT = 120000; const startTime = now(); - return await attemptTransaction(this, startTime, fn, options); + + let committed = false; + let result: any; + + while (!committed) { + this.startTransaction(options); // may throw on error + + try { + const promise = fn(this); + if (!isPromiseLike(promise)) { + throw new MongoInvalidArgumentError( + 'Function provided to `withTransaction` must return a Promise' + ); + } + + result = await promise; + + if ( + this.transaction.state === TxnState.NO_TRANSACTION || + this.transaction.state === TxnState.TRANSACTION_COMMITTED || + this.transaction.state === TxnState.TRANSACTION_ABORTED + ) { + // Assume callback intentionally ended the transaction + return result; + } + } catch (fnError) { + if (!(fnError instanceof MongoError) || fnError instanceof MongoInvalidArgumentError) { + await this.abortTransaction(); + throw fnError; + } + + if ( + this.transaction.state === TxnState.STARTING_TRANSACTION || + this.transaction.state === TxnState.TRANSACTION_IN_PROGRESS + ) { + await this.abortTransaction(); + } + + if ( + fnError.hasErrorLabel(MongoErrorLabel.TransientTransactionError) && + now() - startTime < MAX_TIMEOUT + ) { + continue; + } + + throw fnError; + } + + while (!committed) { + try { + /* + * We will rely on ClientSession.commitTransaction() to + * apply a majority write concern if commitTransaction is + * being retried (see: DRIVERS-601) + */ + await this.commitTransaction(); + committed = true; + } catch (commitError) { + /* + * Note: a maxTimeMS error will have the MaxTimeMSExpired + * code (50) and can be reported as a top-level error or + * inside writeConcernError, ex. + * { ok:0, code: 50, codeName: 'MaxTimeMSExpired' } + * { ok:1, writeConcernError: { code: 50, codeName: 'MaxTimeMSExpired' } } + */ + if ( + !isMaxTimeMSExpiredError(commitError) && + commitError.hasErrorLabel(MongoErrorLabel.UnknownTransactionCommitResult) && + now() - startTime < MAX_TIMEOUT + ) { + continue; + } + + if ( + commitError.hasErrorLabel(MongoErrorLabel.TransientTransactionError) && + now() - startTime < MAX_TIMEOUT + ) { + break; + } + + throw commitError; + } + } + } + + return result; } } configureResourceManagement(ClientSession.prototype); -const MAX_WITH_TRANSACTION_TIMEOUT = 120000; const NON_DETERMINISTIC_WRITE_CONCERN_ERRORS = new Set([ 'CannotSatisfyWriteConcern', 'UnknownReplWriteConcern', 'UnsatisfiableWriteConcern' ]); -function hasNotTimedOut(startTime: number, max: number) { - return calculateDurationInMs(startTime) < max; +function shouldUnpinAfterCommitError(commitError: Error) { + if (commitError instanceof MongoError) { + if ( + isRetryableWriteError(commitError) || + commitError instanceof MongoWriteConcernError || + isMaxTimeMSExpiredError(commitError) + ) { + if (isUnknownTransactionCommitResult(commitError)) { + // per txns spec, must unpin session in this case + return true; + } + } else if (commitError.hasErrorLabel(MongoErrorLabel.TransientTransactionError)) { + return true; + } + } + return false; +} + +function shouldAddUnknownTransactionCommitResultLabel(commitError: MongoError) { + let ok = isRetryableWriteError(commitError); + ok ||= commitError instanceof MongoWriteConcernError; + ok ||= isMaxTimeMSExpiredError(commitError); + ok &&= isUnknownTransactionCommitResult(commitError); + return ok; } -function isUnknownTransactionCommitResult(err: MongoError) { +function isUnknownTransactionCommitResult(err: MongoError): err is MongoError { const isNonDeterministicWriteConcernError = err instanceof MongoServerError && err.codeName && @@ -569,282 +829,17 @@ export function maybeClearPinnedConnection( } } -function isMaxTimeMSExpiredError(err: MongoError) { +function isMaxTimeMSExpiredError(err: MongoError): boolean { if (err == null || !(err instanceof MongoServerError)) { return false; } return ( err.code === MONGODB_ERROR_CODES.MaxTimeMSExpired || - (err.writeConcernError && err.writeConcernError.code === MONGODB_ERROR_CODES.MaxTimeMSExpired) + err.writeConcernError?.code === MONGODB_ERROR_CODES.MaxTimeMSExpired ); } -async function attemptTransactionCommit( - session: ClientSession, - startTime: number, - fn: WithTransactionCallback, - result: T, - options: TransactionOptions -): Promise { - try { - await session.commitTransaction(); - return result; - } catch (commitErr) { - if ( - commitErr instanceof MongoError && - hasNotTimedOut(startTime, MAX_WITH_TRANSACTION_TIMEOUT) && - !isMaxTimeMSExpiredError(commitErr) - ) { - if (commitErr.hasErrorLabel(MongoErrorLabel.UnknownTransactionCommitResult)) { - return await attemptTransactionCommit(session, startTime, fn, result, options); - } - - if (commitErr.hasErrorLabel(MongoErrorLabel.TransientTransactionError)) { - return await attemptTransaction(session, startTime, fn, options); - } - } - - throw commitErr; - } -} - -const USER_EXPLICIT_TXN_END_STATES = new Set([ - TxnState.NO_TRANSACTION, - TxnState.TRANSACTION_COMMITTED, - TxnState.TRANSACTION_ABORTED -]); - -function userExplicitlyEndedTransaction(session: ClientSession) { - return USER_EXPLICIT_TXN_END_STATES.has(session.transaction.state); -} - -async function attemptTransaction( - session: ClientSession, - startTime: number, - fn: WithTransactionCallback, - options: TransactionOptions = {} -): Promise { - session.startTransaction(options); - - let promise; - try { - promise = fn(session); - } catch (err) { - promise = Promise.reject(err); - } - - if (!isPromiseLike(promise)) { - try { - await session.abortTransaction(); - } catch (error) { - squashError(error); - } - throw new MongoInvalidArgumentError( - 'Function provided to `withTransaction` must return a Promise' - ); - } - - try { - const result = await promise; - if (userExplicitlyEndedTransaction(session)) { - return result; - } - return await attemptTransactionCommit(session, startTime, fn, result, options); - } catch (err) { - if (session.inTransaction()) { - await session.abortTransaction(); - } - - if ( - err instanceof MongoError && - err.hasErrorLabel(MongoErrorLabel.TransientTransactionError) && - hasNotTimedOut(startTime, MAX_WITH_TRANSACTION_TIMEOUT) - ) { - return await attemptTransaction(session, startTime, fn, options); - } - - if (isMaxTimeMSExpiredError(err)) { - err.addErrorLabel(MongoErrorLabel.UnknownTransactionCommitResult); - } - - throw err; - } -} - -async function endTransaction( - session: ClientSession, - commandName: 'abortTransaction' | 'commitTransaction' -): Promise { - // handle any initial problematic cases - const txnState = session.transaction.state; - - if (txnState === TxnState.NO_TRANSACTION) { - throw new MongoTransactionError('No transaction started'); - } - - if (commandName === 'commitTransaction') { - if ( - txnState === TxnState.STARTING_TRANSACTION || - txnState === TxnState.TRANSACTION_COMMITTED_EMPTY - ) { - // the transaction was never started, we can safely exit here - session.transaction.transition(TxnState.TRANSACTION_COMMITTED_EMPTY); - return; - } - - if (txnState === TxnState.TRANSACTION_ABORTED) { - throw new MongoTransactionError( - 'Cannot call commitTransaction after calling abortTransaction' - ); - } - } else { - if (txnState === TxnState.STARTING_TRANSACTION) { - // the transaction was never started, we can safely exit here - session.transaction.transition(TxnState.TRANSACTION_ABORTED); - return; - } - - if (txnState === TxnState.TRANSACTION_ABORTED) { - throw new MongoTransactionError('Cannot call abortTransaction twice'); - } - - if ( - txnState === TxnState.TRANSACTION_COMMITTED || - txnState === TxnState.TRANSACTION_COMMITTED_EMPTY - ) { - throw new MongoTransactionError( - 'Cannot call abortTransaction after calling commitTransaction' - ); - } - } - - // construct and send the command - const command: Document = { [commandName]: 1 }; - - // apply a writeConcern if specified - let writeConcern; - if (session.transaction.options.writeConcern) { - writeConcern = Object.assign({}, session.transaction.options.writeConcern); - } else if (session.clientOptions && session.clientOptions.writeConcern) { - writeConcern = { w: session.clientOptions.writeConcern.w }; - } - - if (txnState === TxnState.TRANSACTION_COMMITTED) { - writeConcern = Object.assign({ wtimeoutMS: 10000 }, writeConcern, { w: 'majority' }); - } - - if (writeConcern) { - WriteConcern.apply(command, writeConcern); - } - - if (commandName === 'commitTransaction' && session.transaction.options.maxTimeMS) { - Object.assign(command, { maxTimeMS: session.transaction.options.maxTimeMS }); - } - - if (session.transaction.recoveryToken) { - command.recoveryToken = session.transaction.recoveryToken; - } - - try { - // send the command - await executeOperation( - session.client, - new RunAdminCommandOperation(command, { - session, - readPreference: ReadPreference.primary, - bypassPinningCheck: true - }) - ); - if (command.abortTransaction) { - // always unpin on abort regardless of command outcome - session.unpin(); - } - if (commandName !== 'commitTransaction') { - session.transaction.transition(TxnState.TRANSACTION_ABORTED); - if (session.loadBalanced) { - maybeClearPinnedConnection(session, { force: false }); - } - } else { - session.transaction.transition(TxnState.TRANSACTION_COMMITTED); - } - } catch (firstAttemptErr) { - if (command.abortTransaction) { - // always unpin on abort regardless of command outcome - session.unpin(); - } - if (firstAttemptErr instanceof MongoError && isRetryableWriteError(firstAttemptErr)) { - // SPEC-1185: apply majority write concern when retrying commitTransaction - if (command.commitTransaction) { - // per txns spec, must unpin session in this case - session.unpin({ force: true }); - - command.writeConcern = Object.assign({ wtimeout: 10000 }, command.writeConcern, { - w: 'majority' - }); - } - - try { - await executeOperation( - session.client, - new RunAdminCommandOperation(command, { - session, - readPreference: ReadPreference.primary, - bypassPinningCheck: true - }) - ); - if (commandName !== 'commitTransaction') { - session.transaction.transition(TxnState.TRANSACTION_ABORTED); - if (session.loadBalanced) { - maybeClearPinnedConnection(session, { force: false }); - } - } else { - session.transaction.transition(TxnState.TRANSACTION_COMMITTED); - } - } catch (secondAttemptErr) { - handleEndTransactionError(session, commandName, secondAttemptErr); - } - } else { - handleEndTransactionError(session, commandName, firstAttemptErr); - } - } -} - -function handleEndTransactionError( - session: ClientSession, - commandName: 'abortTransaction' | 'commitTransaction', - error: Error -) { - if (commandName !== 'commitTransaction') { - session.transaction.transition(TxnState.TRANSACTION_ABORTED); - if (session.loadBalanced) { - maybeClearPinnedConnection(session, { force: false }); - } - // The spec indicates that if the operation times out or fails with a non-retryable error, we should ignore all errors on `abortTransaction` - return; - } - - session.transaction.transition(TxnState.TRANSACTION_COMMITTED); - if (error instanceof MongoError) { - if ( - isRetryableWriteError(error) || - error instanceof MongoWriteConcernError || - isMaxTimeMSExpiredError(error) - ) { - if (isUnknownTransactionCommitResult(error)) { - error.addErrorLabel(MongoErrorLabel.UnknownTransactionCommitResult); - - // per txns spec, must unpin session in this case - session.unpin({ error }); - } - } else if (error.hasErrorLabel(MongoErrorLabel.TransientTransactionError)) { - session.unpin({ error }); - } - } - - throw error; -} - /** @public */ export type ServerSessionId = { id: Binary };