@@ -3,7 +3,7 @@ import { Writable } from 'stream';
33import type { Document } from '../bson' ;
44import { ObjectId } from '../bson' ;
55import type { Collection } from '../collection' ;
6- import { type AnyError , MongoAPIError , MONGODB_ERROR_CODES , MongoError } from '../error' ;
6+ import { MongoAPIError , MONGODB_ERROR_CODES , MongoError } from '../error' ;
77import type { Callback } from '../utils' ;
88import type { WriteConcernOptions } from '../write_concern' ;
99import { WriteConcern } from './../write_concern' ;
@@ -38,7 +38,7 @@ export interface GridFSBucketWriteStreamOptions extends WriteConcernOptions {
3838 * Do not instantiate this class directly. Use `openUploadStream()` instead.
3939 * @public
4040 */
41- export class GridFSBucketWriteStream extends Writable implements NodeJS . WritableStream {
41+ export class GridFSBucketWriteStream extends Writable {
4242 bucket : GridFSBucket ;
4343 chunks : Collection < GridFSChunk > ;
4444 filename : string ;
@@ -59,15 +59,7 @@ export class GridFSBucketWriteStream extends Writable implements NodeJS.Writable
5959 } ;
6060 writeConcern ?: WriteConcern ;
6161
62- /** @event */
63- static readonly CLOSE = 'close' ;
64- /** @event */
65- static readonly ERROR = 'error' ;
66- /**
67- * `end()` was called and the write stream successfully wrote the file metadata and all the chunks to MongoDB.
68- * @event
69- */
70- static readonly FINISH = 'finish' ;
62+ fileMetadata : GridFSFile | null = null ;
7163
7264 /**
7365 * @param bucket - Handle for this stream's corresponding bucket
@@ -115,6 +107,16 @@ export class GridFSBucketWriteStream extends Writable implements NodeJS.Writable
115107 }
116108 }
117109
110+ /**
111+ * The stream is considered constructed when the indexes ßare done being created
112+ */
113+ override _construct ( callback : ( error ?: Error | null ) => void ) : void {
114+ if ( this . bucket . s . checkedIndexes ) {
115+ return process . nextTick ( callback ) ;
116+ }
117+ this . bucket . once ( 'index' , callback ) ;
118+ }
119+
118120 /**
119121 * Write a buffer to the stream.
120122 *
@@ -123,22 +125,20 @@ export class GridFSBucketWriteStream extends Writable implements NodeJS.Writable
123125 * @param callback - Function to call when the chunk was added to the buffer, or if the entire chunk was persisted to MongoDB if this chunk caused a flush.
124126 * @returns False if this write required flushing a chunk to MongoDB. True otherwise.
125127 */
126- override write ( chunk : Buffer | string ) : boolean ;
127- override write ( chunk : Buffer | string , callback : Callback < void > ) : boolean ;
128- override write ( chunk : Buffer | string , encoding : BufferEncoding | undefined ) : boolean ;
129- override write (
128+ override _write (
130129 chunk : Buffer | string ,
131- encoding : BufferEncoding | undefined ,
130+ encoding : BufferEncoding ,
132131 callback : Callback < void >
133- ) : boolean ;
134- override write (
135- chunk : Buffer | string ,
136- encodingOrCallback ?: Callback < void > | BufferEncoding ,
137- callback ?: Callback < void >
138- ) : boolean {
139- const encoding = typeof encodingOrCallback === 'function' ? undefined : encodingOrCallback ;
140- callback = typeof encodingOrCallback === 'function' ? encodingOrCallback : callback ;
141- return waitForIndexes ( this , ( ) => doWrite ( this , chunk , encoding , callback ) ) ;
132+ ) : void {
133+ doWrite ( this , chunk , encoding , callback ) ;
134+ }
135+
136+ override _final ( callback : ( error ?: Error | null ) => void ) : void {
137+ if ( this . state . streamEnd ) {
138+ return process . nextTick ( callback ) ;
139+ }
140+ this . state . streamEnd = true ;
141+ writeRemnant ( this , callback ) ;
142142 }
143143
144144 /**
@@ -159,76 +159,15 @@ export class GridFSBucketWriteStream extends Writable implements NodeJS.Writable
159159 this . state . aborted = true ;
160160 await this . chunks . deleteMany ( { files_id : this . id } ) ;
161161 }
162-
163- /**
164- * Tells the stream that no more data will be coming in. The stream will
165- * persist the remaining data to MongoDB, write the files document, and
166- * then emit a 'finish' event.
167- *
168- * @param chunk - Buffer to write
169- * @param encoding - Optional encoding for the buffer
170- * @param callback - Function to call when all files and chunks have been persisted to MongoDB
171- */
172- override end ( ) : this;
173- override end ( chunk : Buffer ) : this;
174- override end ( callback : Callback < GridFSFile | void > ) : this;
175- override end ( chunk : Buffer , callback : Callback < GridFSFile | void > ) : this;
176- override end ( chunk : Buffer , encoding : BufferEncoding ) : this;
177- override end (
178- chunk : Buffer ,
179- encoding : BufferEncoding | undefined ,
180- callback : Callback < GridFSFile | void >
181- ) : this;
182- override end (
183- chunkOrCallback ?: Buffer | Callback < GridFSFile | void > ,
184- encodingOrCallback ?: BufferEncoding | Callback < GridFSFile | void > ,
185- callback ?: Callback < GridFSFile | void >
186- ) : this {
187- const chunk = typeof chunkOrCallback === 'function' ? undefined : chunkOrCallback ;
188- const encoding = typeof encodingOrCallback === 'function' ? undefined : encodingOrCallback ;
189- callback =
190- typeof chunkOrCallback === 'function'
191- ? chunkOrCallback
192- : typeof encodingOrCallback === 'function'
193- ? encodingOrCallback
194- : callback ;
195-
196- if ( this . state . streamEnd || checkAborted ( this , callback ) ) return this ;
197-
198- this . state . streamEnd = true ;
199-
200- if ( callback ) {
201- this . once ( GridFSBucketWriteStream . FINISH , ( result : GridFSFile ) => {
202- if ( callback ) callback ( undefined , result ) ;
203- } ) ;
204- }
205-
206- if ( ! chunk ) {
207- waitForIndexes ( this , ( ) => ! ! writeRemnant ( this ) ) ;
208- return this ;
209- }
210-
211- this . write ( chunk , encoding , ( ) => {
212- writeRemnant ( this ) ;
213- } ) ;
214-
215- return this ;
216- }
217162}
218163
219- function __handleError (
220- stream : GridFSBucketWriteStream ,
221- error : AnyError ,
222- callback ?: Callback
223- ) : void {
164+ function handleError ( stream : GridFSBucketWriteStream , error : Error , callback : Callback ) : void {
224165 if ( stream . state . errored ) {
166+ process . nextTick ( callback ) ;
225167 return ;
226168 }
227169 stream . state . errored = true ;
228- if ( callback ) {
229- return callback ( error ) ;
230- }
231- stream . emit ( GridFSBucketWriteStream . ERROR , error ) ;
170+ process . nextTick ( callback , error ) ;
232171}
233172
234173function createChunkDoc ( filesId : ObjectId , n : number , data : Buffer ) : GridFSChunk {
@@ -271,13 +210,16 @@ async function checkChunksIndex(stream: GridFSBucketWriteStream): Promise<void>
271210 }
272211}
273212
274- function checkDone ( stream : GridFSBucketWriteStream , callback ?: Callback ) : boolean {
275- if ( stream . done ) return true ;
213+ function checkDone ( stream : GridFSBucketWriteStream , callback : Callback ) : void {
214+ if ( stream . done ) {
215+ return process . nextTick ( callback ) ;
216+ }
217+
276218 if ( stream . state . streamEnd && stream . state . outstandingRequests === 0 && ! stream . state . errored ) {
277219 // Set done so we do not trigger duplicate createFilesDoc
278220 stream . done = true ;
279221 // Create a new files doc
280- const filesDoc = createFilesDoc (
222+ const fileMetadata = createFilesDoc (
281223 stream . id ,
282224 stream . length ,
283225 stream . chunkSizeBytes ,
@@ -287,24 +229,21 @@ function checkDone(stream: GridFSBucketWriteStream, callback?: Callback): boolea
287229 stream . options . metadata
288230 ) ;
289231
290- if ( checkAborted ( stream , callback ) ) {
291- return false ;
232+ if ( isAborted ( stream , callback ) ) {
233+ return ;
292234 }
293235
294- stream . files . insertOne ( filesDoc , { writeConcern : stream . writeConcern } ) . then (
236+ stream . files . insertOne ( fileMetadata , { writeConcern : stream . writeConcern } ) . then (
295237 ( ) => {
296- stream . emit ( GridFSBucketWriteStream . FINISH , filesDoc ) ;
297- stream . emit ( GridFSBucketWriteStream . CLOSE ) ;
238+ stream . fileMetadata = fileMetadata ;
239+ callback ( ) ;
298240 } ,
299- error => {
300- return __handleError ( stream , error , callback ) ;
301- }
241+ error => handleError ( stream , error , callback )
302242 ) ;
303-
304- return true ;
243+ return ;
305244 }
306245
307- return false ;
246+ process . nextTick ( callback ) ;
308247}
309248
310249async function checkIndexes ( stream : GridFSBucketWriteStream ) : Promise < void > {
@@ -377,11 +316,11 @@ function createFilesDoc(
377316function doWrite (
378317 stream : GridFSBucketWriteStream ,
379318 chunk : Buffer | string ,
380- encoding ? : BufferEncoding ,
381- callback ? : Callback < void >
382- ) : boolean {
383- if ( checkAborted ( stream , callback ) ) {
384- return false ;
319+ encoding : BufferEncoding ,
320+ callback : Callback < void >
321+ ) : void {
322+ if ( isAborted ( stream , callback ) ) {
323+ return ;
385324 }
386325
387326 const inputBuf = Buffer . isBuffer ( chunk ) ? chunk : Buffer . from ( chunk , encoding ) ;
@@ -392,13 +331,8 @@ function doWrite(
392331 if ( stream . pos + inputBuf . length < stream . chunkSizeBytes ) {
393332 inputBuf . copy ( stream . bufToStore , stream . pos ) ;
394333 stream . pos += inputBuf . length ;
395-
396- callback && callback ( ) ;
397-
398- // Note that we reverse the typical semantics of write's return value
399- // to be compatible with node's `.pipe()` function.
400- // True means client can keep writing.
401- return true ;
334+ process . nextTick ( callback ) ;
335+ return ;
402336 }
403337
404338 // Otherwise, buffer is too big for current chunk, so we need to flush
@@ -418,8 +352,8 @@ function doWrite(
418352 ++ stream . state . outstandingRequests ;
419353 ++ outstandingRequests ;
420354
421- if ( checkAborted ( stream , callback ) ) {
422- return false ;
355+ if ( isAborted ( stream , callback ) ) {
356+ return ;
423357 }
424358
425359 stream . chunks . insertOne ( doc , { writeConcern : stream . writeConcern } ) . then (
@@ -429,13 +363,10 @@ function doWrite(
429363
430364 if ( ! outstandingRequests ) {
431365 stream . emit ( 'drain' , doc ) ;
432- callback && callback ( ) ;
433- checkDone ( stream ) ;
366+ checkDone ( stream , callback ) ;
434367 }
435368 } ,
436- error => {
437- return __handleError ( stream , error ) ;
438- }
369+ error => handleError ( stream , error , callback )
439370 ) ;
440371
441372 spaceRemaining = stream . chunkSizeBytes ;
@@ -445,29 +376,9 @@ function doWrite(
445376 inputBufRemaining -= numToCopy ;
446377 numToCopy = Math . min ( spaceRemaining , inputBufRemaining ) ;
447378 }
448-
449- // Note that we reverse the typical semantics of write's return value
450- // to be compatible with node's `.pipe()` function.
451- // False means the client should wait for the 'drain' event.
452- return false ;
453- }
454-
455- function waitForIndexes (
456- stream : GridFSBucketWriteStream ,
457- callback : ( res : boolean ) => boolean
458- ) : boolean {
459- if ( stream . bucket . s . checkedIndexes ) {
460- return callback ( false ) ;
461- }
462-
463- stream . bucket . once ( 'index' , ( ) => {
464- callback ( true ) ;
465- } ) ;
466-
467- return true ;
468379}
469380
470- function writeRemnant ( stream : GridFSBucketWriteStream , callback ? : Callback ) : boolean {
381+ function writeRemnant ( stream : GridFSBucketWriteStream , callback : Callback ) : void {
471382 // Buffer is empty, so don't bother to insert
472383 if ( stream . pos === 0 ) {
473384 return checkDone ( stream , callback ) ;
@@ -482,28 +393,22 @@ function writeRemnant(stream: GridFSBucketWriteStream, callback?: Callback): boo
482393 const doc = createChunkDoc ( stream . id , stream . n , remnant ) ;
483394
484395 // If the stream was aborted, do not write remnant
485- if ( checkAborted ( stream , callback ) ) {
486- return false ;
396+ if ( isAborted ( stream , callback ) ) {
397+ return ;
487398 }
488399
489400 stream . chunks . insertOne ( doc , { writeConcern : stream . writeConcern } ) . then (
490401 ( ) => {
491402 -- stream . state . outstandingRequests ;
492- checkDone ( stream ) ;
403+ checkDone ( stream , callback ) ;
493404 } ,
494- error => {
495- return __handleError ( stream , error ) ;
496- }
405+ error => handleError ( stream , error , callback )
497406 ) ;
498- return true ;
499407}
500408
501- function checkAborted ( stream : GridFSBucketWriteStream , callback ? : Callback < void > ) : boolean {
409+ function isAborted ( stream : GridFSBucketWriteStream , callback : Callback < void > ) : boolean {
502410 if ( stream . state . aborted ) {
503- if ( typeof callback === 'function' ) {
504- // TODO(NODE-3485): Replace with MongoGridFSStreamClosedError
505- callback ( new MongoAPIError ( 'Stream has been aborted' ) ) ;
506- }
411+ process . nextTick ( callback , new MongoAPIError ( 'Stream has been aborted' ) ) ;
507412 return true ;
508413 }
509414 return false ;
0 commit comments