Skip to content

Commit aeabbd7

Browse files
committed
wip
1 parent b59f17c commit aeabbd7

File tree

4 files changed

+103
-27
lines changed

4 files changed

+103
-27
lines changed

src/cursor/abstract_cursor.ts

Lines changed: 26 additions & 20 deletions
Original file line numberDiff line numberDiff line change
@@ -79,7 +79,22 @@ export interface AbstractCursorOptions extends BSONSerializeOptions {
7979
readPreference?: ReadPreferenceLike;
8080
readConcern?: ReadConcernLike;
8181
batchSize?: number;
82+
/**
83+
* For **`tailable=false` cursor** OR **`tailable=true && awaitData=false` cursor**,
84+
* - the driver MUST set `maxTimeMS` on the `find` command and MUST NOT set `maxTimeMS` on the `getMore` command.
85+
* - If `maxTimeMS` is not set in options, the driver SHOULD refrain from setting **maxTimeMS**
86+
*
87+
* For **`tailable=true && awaitData=true` cursor**
88+
* - the driver MUST provide a cursor level option named `maxAwaitTimeMS`.
89+
* - The `maxTimeMS` option on the `getMore` command MUST be set to the value of the option `maxAwaitTimeMS`.
90+
* - If no `maxAwaitTimeMS` is specified, the driver MUST not set `maxTimeMS` on the `getMore` command.
91+
* - `maxAwaitTimeMS` option is not set on the `aggregate` command nor `$changeStream` pipeline stage
92+
*
93+
* ## `maxCommitTimeMS`
94+
* Note, this option is an alias for the `maxTimeMS` commitTransaction command option.
95+
*/
8296
maxTimeMS?: number;
97+
maxAwaitTimeMS?: number;
8398
/**
8499
* Comment to apply to the operation.
85100
*
@@ -117,7 +132,7 @@ export abstract class AbstractCursor<
117132
CursorEvents extends AbstractCursorEvents = AbstractCursorEvents
118133
> extends TypedEventEmitter<CursorEvents> {
119134
/** @internal */
120-
[kId]?: Long;
135+
[kId]: Long;
121136
/** @internal */
122137
[kSession]: ClientSession;
123138
/** @internal */
@@ -155,6 +170,7 @@ export abstract class AbstractCursor<
155170
}
156171
this[kClient] = client;
157172
this[kNamespace] = namespace;
173+
this[kId] = Long.ZERO;
158174
this[kDocuments] = []; // TODO: https://github.com/microsoft/TypeScript/issues/36230
159175
this[kInitialized] = false;
160176
this[kClosed] = false;
@@ -186,14 +202,18 @@ export abstract class AbstractCursor<
186202
this[kOptions].maxTimeMS = options.maxTimeMS;
187203
}
188204

205+
if (typeof options.maxAwaitTimeMS === 'number') {
206+
this[kOptions].maxAwaitTimeMS = options.maxAwaitTimeMS;
207+
}
208+
189209
if (options.session instanceof ClientSession) {
190210
this[kSession] = options.session;
191211
} else {
192212
this[kSession] = this[kClient].startSession({ owner: this, explicit: false });
193213
}
194214
}
195215

