1- import { on } from 'stream' ;
1+ import { type Readable , Transform , type TransformCallback } from 'stream' ;
22import { clearTimeout , setTimeout } from 'timers' ;
33import { promisify } from 'util' ;
44
@@ -61,6 +61,7 @@ import type { ClientMetadata } from './handshake/client_metadata';
6161import { MessageStream , type OperationDescription } from './message_stream' ;
6262import { StreamDescription , type StreamDescriptionOptions } from './stream_description' ;
6363import { decompressResponse } from './wire_protocol/compression' ;
64+ import { onData } from './wire_protocol/on_data' ;
6465import { getReadPreference , isSharded } from './wire_protocol/shared' ;
6566
6667/** @internal */
@@ -807,17 +808,19 @@ export class ModernConnection extends TypedEventEmitter<ConnectionEvents> {
807808 /** @internal */
808809 authContext ?: AuthContext ;
809810
810- /**@internal */
811811 delayedTimeoutId : NodeJS . Timeout | null = null ;
812812 /** @internal */
813813 [ kDescription ] : StreamDescription ;
814814 /** @internal */
815815 [ kGeneration ] : number ;
816816 /** @internal */
817817 [ kLastUseTime ] : number ;
818- /** @internal */
819- socket : Stream ;
820- controller : AbortController ;
818+
819+ private socket : Stream ;
820+ private controller : AbortController ;
821+ private messageStream : Readable ;
822+ private socketWrite : ( buffer : Uint8Array ) => Promise < void > ;
823+
821824 /** @internal */
822825 [ kHello ] : Document | null ;
823826 /** @internal */
@@ -857,9 +860,18 @@ export class ModernConnection extends TypedEventEmitter<ConnectionEvents> {
857860
858861 this . socket = stream ;
859862 this . controller = new AbortController ( ) ;
860- this . socket . on ( 'error' , this . onError . bind ( this ) ) ;
863+
864+ this . messageStream = this . socket
865+ . on ( 'error' , this . onError . bind ( this ) )
866+ . pipe ( new SizedMessageTransform ( { connection : this } ) )
867+ . on ( 'error' , this . onError . bind ( this ) ) ;
861868 this . socket . on ( 'close' , this . onClose . bind ( this ) ) ;
862869 this . socket . on ( 'timeout' , this . onTimeout . bind ( this ) ) ;
870+
871+ const socketWrite = promisify ( this . socket . write . bind ( this . socket ) ) ;
872+ this . socketWrite = async buffer => {
873+ return abortable ( socketWrite ( buffer ) , { signal : this . controller . signal } ) ;
874+ } ;
863875 }
864876
865877 async commandAsync ( ...args : Parameters < typeof this . command > ) {
@@ -1060,23 +1072,19 @@ export class ModernConnection extends TypedEventEmitter<ConnectionEvents> {
10601072 }
10611073
10621074 try {
1063- await writeCommand ( this , message , {
1075+ await this . writeCommand ( message , {
10641076 agreedCompressor : this . description . compressor ?? 'none' ,
1065- zlibCompressionLevel : this . description . zlibCompressionLevel ,
1066- signal : this . controller . signal
1077+ zlibCompressionLevel : this . description . zlibCompressionLevel
10671078 } ) ;
10681079
1069- // TODO(NODE-5770): Replace controller to avoid boundless 'abort' listeners
1070- this . controller = new AbortController ( ) ;
1071-
10721080 if ( options . noResponse ) {
10731081 yield { ok : 1 } ;
10741082 return ;
10751083 }
10761084
10771085 this . controller . signal . throwIfAborted ( ) ;
10781086
1079- for await ( const response of readMany ( this , { signal : this . controller . signal } ) ) {
1087+ for await ( const response of this . readMany ( ) ) {
10801088 this . socket . setTimeout ( 0 ) ;
10811089 response . parse ( options ) ;
10821090
@@ -1094,9 +1102,6 @@ export class ModernConnection extends TypedEventEmitter<ConnectionEvents> {
10941102 }
10951103 }
10961104
1097- // TODO(NODE-5770): Replace controller to avoid boundless 'abort' listeners
1098- this . controller = new AbortController ( ) ;
1099-
11001105 yield document ;
11011106 this . controller . signal . throwIfAborted ( ) ;
11021107
@@ -1214,121 +1219,83 @@ export class ModernConnection extends TypedEventEmitter<ConnectionEvents> {
12141219 } ;
12151220 exhaustLoop ( ) . catch ( replyListener ) ;
12161221 }
1217- }
12181222
1219- const kDefaultMaxBsonMessageSize = 1024 * 1024 * 16 * 4 ;
1220-
1221- /**
1222- * @internal
1223- *
1224- * This helper reads chucks of data out of a socket and buffers them until it has received a
1225- * full wire protocol message.
1226- *
1227- * By itself, produces an infinite async generator of wire protocol messages and consumers must end
1228- * the stream by calling `return` on the generator.
1229- *
1230- * Note that `for-await` loops call `return` automatically when the loop is exited.
1231- */
1232- export async function * readWireProtocolMessages (
1233- connection : ModernConnection ,
1234- { signal } : { signal ?: AbortSignal } = { }
1235- ) : AsyncGenerator < Buffer > {
1236- const bufferPool = new BufferPool ( ) ;
1237- const maxBsonMessageSize = connection . hello ?. maxBsonMessageSize ?? kDefaultMaxBsonMessageSize ;
1238- for await ( const [ chunk ] of on ( connection . socket , 'data' , { signal } ) ) {
1239- if ( connection . delayedTimeoutId ) {
1240- clearTimeout ( connection . delayedTimeoutId ) ;
1241- connection . delayedTimeoutId = null ;
1242- }
1243-
1244- bufferPool . append ( chunk ) ;
1245- const sizeOfMessage = bufferPool . getInt32 ( ) ;
1223+ /**
1224+ * @internal
1225+ *
1226+ * Writes an OP_MSG or OP_QUERY request to the socket, optionally compressing the command. This method
1227+ * waits until the socket's buffer has emptied (the Nodejs socket `drain` event has fired).
1228+ */
1229+ async writeCommand (
1230+ command : WriteProtocolMessageType ,
1231+ options : Partial < Pick < OperationDescription , 'agreedCompressor' | 'zlibCompressionLevel' > >
1232+ ) : Promise < void > {
1233+ const finalCommand =
1234+ options . agreedCompressor === 'none' || ! OpCompressedRequest . canCompress ( command )
1235+ ? command
1236+ : new OpCompressedRequest ( command , {
1237+ agreedCompressor : options . agreedCompressor ?? 'none' ,
1238+ zlibCompressionLevel : options . zlibCompressionLevel ?? 0
1239+ } ) ;
12461240
1247- if ( sizeOfMessage == null ) {
1248- continue ;
1249- }
1241+ const buffer = Buffer . concat ( await finalCommand . toBin ( ) ) ;
12501242
1251- if ( sizeOfMessage < 0 ) {
1252- throw new MongoParseError ( `Invalid message size: ${ sizeOfMessage } ` ) ;
1253- }
1243+ return this . socketWrite ( buffer ) ;
1244+ }
12541245
1255- if ( sizeOfMessage > maxBsonMessageSize ) {
1256- throw new MongoParseError (
1257- `Invalid message size: ${ sizeOfMessage } , max allowed: ${ maxBsonMessageSize } `
1258- ) ;
1259- }
1246+ /**
1247+ * @internal
1248+ *
1249+ * Returns an async generator that yields full wire protocol messages from the underlying socket. This function
1250+ * yields messages until `moreToCome` is false or not present in a response, or the caller cancels the request
1251+ * by calling `return` on the generator.
1252+ *
1253+ * Note that `for-await` loops call `return` automatically when the loop is exited.
1254+ */
1255+ async * readMany ( ) : AsyncGenerator < OpMsgResponse | OpQueryResponse > {
1256+ for await ( const message of onData ( this . messageStream , { signal : this . controller . signal } ) ) {
1257+ const response = await decompressResponse ( message ) ;
1258+ yield response ;
12601259
1261- if ( sizeOfMessage > bufferPool . length ) {
1262- continue ;
1260+ if ( ! response . moreToCome ) {
1261+ return ;
1262+ }
12631263 }
1264-
1265- yield bufferPool . read ( sizeOfMessage ) ;
12661264 }
12671265}
12681266
1269- /**
1270- * @internal
1271- *
1272- * Writes an OP_MSG or OP_QUERY request to the socket, optionally compressing the command. This method
1273- * waits until the socket's buffer has emptied (the Nodejs socket `drain` event has fired).
1274- */
1275- export async function writeCommand (
1276- connection : ModernConnection ,
1277- command : WriteProtocolMessageType ,
1278- options : Partial < Pick < OperationDescription , 'agreedCompressor' | 'zlibCompressionLevel' > > & {
1279- signal ?: AbortSignal ;
1280- }
1281- ) : Promise < void > {
1282- const finalCommand =
1283- options . agreedCompressor === 'none' || ! OpCompressedRequest . canCompress ( command )
1284- ? command
1285- : new OpCompressedRequest ( command , {
1286- agreedCompressor : options . agreedCompressor ?? 'none' ,
1287- zlibCompressionLevel : options . zlibCompressionLevel ?? 0
1288- } ) ;
1267+ /** @internal */
1268+ export class SizedMessageTransform extends Transform {
1269+ bufferPool : BufferPool ;
1270+ connection : ModernConnection ;
1271+
1272+ constructor ( { connection } : { connection : ModernConnection } ) {
1273+ super ( { objectMode : false } ) ;
1274+ this . bufferPool = new BufferPool ( ) ;
1275+ this . connection = connection ;
1276+ }
1277+ override _transform ( chunk : Buffer , encoding : unknown , callback : TransformCallback ) : void {
1278+ if ( this . connection . delayedTimeoutId != null ) {
1279+ clearTimeout ( this . connection . delayedTimeoutId ) ;
1280+ this . connection . delayedTimeoutId = null ;
1281+ }
12891282
1290- const buffer = Buffer . concat ( await finalCommand . toBin ( ) ) ;
1283+ this . bufferPool . append ( chunk ) ;
1284+ const sizeOfMessage = this . bufferPool . getInt32 ( ) ;
12911285
1292- const socketWriteFn = promisify ( connection . socket . write . bind ( connection . socket ) ) ;
1286+ if ( sizeOfMessage == null ) {
1287+ return callback ( ) ;
1288+ }
12931289
1294- return abortable ( socketWriteFn ( buffer ) , options ) ;
1295- }
1290+ if ( sizeOfMessage < 0 ) {
1291+ return callback ( new MongoParseError ( `Invalid message size: ${ sizeOfMessage } , too small` ) ) ;
1292+ }
12961293
1297- /**
1298- * @internal
1299- *
1300- * Returns an async generator that yields full wire protocol messages from the underlying socket. This function
1301- * yields messages until `moreToCome` is false or not present in a response, or the caller cancels the request
1302- * by calling `return` on the generator.
1303- *
1304- * Note that `for-await` loops call `return` automatically when the loop is exited.
1305- */
1306- export async function * readMany (
1307- connection : ModernConnection ,
1308- options : { signal ?: AbortSignal } = { }
1309- ) : AsyncGenerator < OpMsgResponse | OpQueryResponse > {
1310- for await ( const message of readWireProtocolMessages ( connection , options ) ) {
1311- const response = await decompressResponse ( message ) ;
1312- yield response ;
1313-
1314- if ( ! response . moreToCome ) {
1315- return ;
1294+ if ( sizeOfMessage > this . bufferPool . length ) {
1295+ return callback ( ) ;
13161296 }
1317- }
1318- }
13191297
1320- /**
1321- * @internal
1322- *
1323- * Reads a single wire protocol message out of a connection.
1324- */
1325- export async function read (
1326- connection : ModernConnection ,
1327- options : { signal ?: AbortSignal } = { }
1328- ) : Promise < OpMsgResponse | OpQueryResponse > {
1329- for await ( const value of readMany ( connection , options ) ) {
1330- return value ;
1298+ const message = this . bufferPool . read ( sizeOfMessage ) ;
1299+ return callback ( null , message ) ;
13311300 }
1332-
1333- throw new MongoRuntimeError ( 'unable to read message off of connection' ) ;
13341301}
0 commit comments