@@ -108,6 +108,7 @@ const kWriteCb = 1 << 26;
108108const kExpectWriteCb = 1 << 27 ;
109109const kAfterWriteTickInfo = 1 << 28 ;
110110const kAfterWritePending = 1 << 29 ;
111+ const kIsDuplex = 1 << 30 ;
111112
112113// TODO(benjamingr) it is likely slower to do it this way than with free functions
113114function makeBitMapDescriptor ( bit ) {
@@ -286,6 +287,7 @@ function WritableState(options, stream, isDuplex) {
286287
287288 if ( options && options . objectMode ) this . state |= kObjectMode ;
288289 if ( isDuplex && options && options . writableObjectMode ) this . state |= kObjectMode ;
290+ if ( isDuplex ) this . state |= kIsDuplex ;
289291
290292 // The point at which write() starts returning false
291293 // Note: 0 is a valid value, means that we always return false if
@@ -513,14 +515,6 @@ function writeOrBuffer(stream, state, chunk, encoding, callback) {
513515
514516 state . length += len ;
515517
516- // stream._write resets state.length
517- const ret = state . length < state . highWaterMark ;
518-
519- // We must ensure that previous needDrain will not be reset to false.
520- if ( ! ret ) {
521- state . state |= kNeedDrain ;
522- }
523-
524518 if ( ( state . state & ( kWriting | kErrored | kCorked | kConstructed ) ) !== kConstructed ) {
525519 state . buffered . push ( { chunk, encoding, callback } ) ;
526520 if ( ( state . state & kAllBuffers ) !== 0 && encoding !== 'buffer' ) {
@@ -539,6 +533,15 @@ function writeOrBuffer(stream, state, chunk, encoding, callback) {
539533 state . state &= ~ kSync ;
540534 }
541535
536+ const ret = (
537+ state . length < state . highWaterMark &&
538+ ( ( state . state & kIsDuplex ) === 0 || stream . _readableState ?. ended !== true )
539+ ) ;
540+
541+ if ( ! ret ) {
542+ state . state |= kNeedDrain ;
543+ }
544+
542545 // Return false if errored or destroyed in order to break
543546 // any synchronous while(stream.write(data)) loops.
544547 return ret && ( state . state & ( kDestroyed | kErrored ) ) === 0 ;
0 commit comments