6565
6666const {
6767 ObjectSetPrototypeOf,
68+ Symbol
6869} = primordials ;
6970
7071module . exports = Transform ;
7172const {
72- ERR_METHOD_NOT_IMPLEMENTED ,
73- ERR_MULTIPLE_CALLBACK ,
74- ERR_TRANSFORM_ALREADY_TRANSFORMING ,
75- ERR_TRANSFORM_WITH_LENGTH_0
73+ ERR_METHOD_NOT_IMPLEMENTED
7674} = require ( 'internal/errors' ) . codes ;
7775const Duplex = require ( '_stream_duplex' ) ;
7876ObjectSetPrototypeOf ( Transform . prototype , Duplex . prototype ) ;
7977ObjectSetPrototypeOf ( Transform , Duplex ) ;
8078
81-
82- function afterTransform ( er , data ) {
83- const ts = this . _transformState ;
84- ts . transforming = false ;
85-
86- const cb = ts . writecb ;
87-
88- if ( cb === null ) {
89- return this . emit ( 'error' , new ERR_MULTIPLE_CALLBACK ( ) ) ;
90- }
91-
92- ts . writechunk = null ;
93- ts . writecb = null ;
94-
95- if ( data != null ) // Single equals check for both `null` and `undefined`
96- this . push ( data ) ;
97-
98- cb ( er ) ;
99-
100- const rs = this . _readableState ;
101- rs . reading = false ;
102- if ( rs . needReadable || rs . length < rs . highWaterMark ) {
103- this . _read ( rs . highWaterMark ) ;
104- }
105- }
106-
79+ const kCallback = Symbol ( 'kCallback' ) ;
10780
10881function Transform ( options ) {
10982 if ( ! ( this instanceof Transform ) )
11083 return new Transform ( options ) ;
11184
11285 Duplex . call ( this , options ) ;
11386
114- this . _transformState = {
115- afterTransform : afterTransform . bind ( this ) ,
116- needTransform : false ,
117- transforming : false ,
118- writecb : null ,
119- writechunk : null ,
120- writeencoding : null
121- } ;
122-
12387 // We have implemented the _read method, and done the other things
12488 // that Readable wants before the first _read call, so unset the
12589 // sync guard flag.
12690 this . _readableState . sync = false ;
12791
92+ this [ kCallback ] = null ;
93+
12894 if ( options ) {
12995 if ( typeof options . transform === 'function' )
13096 this . _transform = options . transform ;
@@ -133,89 +99,67 @@ function Transform(options) {
13399 this . _flush = options . flush ;
134100 }
135101
136- // When the writable side finishes, then flush out anything remaining.
102+ // TODO(ronag): Unfortunately _final is invoked asynchronously.
103+ // Use `prefinish` hack. `prefinish` is emitted synchronously when
104+ // and only when `_final` is not defined. Implementing `_final`
105+ // to a Transform should be an error.
137106 this . on ( 'prefinish' , prefinish ) ;
138107}
139108
140109function prefinish ( ) {
141- if ( typeof this . _flush === 'function' && ! this . _readableState . destroyed ) {
110+ if ( typeof this . _flush === 'function' && ! this . destroyed ) {
142111 this . _flush ( ( er , data ) => {
143- done ( this , er , data ) ;
112+ if ( er ) {
113+ this . destroy ( er ) ;
114+ return ;
115+ }
116+
117+ if ( data != null ) {
118+ this . push ( data ) ;
119+ }
120+ this . push ( null ) ;
144121 } ) ;
145122 } else {
146- done ( this , null , null ) ;
123+ this . push ( null ) ;
147124 }
148125}
149126
150- Transform . prototype . push = function ( chunk , encoding ) {
151- this . _transformState . needTransform = false ;
152- return Duplex . prototype . push . call ( this , chunk , encoding ) ;
153- } ;
154-
155- // This is the part where you do stuff!
156- // override this function in implementation classes.
157- // 'chunk' is an input chunk.
158- //
159- // Call `push(newChunk)` to pass along transformed output
160- // to the readable side. You may call 'push' zero or more times.
161- //
162- // Call `cb(err)` when you are done with this chunk. If you pass
163- // an error, then that'll put the hurt on the whole operation. If you
164- // never call cb(), then you'll never get another chunk.
165- Transform . prototype . _transform = function ( chunk , encoding , cb ) {
127+ Transform . prototype . _transform = function ( chunk , encoding , callback ) {
166128 throw new ERR_METHOD_NOT_IMPLEMENTED ( '_transform()' ) ;
167129} ;
168130
169- Transform . prototype . _write = function ( chunk , encoding , cb ) {
170- const ts = this . _transformState ;
171- ts . writecb = cb ;
172- ts . writechunk = chunk ;
173- ts . writeencoding = encoding ;
174- if ( ! ts . transforming ) {
175- const rs = this . _readableState ;
176- if ( ts . needTransform ||
177- rs . needReadable ||
178- rs . length < rs . highWaterMark )
179- this . _read ( rs . highWaterMark ) ;
180- }
131+ Transform . prototype . _write = function ( chunk , encoding , callback ) {
132+ const rState = this . _readableState ;
133+ const wState = this . _writableState ;
134+ const length = rState . length ;
135+
136+ this . _transform ( chunk , encoding , ( err , val ) => {
137+ if ( err ) {
138+ callback ( err ) ;
139+ return ;
140+ }
141+
142+ if ( val != null ) {
143+ this . push ( val ) ;
144+ }
145+
146+ if (
147+ wState . ended || // Backwards compat.
148+ length === rState . length || // Backwards compat.
149+ rState . length < rState . highWaterMark ||
150+ rState . length === 0
151+ ) {
152+ callback ( ) ;
153+ } else {
154+ this [ kCallback ] = callback ;
155+ }
156+ } ) ;
181157} ;
182158
183- // Doesn't matter what the args are here.
184- // _transform does all the work.
185- // That we got here means that the readable side wants more data.
186- Transform . prototype . _read = function ( n ) {
187- const ts = this . _transformState ;
188-
189- if ( ts . writechunk !== null && ! ts . transforming ) {
190- ts . transforming = true ;
191- this . _transform ( ts . writechunk , ts . writeencoding , ts . afterTransform ) ;
192- } else {
193- // Mark that we need a transform, so that any data that comes in
194- // will get processed, now that we've asked for it.
195- ts . needTransform = true ;
159+ Transform . prototype . _read = function ( ) {
160+ if ( this [ kCallback ] ) {
161+ const callback = this [ kCallback ] ;
162+ this [ kCallback ] = null ;
163+ callback ( ) ;
196164 }
197165} ;
198-
199-
200- Transform . prototype . _destroy = function ( err , cb ) {
201- Duplex . prototype . _destroy . call ( this , err , ( err2 ) => {
202- cb ( err2 ) ;
203- } ) ;
204- } ;
205-
206-
207- function done ( stream , er , data ) {
208- if ( er )
209- return stream . emit ( 'error' , er ) ;
210-
211- if ( data != null ) // Single equals check for both `null` and `undefined`
212- stream . push ( data ) ;
213-
214- // These two error cases are coherence checks that can likely not be tested.
215- if ( stream . _writableState . length )
216- throw new ERR_TRANSFORM_WITH_LENGTH_0 ( ) ;
217-
218- if ( stream . _transformState . transforming )
219- throw new ERR_TRANSFORM_ALREADY_TRANSFORMING ( ) ;
220- return stream . push ( null ) ;
221- }
0 commit comments