Skip to content

Commit 9e3ba81

Browse files
refactor(NODE-4638): prevent parallel calls to fn in the async interval (#3413)
1 parent 5f34ad0 commit 9e3ba81

File tree

5 files changed

+277
-234
lines changed

5 files changed

+277
-234
lines changed

src/sdam/monitor.ts

Lines changed: 40 additions & 41 deletions
Original file line numberDiff line numberDiff line change
@@ -470,12 +470,6 @@ export interface MonitorIntervalOptions {
470470
minHeartbeatFrequencyMS: number;
471471
/** Whether the method should be called immediately when the interval is started */
472472
immediate: boolean;
473-
474-
/**
475-
* Only used for testing unreliable timer environments
476-
* @internal
477-
*/
478-
clock: () => number;
479473
}
480474

481475
/**
@@ -484,61 +478,56 @@ export interface MonitorIntervalOptions {
484478
export class MonitorInterval {
485479
fn: (callback: Callback) => void;
486480
timerId: NodeJS.Timeout | undefined;
487-
lastCallTime: number;
488-
isExpeditedCheckScheduled = false;
481+
lastExecutionEnded: number;
482+
isExpeditedCallToFnScheduled = false;
489483
stopped = false;
484+
isExecutionInProgress = false;
485+
hasExecutedOnce = false;
490486

491487
heartbeatFrequencyMS: number;
492488
minHeartbeatFrequencyMS: number;
493-
clock: () => number;
494489

495490
constructor(fn: (callback: Callback) => void, options: Partial<MonitorIntervalOptions> = {}) {
496491
this.fn = fn;
497-
this.lastCallTime = 0;
492+
this.lastExecutionEnded = -Infinity;
498493

499494
this.heartbeatFrequencyMS = options.heartbeatFrequencyMS ?? 1000;
500495
this.minHeartbeatFrequencyMS = options.minHeartbeatFrequencyMS ?? 500;
501-
this.clock = typeof options.clock === 'function' ? options.clock : now;
502496

503497
if (options.immediate) {
504498
this._executeAndReschedule();
505499
} else {
506-
this.lastCallTime = this.clock();
507500
this._reschedule(undefined);
508501
}
509502
}
510503

511504
wake() {
512-
const currentTime = this.clock();
513-
const nextScheduledCallTime = this.lastCallTime + this.heartbeatFrequencyMS;
514-
const timeUntilNextCall = nextScheduledCallTime - currentTime;
515-
516-
// For the streaming protocol: there is nothing obviously stopping this
517-
// interval from being woken up again while we are waiting "infinitely"
518-
// for `fn` to be called again`. Since the function effectively
519-
// never completes, the `timeUntilNextCall` will continue to grow
520-
// negatively unbounded, so it will never trigger a reschedule here.
521-
522-
// This is possible in virtualized environments like AWS Lambda where our
523-
// clock is unreliable. In these cases the timer is "running" but never
524-
// actually completes, so we want to execute immediately and then attempt
525-
// to reschedule.
526-
if (timeUntilNextCall < 0) {
527-
this._executeAndReschedule();
505+
const currentTime = now();
506+
const timeSinceLastCall = currentTime - this.lastExecutionEnded;
507+
508+
// TODO(NODE-4674): Add error handling and logging to the monitor
509+
if (timeSinceLastCall < 0) {
510+
return this._executeAndReschedule();
511+
}
512+
513+
if (this.isExecutionInProgress) {
528514
return;
529515
}
530516

531517
// debounce multiple calls to wake within the `minInterval`
532-
if (this.isExpeditedCheckScheduled) {
518+
if (this.isExpeditedCallToFnScheduled) {
533519
return;
534520
}
535521

536522
// reschedule a call as soon as possible, ensuring the call never happens
537523
// faster than the `minInterval`
538-
if (timeUntilNextCall > this.minHeartbeatFrequencyMS) {
539-
this._reschedule(this.minHeartbeatFrequencyMS);
540-
this.isExpeditedCheckScheduled = true;
524+
if (timeSinceLastCall < this.minHeartbeatFrequencyMS) {
525+
this.isExpeditedCallToFnScheduled = true;
526+
this._reschedule(this.minHeartbeatFrequencyMS - timeSinceLastCall);
527+
return;
541528
}
529+
530+
this._executeAndReschedule();
542531
}
543532

544533
stop() {
@@ -548,22 +537,26 @@ export class MonitorInterval {
548537
this.timerId = undefined;
549538
}
550539

551-
this.lastCallTime = 0;
552-
this.isExpeditedCheckScheduled = false;
540+
this.lastExecutionEnded = -Infinity;
541+
this.isExpeditedCallToFnScheduled = false;
553542
}
554543

555544
toString() {
556545
return JSON.stringify(this);
557546
}
558547

559548
toJSON() {
549+
const currentTime = now();
550+
const timeSinceLastCall = currentTime - this.lastExecutionEnded;
560551
return {
561552
timerId: this.timerId != null ? 'set' : 'cleared',
562-
lastCallTime: this.lastCallTime,
563-
isExpeditedCheckScheduled: this.isExpeditedCheckScheduled,
553+
lastCallTime: this.lastExecutionEnded,
554+
isExpeditedCheckScheduled: this.isExpeditedCallToFnScheduled,
564555
stopped: this.stopped,
565556
heartbeatFrequencyMS: this.heartbeatFrequencyMS,
566-
minHeartbeatFrequencyMS: this.minHeartbeatFrequencyMS
557+
minHeartbeatFrequencyMS: this.minHeartbeatFrequencyMS,
558+
currentTime,
559+
timeSinceLastCall
567560
};
568561
}
569562

@@ -577,11 +570,17 @@ export class MonitorInterval {
577570
}
578571

579572
private _executeAndReschedule = () => {
580-
this.isExpeditedCheckScheduled = false;
581-
this.lastCallTime = this.clock();
573+
if (this.stopped) return;
574+
if (this.timerId) {
575+
clearTimeout(this.timerId);
576+
}
577+
578+
this.isExpeditedCallToFnScheduled = false;
579+
this.isExecutionInProgress = true;
582580

583-
this.fn(err => {
584-
if (err) throw err;
581+
this.fn(() => {
582+
this.lastExecutionEnded = now();
583+
this.isExecutionInProgress = false;
585584
this._reschedule(this.heartbeatFrequencyMS);
586585
});
587586
};

test/integration/change-streams/change_stream.test.ts

Lines changed: 50 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1037,6 +1037,56 @@ describe('Change Streams', function () {
10371037
});
10381038

10391039
describe('Change Stream Resume Error Tests', function () {
1040+
describe('TODO(NODE-4670): fix consecutive resumes unified tests', function () {
1041+
let client: MongoClient;
1042+
let changeStream: ChangeStream;
1043+
1044+
beforeEach(async function () {
1045+
client = this.configuration.newClient();
1046+
await client.connect();
1047+
});
1048+
1049+
afterEach(async function () {
1050+
await changeStream.close();
1051+
await client.close();
1052+
});
1053+
1054+
it('should support consecutive resumes', {
1055+
metadata: { requires: { topology: 'replicaset', mongodb: '>=4.2' } },
1056+
async test() {
1057+
const failCommand: FailPoint = {
1058+
configureFailPoint: 'failCommand',
1059+
mode: {
1060+
times: 2
1061+
},
1062+
data: {
1063+
failCommands: ['getMore'],
1064+
closeConnection: true
1065+
}
1066+
};
1067+
1068+
await client.db('admin').command(failCommand);
1069+
1070+
const collection = client.db('test_consecutive_resume').collection('collection');
1071+
1072+
changeStream = collection.watch([], { batchSize: 1 });
1073+
1074+
await initIteratorMode(changeStream);
1075+
1076+
await collection.insertOne({ name: 'bumpy' });
1077+
await collection.insertOne({ name: 'bumpy' });
1078+
await collection.insertOne({ name: 'bumpy' });
1079+
1080+
await sleep(1000);
1081+
1082+
for (let i = 0; i < 3; ++i) {
1083+
const change = await changeStream.next();
1084+
expect(change).not.to.be.null;
1085+
}
1086+
}
1087+
});
1088+
});
1089+
10401090
it.skip('should continue piping changes after a resumable error', {
10411091
metadata: { requires: { topology: 'replicaset' } },
10421092
test: done => {

test/integration/change-streams/change_streams.spec.test.ts

Lines changed: 5 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -4,5 +4,9 @@ import { loadSpecTests } from '../../spec';
44
import { runUnifiedSuite } from '../../tools/unified-spec-runner/runner';
55

66
describe('Change Streams Spec - Unified', function () {
7-
runUnifiedSuite(loadSpecTests(path.join('change-streams', 'unified')));
7+
runUnifiedSuite(loadSpecTests(path.join('change-streams', 'unified')), test =>
8+
test.description === 'Test consecutive resume'
9+
? 'TODO(NODE-4670): fix consecutive resume change stream test'
10+
: false
11+
);
812
});

test/integration/unified-test-format/unified_test_format.spec.test.ts

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -23,6 +23,10 @@ const filter: TestFilter = ({ description }) => {
2323
return 'TODO(NODE-3308): failures due unnecessary getMore and killCursors calls in 5.0';
2424
}
2525

26+
if (description === 'Test consecutive resume') {
27+
return 'TODO(NODE-4670): fix consecutive resume change stream test';
28+
}
29+
2630
if (
2731
process.env.AUTH === 'auth' &&
2832
[

0 commit comments

Comments
 (0)