Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
25 changes: 19 additions & 6 deletions src/cursor/abstract_cursor.ts
Original file line number Diff line number Diff line change
Expand Up @@ -317,10 +317,14 @@ export abstract class AbstractCursor<
};
}

cursorStream(): Readable {
return new ReadableCursorStream(this);
}

stream(options?: CursorStreamOptions): Readable & AsyncIterable<TSchema> {
if (options?.transform) {
const transform = options.transform;
const readable = new ReadableCursorStream(this);
const readable = this.cursorStream();

return readable.pipe(
new Transform({
Expand All @@ -338,7 +342,7 @@ export abstract class AbstractCursor<
);
}

return new ReadableCursorStream(this);
return this.cursorStream();
}

hasNext(): Promise<boolean>;
Expand Down Expand Up @@ -857,9 +861,10 @@ export function assertUninitialized(cursor: AbstractCursor): void {
}
}

class ReadableCursorStream extends Readable {
private _cursor: AbstractCursor;
private _readInProgress = false;
/** @internal */
export class ReadableCursorStream extends Readable {
_cursor: AbstractCursor;
_readInProgress = false;

constructor(cursor: AbstractCursor) {
super({
Expand All @@ -882,7 +887,7 @@ class ReadableCursorStream extends Readable {
this._cursor.close(err => process.nextTick(callback, err || error));
}

private _readNext() {
_readNext() {
next(this._cursor, true, (err, result) => {
if (err) {
// NOTE: This is questionable, but we have a test backing the behavior. It seems the
Expand All @@ -903,6 +908,14 @@ class ReadableCursorStream extends Readable {
return this.push(null);
}

// NOTE: The two above checks on the message of the error will cause a null to be pushed
// to the stream, thus closing the stream before the destroy call happens. This means
// that either of those error messages on a change stream will not get a proper
// 'error' event to be emitted (the error passed to destroy). Change stream resumability
// relies on that error event to be emitted to create its new cursor and thus was not
// working on 4.4 servers because the error emitted on failover was "interrupted at
// shutdown" while on 5.0+ it is "The server is in quiesce mode and will shut down".
// See NODE-4475 for why change stream cursors have their own readable streams now.
return this.destroy(err);
}

Expand Down
36 changes: 35 additions & 1 deletion src/cursor/change_stream_cursor.ts
Original file line number Diff line number Diff line change
@@ -1,3 +1,5 @@
import type { Readable } from 'stream';

import type { Document, Long, Timestamp } from '../bson';
import {
type ChangeStreamDocument,
Expand All @@ -14,7 +16,12 @@ import type { CollationOptions } from '../operations/command';
import { type ExecutionResult, executeOperation } from '../operations/execute_operation';
import type { ClientSession } from '../sessions';
import { type Callback, type MongoDBNamespace, maxWireVersion } from '../utils';
import { type AbstractCursorOptions, AbstractCursor } from './abstract_cursor';
import {
type AbstractCursorOptions,
AbstractCursor,
next,
ReadableCursorStream
} from './abstract_cursor';

/** @internal */
export interface ChangeStreamCursorOptions extends AbstractCursorOptions {
Expand Down Expand Up @@ -111,6 +118,10 @@ export class ChangeStreamCursor<
return options;
}

override cursorStream(): Readable {
return new ChangeStreamCursorStream(this);
}

cacheResumeToken(resumeToken: ResumeToken): void {
if (this.bufferedCount() === 0 && this.postBatchResumeToken) {
this.resumeToken = this.postBatchResumeToken;
Expand Down Expand Up @@ -192,3 +203,26 @@ export class ChangeStreamCursor<
});
}
}

/** @internal */
export class ChangeStreamCursorStream extends ReadableCursorStream {
override _readNext() {
next(this._cursor, true, (err, result) => {
if (err) {
return this.destroy(err);
}

if (result == null) {
this.push(null);
} else if (this.destroyed) {
this._cursor.close().catch(() => null);
} else {
if (this.push(result)) {
return this._readNext();
}

this._readInProgress = false;
}
});
}
}