-
Notifications
You must be signed in to change notification settings - Fork 1.8k
feat(NODE-6389): add support for timeoutMS in StateMachine.execute() #4243
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Changes from 36 commits
3c2ec0a
909578f
e101750
e4efd3f
22082c9
bf95fa4
c63d102
1eab23d
4c4b0a9
558d416
3ed4a14
d3438ea
ff561e3
164780c
12a7e2e
999f23d
0355404
5ef3d69
7139b8f
acfb4fc
4efff95
1997f81
cc3ef8f
38affae
738188b
c4a7c2c
5aa6d4c
17a2fde
aead2f1
88ca990
2e3a84c
e6e9fb4
3dc383b
702a03e
3b6a23b
5560a1b
601c159
096f154
5aba790
903e0d0
709f725
cb12f64
01aca89
6ea56d3
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -12,7 +12,9 @@ import { | |
| } from '../bson'; | ||
| import { type ProxyOptions } from '../cmap/connection'; | ||
| import { getSocks, type SocksLib } from '../deps'; | ||
| import { MongoOperationTimeoutError } from '../error'; | ||
| import { type MongoClient, type MongoClientOptions } from '../mongo_client'; | ||
| import { Timeout, type TimeoutContext, TimeoutError } from '../timeout'; | ||
| import { BufferPool, MongoDBCollectionNamespace, promiseWithResolvers } from '../utils'; | ||
| import { autoSelectSocketOptions, type DataKey } from './client_encryption'; | ||
| import { MongoCryptError } from './errors'; | ||
|
|
@@ -173,6 +175,7 @@ export type StateMachineOptions = { | |
| * An internal class that executes across a MongoCryptContext until either | ||
| * a finishing state or an error is reached. Do not instantiate directly. | ||
| */ | ||
| // TODO(DRIVERS-2671): clarify CSOT behavior for FLE APIs | ||
| export class StateMachine { | ||
| constructor( | ||
| private options: StateMachineOptions, | ||
|
|
@@ -182,7 +185,11 @@ export class StateMachine { | |
| /** | ||
| * Executes the state machine according to the specification | ||
| */ | ||
| async execute(executor: StateMachineExecutable, context: MongoCryptContext): Promise<Uint8Array> { | ||
| async execute( | ||
| executor: StateMachineExecutable, | ||
| context: MongoCryptContext, | ||
| timeoutContext?: TimeoutContext | ||
| ): Promise<Uint8Array> { | ||
| const keyVaultNamespace = executor._keyVaultNamespace; | ||
| const keyVaultClient = executor._keyVaultClient; | ||
| const metaDataClient = executor._metaDataClient; | ||
|
|
@@ -201,8 +208,13 @@ export class StateMachine { | |
| 'unreachable state machine state: entered MONGOCRYPT_CTX_NEED_MONGO_COLLINFO but metadata client is undefined' | ||
| ); | ||
| } | ||
| const collInfo = await this.fetchCollectionInfo(metaDataClient, context.ns, filter); | ||
|
|
||
| const collInfo = await this.fetchCollectionInfo( | ||
| metaDataClient, | ||
| context.ns, | ||
| filter, | ||
| timeoutContext | ||
| ); | ||
| if (collInfo) { | ||
| context.addMongoOperationResponse(collInfo); | ||
| } | ||
|
|
@@ -222,9 +234,9 @@ export class StateMachine { | |
| // When we are using the shared library, we don't have a mongocryptd manager. | ||
| const markedCommand: Uint8Array = mongocryptdManager | ||
| ? await mongocryptdManager.withRespawn( | ||
| this.markCommand.bind(this, mongocryptdClient, context.ns, command) | ||
| this.markCommand.bind(this, mongocryptdClient, context.ns, command, timeoutContext) | ||
| ) | ||
| : await this.markCommand(mongocryptdClient, context.ns, command); | ||
| : await this.markCommand(mongocryptdClient, context.ns, command, timeoutContext); | ||
|
|
||
| context.addMongoOperationResponse(markedCommand); | ||
| context.finishMongoOperation(); | ||
|
|
@@ -233,7 +245,12 @@ export class StateMachine { | |
|
|
||
| case MONGOCRYPT_CTX_NEED_MONGO_KEYS: { | ||
| const filter = context.nextMongoOperation(); | ||
| const keys = await this.fetchKeys(keyVaultClient, keyVaultNamespace, filter); | ||
| const keys = await this.fetchKeys( | ||
| keyVaultClient, | ||
| keyVaultNamespace, | ||
| filter, | ||
| timeoutContext | ||
| ); | ||
|
|
||
| if (keys.length === 0) { | ||
| // See docs on EMPTY_V | ||
|
|
@@ -255,9 +272,7 @@ export class StateMachine { | |
| } | ||
|
|
||
| case MONGOCRYPT_CTX_NEED_KMS: { | ||
| const requests = Array.from(this.requests(context)); | ||
| await Promise.all(requests); | ||
|
|
||
| await Promise.all(this.requests(context, timeoutContext)); | ||
| context.finishKMSRequests(); | ||
| break; | ||
| } | ||
|
|
@@ -299,7 +314,7 @@ export class StateMachine { | |
| * @param kmsContext - A C++ KMS context returned from the bindings | ||
| * @returns A promise that resolves when the KMS reply has be fully parsed | ||
| */ | ||
| async kmsRequest(request: MongoCryptKMSRequest): Promise<void> { | ||
| async kmsRequest(request: MongoCryptKMSRequest, timeoutContext?: TimeoutContext): Promise<void> { | ||
| const parsedUrl = request.endpoint.split(':'); | ||
| const port = parsedUrl[1] != null ? Number.parseInt(parsedUrl[1], 10) : HTTPS_PORT; | ||
| const socketOptions = autoSelectSocketOptions(this.options.socketOptions || {}); | ||
|
|
@@ -329,10 +344,6 @@ export class StateMachine { | |
| } | ||
| } | ||
|
|
||
| function ontimeout() { | ||
| return new MongoCryptError('KMS request timed out'); | ||
| } | ||
|
|
||
| function onerror(cause: Error) { | ||
| return new MongoCryptError('KMS request failed', { cause }); | ||
| } | ||
|
|
@@ -364,7 +375,6 @@ export class StateMachine { | |
| resolve: resolveOnNetSocketConnect | ||
| } = promiseWithResolvers<void>(); | ||
| netSocket | ||
| .once('timeout', () => rejectOnNetSocketError(ontimeout())) | ||
| .once('error', err => rejectOnNetSocketError(onerror(err))) | ||
| .once('close', () => rejectOnNetSocketError(onclose())) | ||
| .once('connect', () => resolveOnNetSocketConnect()); | ||
|
|
@@ -410,8 +420,8 @@ export class StateMachine { | |
| reject: rejectOnTlsSocketError, | ||
| resolve | ||
| } = promiseWithResolvers<void>(); | ||
|
|
||
| socket | ||
| .once('timeout', () => rejectOnTlsSocketError(ontimeout())) | ||
| .once('error', err => rejectOnTlsSocketError(onerror(err))) | ||
| .once('close', () => rejectOnTlsSocketError(onclose())) | ||
| .on('data', data => { | ||
|
|
@@ -425,20 +435,30 @@ export class StateMachine { | |
| resolve(); | ||
| } | ||
| }); | ||
| await willResolveKmsRequest; | ||
|
|
||
| if (timeoutContext?.csotEnabled() && timeoutContext?.remainingTimeMS <= 0) { | ||
| throw new MongoOperationTimeoutError('Timed out before KMS request.'); | ||
| } | ||
| await (timeoutContext?.csotEnabled() | ||
| ? Promise.all([willResolveKmsRequest, Timeout.expires(timeoutContext?.remainingTimeMS)]) | ||
| : willResolveKmsRequest); | ||
| } catch (error) { | ||
| if (error instanceof TimeoutError) | ||
| throw new MongoOperationTimeoutError('KMS request timed out'); | ||
| throw error; | ||
| } finally { | ||
| // There's no need for any more activity on this socket at this point. | ||
| destroySockets(); | ||
| } | ||
| } | ||
|
|
||
| *requests(context: MongoCryptContext) { | ||
| *requests(context: MongoCryptContext, timeoutContext?: TimeoutContext) { | ||
| for ( | ||
| let request = context.nextKMSRequest(); | ||
| request != null; | ||
| request = context.nextKMSRequest() | ||
| ) { | ||
| yield this.kmsRequest(request); | ||
| yield this.kmsRequest(request, timeoutContext); | ||
| } | ||
| } | ||
|
|
||
|
|
@@ -498,15 +518,23 @@ export class StateMachine { | |
| async fetchCollectionInfo( | ||
| client: MongoClient, | ||
| ns: string, | ||
| filter: Document | ||
| filter: Document, | ||
| timeoutContext?: TimeoutContext | ||
| ): Promise<Uint8Array | null> { | ||
| const { db } = MongoDBCollectionNamespace.fromString(ns); | ||
|
|
||
| if (timeoutContext?.csotEnabled() && timeoutContext?.remainingTimeMS <= 0) { | ||
| throw new MongoOperationTimeoutError( | ||
| 'Timed out before call to mongocryptd listCollections operation.' | ||
| ); | ||
| } | ||
|
|
||
| const collections = await client | ||
| .db(db) | ||
| .listCollections(filter, { | ||
| promoteLongs: false, | ||
| promoteValues: false | ||
| promoteValues: false, | ||
| timeoutMS: timeoutContext?.csotEnabled() ? timeoutContext?.remainingTimeMS : undefined | ||
| }) | ||
| .toArray(); | ||
|
|
||
|
|
@@ -522,12 +550,27 @@ export class StateMachine { | |
| * @param command - The command to execute. | ||
| * @param callback - Invoked with the serialized and marked bson command, or with an error | ||
| */ | ||
| async markCommand(client: MongoClient, ns: string, command: Uint8Array): Promise<Uint8Array> { | ||
| const options = { promoteLongs: false, promoteValues: false }; | ||
| async markCommand( | ||
| client: MongoClient, | ||
| ns: string, | ||
| command: Uint8Array, | ||
| timeoutContext?: TimeoutContext | ||
| ): Promise<Uint8Array> { | ||
| const { db } = MongoDBCollectionNamespace.fromString(ns); | ||
| const rawCommand = deserialize(command, options); | ||
| const bsonOptions = { promoteLongs: false, promoteValues: false }; | ||
| const rawCommand = deserialize(command, bsonOptions); | ||
|
|
||
| const response = await client.db(db).command(rawCommand, options); | ||
| if (timeoutContext?.csotEnabled() && timeoutContext?.remainingTimeMS <= 0) { | ||
| throw new MongoOperationTimeoutError( | ||
| 'Timed out before call to mongocryptd markings request.' | ||
| ); | ||
| } | ||
| const response = await client.db(db).command(rawCommand, { | ||
| ...bsonOptions, | ||
| ...(timeoutContext?.csotEnabled() | ||
| ? { timeoutMS: timeoutContext?.remainingTimeMS } | ||
| : undefined) | ||
| }); | ||
|
|
||
| return serialize(response, this.bsonOptions); | ||
| } | ||
|
|
@@ -543,15 +586,21 @@ export class StateMachine { | |
| fetchKeys( | ||
| client: MongoClient, | ||
| keyVaultNamespace: string, | ||
| filter: Uint8Array | ||
| filter: Uint8Array, | ||
| timeoutContext?: TimeoutContext | ||
| ): Promise<Array<DataKey>> { | ||
| const { db: dbName, collection: collectionName } = | ||
| MongoDBCollectionNamespace.fromString(keyVaultNamespace); | ||
|
|
||
| if (timeoutContext?.csotEnabled() && timeoutContext?.remainingTimeMS <= 0) { | ||
baileympearson marked this conversation as resolved.
Outdated
Show resolved
Hide resolved
|
||
| throw new MongoOperationTimeoutError('Timed out before dataKey fetched.'); | ||
| } | ||
| return client | ||
| .db(dbName) | ||
| .collection<DataKey>(collectionName, { readConcern: { level: 'majority' } }) | ||
| .find(deserialize(filter)) | ||
| .find(deserialize(filter), { | ||
|
||
| timeoutMS: timeoutContext?.csotEnabled() ? timeoutContext?.remainingTimeMS : undefined | ||
baileympearson marked this conversation as resolved.
Outdated
Show resolved
Hide resolved
|
||
| }) | ||
| .toArray(); | ||
| } | ||
| } | ||
Uh oh!
There was an error while loading. Please reload this page.