Skip to content

Commit 3846bfe

Browse files
stream: use internal streams for TransformStream
1 parent f0b1036 commit 3846bfe

File tree

3 files changed

+90
-29
lines changed

3 files changed

+90
-29
lines changed

lib/internal/webstreams/readablestream.js

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -3391,4 +3391,6 @@ module.exports = {
33913391
readableByteStreamControllerPullSteps,
33923392
setupReadableByteStreamController,
33933393
setupReadableByteStreamControllerFromSource,
3394+
createReadableStream,
3395+
createReadableByteStream,
33943396
};

lib/internal/webstreams/transformstream.js

Lines changed: 19 additions & 29 deletions
Original file line numberDiff line numberDiff line change
@@ -51,6 +51,7 @@ const {
5151

5252
const {
5353
ReadableStream,
54+
createReadableStream,
5455
readableStreamDefaultControllerCanCloseOrEnqueue,
5556
readableStreamDefaultControllerClose,
5657
readableStreamDefaultControllerEnqueue,
@@ -61,6 +62,7 @@ const {
6162

6263
const {
6364
WritableStream,
65+
createWritableStream,
6466
writableStreamDefaultControllerErrorIfNeeded,
6567
} = require('internal/webstreams/writablestream');
6668

@@ -360,36 +362,24 @@ function initializeTransformStream(
360362
readableHighWaterMark,
361363
readableSizeAlgorithm) {
362364

363-
const writable = new WritableStream({
364-
__proto__: null,
365-
start() { return startPromise.promise; },
366-
write(chunk) {
367-
return transformStreamDefaultSinkWriteAlgorithm(stream, chunk);
368-
},
369-
abort(reason) {
370-
return transformStreamDefaultSinkAbortAlgorithm(stream, reason);
371-
},
372-
close() {
373-
return transformStreamDefaultSinkCloseAlgorithm(stream);
374-
},
375-
}, {
376-
highWaterMark: writableHighWaterMark,
377-
size: writableSizeAlgorithm,
378-
});
365+
const startAlgorithm = () => startPromise.promise;
379366

380-
const readable = new ReadableStream({
381-
__proto__: null,
382-
start() { return startPromise.promise; },
383-
pull() {
384-
return transformStreamDefaultSourcePullAlgorithm(stream);
385-
},
386-
cancel(reason) {
387-
return transformStreamDefaultSourceCancelAlgorithm(stream, reason);
388-
},
389-
}, {
390-
highWaterMark: readableHighWaterMark,
391-
size: readableSizeAlgorithm,
392-
});
367+
const writable = createWritableStream(
368+
startAlgorithm,
369+
(chunk) => transformStreamDefaultSinkWriteAlgorithm(stream, chunk),
370+
() => transformStreamDefaultSinkCloseAlgorithm(stream),
371+
(reason) => transformStreamDefaultSinkAbortAlgorithm(stream, reason),
372+
writableHighWaterMark,
373+
writableSizeAlgorithm,
374+
);
375+
376+
const readable = createReadableStream(
377+
startAlgorithm,
378+
() => transformStreamDefaultSourcePullAlgorithm(stream),
379+
(reason) => transformStreamDefaultSourceCancelAlgorithm(stream, reason),
380+
readableHighWaterMark,
381+
readableSizeAlgorithm,
382+
);
393383

394384
stream[kState] = {
395385
readable,

lib/internal/webstreams/writablestream.js

Lines changed: 69 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -581,6 +581,74 @@ ObjectDefineProperties(WritableStreamDefaultController.prototype, {
581581
[SymbolToStringTag]: getNonWritablePropertyDescriptor(WritableStreamDefaultController.name),
582582
});
583583

584+
function InternalWritableStream(start, write, close, abort, highWaterMark, size) {
585+
markTransferMode(this, false, true);
586+
this[kType] = 'WritableStream';
587+
this[kState] = {
588+
close: createDeferredPromise(),
589+
closeRequest: {
590+
promise: undefined,
591+
resolve: undefined,
592+
reject: undefined,
593+
},
594+
inFlightWriteRequest: {
595+
promise: undefined,
596+
resolve: undefined,
597+
reject: undefined,
598+
},
599+
inFlightCloseRequest: {
600+
promise: undefined,
601+
resolve: undefined,
602+
reject: undefined,
603+
},
604+
pendingAbortRequest: {
605+
abort: {
606+
promise: undefined,
607+
resolve: undefined,
608+
reject: undefined,
609+
},
610+
reason: undefined,
611+
wasAlreadyErroring: false,
612+
},
613+
backpressure: false,
614+
controller: undefined,
615+
state: 'writable',
616+
storedError: undefined,
617+
writeRequests: [],
618+
writer: undefined,
619+
transfer: {
620+
readable: undefined,
621+
port1: undefined,
622+
port2: undefined,
623+
promise: undefined,
624+
},
625+
};
626+
this[kIsClosedPromise] = createDeferredPromise();
627+
628+
const controller = new WritableStreamDefaultController(kSkipThrow);
629+
setupWritableStreamDefaultController(
630+
this,
631+
controller,
632+
start,
633+
write,
634+
close,
635+
abort,
636+
highWaterMark,
637+
size
638+
)
639+
}
640+
641+
ObjectSetPrototypeOf(InternalWritableStream.prototype, WritableStream.prototype);
642+
ObjectSetPrototypeOf(InternalWritableStream, WritableStream);
643+
644+
function createWritableStream(start, write, close, abort, highWaterMark, size) {
645+
const stream = new InternalWritableStream(start, write, close, abort, highWaterMark, size);
646+
647+
// For spec compliance the InternalWritableStream must be a WritableStream
648+
stream.constructor = WritableStream;
649+
return stream;
650+
}
651+
584652
const isWritableStream =
585653
isBrandCheck('WritableStream');
586654
const isWritableStreamDefaultWriter =
@@ -1360,4 +1428,5 @@ module.exports = {
13601428
writableStreamDefaultControllerAdvanceQueueIfNeeded,
13611429
setupWritableStreamDefaultControllerFromSink,
13621430
setupWritableStreamDefaultController,
1431+
createWritableStream,
13631432
};

0 commit comments

Comments
 (0)