196-
get id(): Long | undefined {
216+
get id(): Long {
197217
return this[kId];
198218
}
199219

@@ -292,7 +312,7 @@ export abstract class AbstractCursor<
292312
hasNext(callback: Callback<boolean>): void;
293313
hasNext(callback?: Callback<boolean>): Promise<boolean> | void {
294314
return maybePromise(callback, done => {
295-
if (this[kId] === Long.ZERO) {
315+
if (this[kId].isZero()) {
296316
return done(undefined, false);
297317
}
298318

@@ -320,7 +340,7 @@ export abstract class AbstractCursor<
320340
next(callback?: Callback<TSchema | null>): Promise<TSchema | null> | void;
321341
next(callback?: Callback<TSchema | null>): Promise<TSchema | null> | void {
322342
return maybePromise(callback, done => {
323-
if (this[kId] === Long.ZERO) {
343+
if (this[kId].isZero()) {
324344
return done(new MongoCursorExhaustedError());
325345
}
326346

@@ -335,7 +355,7 @@ export abstract class AbstractCursor<
335355
tryNext(callback: Callback<TSchema | null>): void;
336356
tryNext(callback?: Callback<TSchema | null>): Promise<TSchema | null> | void {
337357
return maybePromise(callback, done => {
338-
if (this[kId] === Long.ZERO) {
358+
if (this[kId].isZero()) {
339359
return done(new MongoCursorExhaustedError());
340360
}
341361

@@ -617,21 +637,7 @@ export abstract class AbstractCursor<
617637

618638
/** @internal */
619639
_getMore(batchSize: number, callback: Callback<Document>): void {
620-
const cursorId = this[kId];
621-
const cursorNs = this[kNamespace];
622-
const server = this[kServer];
623-
624-
if (cursorId == null) {
625-
callback(new MongoRuntimeError('Unable to iterate cursor with no id'));
626-
return;
627-
}
628-
629-
if (server == null) {
630-
callback(new MongoRuntimeError('Unable to iterate cursor without selected server'));
631-
return;
632-
}
633-
634-
const getMoreOperation = new GetMoreOperation(cursorNs, cursorId, server, {
640+
const getMoreOperation = new GetMoreOperation(this[kNamespace], this[kId], this[kServer], {
635641
...this[kOptions],
636642
session: this[kSession],
637643
batchSize

src/operations/command.ts

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -44,6 +44,7 @@ export interface CommandOperationOptions
4444
readConcern?: ReadConcernLike;
4545
/** Collation */
4646
collation?: CollationOptions;
47+
/** TODO This is probably in the wrong place................. specs only mention this being a thing for createIndex/dropIndex */
4748
maxTimeMS?: number;
4849
/**
4950
* Comment to apply to the operation.

src/operations/get_more.ts

Lines changed: 5 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -39,7 +39,7 @@ export class GetMoreOperation extends AbstractOperation {
3939
cursorId: Long;
4040
override options: GetMoreOptions;
4141

42-
constructor(ns: MongoDBNamespace, cursorId: Long, server: Server, options: GetMoreOptions = {}) {
42+
constructor(ns: MongoDBNamespace, cursorId: Long, server: Server, options: GetMoreOptions) {
4343
super(options);
4444

4545
this.options = options;
@@ -63,6 +63,10 @@ export class GetMoreOperation extends AbstractOperation {
6363
);
6464
}
6565

66+
if (this.cursorId == null || this.cursorId.isZero()) {
67+
return callback(new MongoRuntimeError('Unable to iterate cursor with no id'));
68+
}
69+
6670
const collection = this.ns.collection;
6771
if (collection == null) {
6872
// Cursors should have adopted the namespace returned by MongoDB

test/integration/change-streams/change_stream.test.ts

Lines changed: 71 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -1111,7 +1111,7 @@ describe('Change Streams', function () {
11111111
changeStream.next((err, doc) => {
11121112
expect(err).to.exist;
11131113
expect(doc).to.not.exist;
1114-
expect(err.message).to.equal('ChangeStream is closed');
1114+
expect(err?.message).to.equal('ChangeStream is closed');
11151115
changeStream.close(() => client.close(done));
11161116
});
11171117
});
@@ -1372,23 +1372,88 @@ describe('Change Streams', function () {
13721372
)
13731373
.run();
13741374

1375+
UnifiedTestSuiteBuilder.describe('entity.watch() server-side options')
1376+
.createEntities([
1377+
{ client: { id: 'client0', observeEvents: ['commandStartedEvent'] } },
1378+
{ database: { id: 'db0', client: 'client0', databaseName: 'watchOpts' } },
1379+
{ collection: { id: 'collection0', database: 'db0', collectionName: 'watchOpts' } }
1380+
])
1381+
.test(
1382+
TestBuilder.it('should send maxAwaitTimeMS to the server')
1383+
.operation({
1384+
object: 'collection0',
1385+
name: 'createChangeStream',
1386+
saveResultAsEntity: 'changeStreamOnClient',
1387+
arguments: { maxAwaitTimeMS: 5000 }
1388+
})
1389+
.operation({
1390+
name: 'insertOne',
1391+
object: 'collection0',
1392+
arguments: { document: { a: 1 } },
1393+
ignoreResultAndError: true
1394+
})
1395+
.operation({
1396+
object: 'changeStreamOnClient',
1397+
name: 'iterateUntilDocumentOrError',
1398+
ignoreResultAndError: true
1399+
})
1400+
.expectEvents({
1401+
client: 'client0',
1402+
events: [
1403+
{ commandStartedEvent: { commandName: 'aggregate' } },
1404+
{ commandStartedEvent: { commandName: 'insert' } },
1405+
{ commandStartedEvent: { commandName: 'getMore', command: { maxTimeMS: 5000 } } }
1406+
]
1407+
})
1408+
.toJSON()
1409+
)
1410+
.test(
1411+
TestBuilder.it('should send maxTimeMS to the server')
1412+
.operation({
1413+
object: 'collection0',
1414+
name: 'createChangeStream',
1415+
saveResultAsEntity: 'changeStreamOnClient',
1416+
arguments: { maxTimeMS: 5000 }
1417+
})
1418+
.operation({
1419+
name: 'insertOne',
1420+
object: 'collection0',
1421+
arguments: { document: { a: 1 } },
1422+
ignoreResultAndError: true
1423+
})
1424+
.operation({
1425+
object: 'changeStreamOnClient',
1426+
name: 'iterateUntilDocumentOrError',
1427+
ignoreResultAndError: true
1428+
})
1429+
.expectEvents({
1430+
client: 'client0',
1431+
events: [
1432+
{ commandStartedEvent: { commandName: 'aggregate' } },
1433+
{ commandStartedEvent: { commandName: 'insert' } },
1434+
{ commandStartedEvent: { commandName: 'getMore', command: { maxTimeMS: 5000 } } }
1435+
]
1436+
})
1437+
.toJSON()
1438+
)
1439+
.run();
1440+
13751441
describe('BSON Options', function () {
13761442
let client: MongoClient;
13771443
let db: Db;
13781444
let collection: Collection;
13791445
let cs: ChangeStream;
1446+
13801447
beforeEach(async function () {
13811448
client = await this.configuration.newClient({ monitorCommands: true }).connect();
13821449
db = client.db('db');
13831450
collection = await db.createCollection('collection');
13841451
});
1452+
13851453
afterEach(async function () {
13861454
await db.dropCollection('collection');
13871455
await cs.close();
13881456
await client.close();
1389-
client = undefined;
1390-
db = undefined;
1391-
collection = undefined;
13921457
});
13931458

13941459
context('promoteLongs', () => {
@@ -1452,7 +1517,7 @@ describe('Change Streams', function () {
14521517
it('does not send invalid options on the aggregate command', {
14531518
metadata: { requires: { topology: '!single' } },
14541519
test: async function () {
1455-
const started = [];
1520+
const started: CommandStartedEvent[] = [];
14561521

14571522
client.on('commandStarted', filterForCommands(['aggregate'], started));
14581523
const doc = { invalidBSONOption: true };
@@ -1473,7 +1538,7 @@ describe('Change Streams', function () {
14731538
it('does not send invalid options on the getMore command', {
14741539
metadata: { requires: { topology: '!single' } },
14751540
test: async function () {
1476-
const started = [];
1541+
const started: CommandStartedEvent[] = [];
14771542

14781543
client.on('commandStarted', filterForCommands(['aggregate'], started));
14791544
const doc = { invalidBSONOption: true };

0 commit comments

Comments
 (0)