11import Denque = require( 'denque' ) ;
2- import { MongoError , AnyError , isResumableError , MongoDriverError } from './error' ;
2+ import {
3+ MongoError ,
4+ AnyError ,
5+ isResumableError ,
6+ MongoDriverError ,
7+ MongoAPIError ,
8+ MongoChangeStreamError
9+ } from './error' ;
310import { AggregateOperation , AggregateOptions } from './operations/aggregate' ;
411import {
512 maxWireVersion ,
@@ -259,9 +266,8 @@ export class ChangeStream<TSchema extends Document = Document> extends TypedEven
259266 } else if ( parent instanceof MongoClient ) {
260267 this . type = CHANGE_DOMAIN_TYPES . CLUSTER ;
261268 } else {
262- // TODO(NODE-3404): Replace this with MongoChangeStreamError
263- throw new MongoDriverError (
264- 'Parent provided to ChangeStream constructor must an instance of Collection, Db, or MongoClient'
269+ throw new MongoChangeStreamError (
270+ 'Parent provided to ChangeStream constructor must be an instance of Collection, Db, or MongoClient'
265271 ) ;
266272 }
267273
@@ -365,8 +371,7 @@ export class ChangeStream<TSchema extends Document = Document> extends TypedEven
365371 */
366372 stream ( options ?: CursorStreamOptions ) : Readable {
367373 this . streamOptions = options ;
368- // TODO(NODE-3404): Replace this with MongoChangeStreamError
369- if ( ! this . cursor ) throw new MongoDriverError ( NO_CURSOR_ERROR ) ;
374+ if ( ! this . cursor ) throw new MongoChangeStreamError ( NO_CURSOR_ERROR ) ;
370375 return this . cursor . stream ( options ) ;
371376 }
372377
@@ -543,17 +548,18 @@ const CHANGE_STREAM_EVENTS = [
543548
544549function setIsEmitter < TSchema > ( changeStream : ChangeStream < TSchema > ) : void {
545550 if ( changeStream [ kMode ] === 'iterator' ) {
546- throw new MongoDriverError (
547- 'Cannot use ChangeStream as an EventEmitter after using as an iterator'
551+ // TODO(NODE-3485): Replace with MongoChangeStreamModeError
552+ throw new MongoAPIError (
553+ 'ChangeStream cannot be used as an EventEmitter after being used as an iterator'
548554 ) ;
549555 }
550556 changeStream [ kMode ] = 'emitter' ;
551557}
552558
553559function setIsIterator < TSchema > ( changeStream : ChangeStream < TSchema > ) : void {
554560 if ( changeStream [ kMode ] === 'emitter' ) {
555- throw new MongoDriverError (
556- 'Cannot use ChangeStream as iterator after using as an EventEmitter '
561+ throw new MongoAPIError (
562+ 'ChangeStream cannot be used as an EventEmitter after being used as an iterator '
557563 ) ;
558564 }
559565 changeStream [ kMode ] = 'iterator' ;
@@ -630,6 +636,7 @@ function waitForTopologyConnected(
630636 }
631637
632638 if ( calculateDurationInMs ( start ) > timeout ) {
639+ // TODO(NODE-3497): Replace with MongoNetworkTimeoutError
633640 return callback ( new MongoDriverError ( 'Timed out waiting for connection' ) ) ;
634641 }
635642
@@ -676,17 +683,23 @@ function processNewChange<TSchema>(
676683 callback ?: Callback < ChangeStreamDocument < TSchema > >
677684) {
678685 if ( changeStream [ kClosed ] ) {
686+ // TODO(NODE-3405): Replace with MongoStreamClosedError
679687 if ( callback ) callback ( new MongoDriverError ( CHANGESTREAM_CLOSED_ERROR ) ) ;
680688 return ;
681689 }
682690
683691 // a null change means the cursor has been notified, implicitly closing the change stream
684692 if ( change == null ) {
693+ // TODO(NODE-3405): Replace with MongoStreamClosedError
685694 return closeWithError ( changeStream , new MongoDriverError ( CHANGESTREAM_CLOSED_ERROR ) , callback ) ;
686695 }
687696
688697 if ( change && ! change . _id ) {
689- return closeWithError ( changeStream , new MongoDriverError ( NO_RESUME_TOKEN_ERROR ) , callback ) ;
698+ return closeWithError (
699+ changeStream ,
700+ new MongoChangeStreamError ( NO_RESUME_TOKEN_ERROR ) ,
701+ callback
702+ ) ;
690703 }
691704
692705 // cache the resume token
@@ -710,6 +723,7 @@ function processError<TSchema>(
710723
711724 // If the change stream has been closed explicitly, do not process error.
712725 if ( changeStream [ kClosed ] ) {
726+ // TODO(NODE-3405): Replace with MongoStreamClosedError
713727 if ( callback ) callback ( new MongoDriverError ( CHANGESTREAM_CLOSED_ERROR ) ) ;
714728 return ;
715729 }
@@ -770,6 +784,7 @@ function processError<TSchema>(
770784 */
771785function getCursor < T > ( changeStream : ChangeStream < T > , callback : Callback < ChangeStreamCursor < T > > ) {
772786 if ( changeStream [ kClosed ] ) {
787+ // TODO(NODE-3405): Replace with MongoStreamClosedError
773788 callback ( new MongoDriverError ( CHANGESTREAM_CLOSED_ERROR ) ) ;
774789 return ;
775790 }
@@ -795,11 +810,12 @@ function processResumeQueue<TSchema>(changeStream: ChangeStream<TSchema>, err?:
795810 const request = changeStream [ kResumeQueue ] . pop ( ) ;
796811 if ( ! err ) {
797812 if ( changeStream [ kClosed ] ) {
813+ // TODO(NODE-3405): Replace with MongoStreamClosedError
798814 request ( new MongoDriverError ( CHANGESTREAM_CLOSED_ERROR ) ) ;
799815 return ;
800816 }
801817 if ( ! changeStream . cursor ) {
802- request ( new MongoDriverError ( NO_CURSOR_ERROR ) ) ;
818+ request ( new MongoChangeStreamError ( NO_CURSOR_ERROR ) ) ;
803819 return ;
804820 }
805821 }
0 commit comments