@@ -8,6 +8,15 @@ const uv = process.binding('uv');
88const debug = util . debuglog ( 'stream_wrap' ) ;
99const errors = require ( 'internal/errors' ) ;
1010
11+ const kCurrentWriteRequest = Symbol ( 'kCurrentWriteRequest' ) ;
12+ const kCurrentShutdownRequest = Symbol ( 'kCurrentShutdownRequest' ) ;
13+
14+ function isClosing ( ) { return this . owner . isClosing ( ) ; }
15+ function onreadstart ( ) { return this . owner . readStart ( ) ; }
16+ function onreadstop ( ) { return this . owner . readStop ( ) ; }
17+ function onshutdown ( req ) { return this . owner . doShutdown ( req ) ; }
18+ function onwrite ( req , bufs ) { return this . owner . doWrite ( req , bufs ) ; }
19+
1120/* This class serves as a wrapper for when the C++ side of Node wants access
1221 * to a standard JS stream. For example, TLS or HTTP do not operate on network
1322 * resources conceptually, although that is the common case and what we are
@@ -27,12 +36,13 @@ class JSStreamWrap extends Socket {
2736 debug ( 'close' ) ;
2837 this . doClose ( cb ) ;
2938 } ;
30- handle . isAlive = ( ) => this . isAlive ( ) ;
31- handle . isClosing = ( ) => this . isClosing ( ) ;
32- handle . onreadstart = ( ) => this . readStart ( ) ;
33- handle . onreadstop = ( ) => this . readStop ( ) ;
34- handle . onshutdown = ( req ) => this . doShutdown ( req ) ;
35- handle . onwrite = ( req , bufs ) => this . doWrite ( req , bufs ) ;
39+ // Inside of the following functions, `this` refers to the handle
40+ // and `this.owner` refers to this JSStreamWrap instance.
41+ handle . isClosing = isClosing ;
42+ handle . onreadstart = onreadstart ;
43+ handle . onreadstop = onreadstop ;
44+ handle . onshutdown = onshutdown ;
45+ handle . onwrite = onwrite ;
3646
3747 stream . pause ( ) ;
3848 stream . on ( 'error' , ( err ) => this . emit ( 'error' , err ) ) ;
@@ -60,7 +70,10 @@ class JSStreamWrap extends Socket {
6070
6171 super ( { handle, manualStart : true } ) ;
6272 this . stream = stream ;
63- this . _list = null ;
73+ this [ kCurrentWriteRequest ] = null ;
74+ this [ kCurrentShutdownRequest ] = null ;
75+
76+ // Start reading.
6477 this . read ( 0 ) ;
6578 }
6679
@@ -69,10 +82,6 @@ class JSStreamWrap extends Socket {
6982 return JSStreamWrap ;
7083 }
7184
72- isAlive ( ) {
73- return true ;
74- }
75-
7685 isClosing ( ) {
7786 return ! this . readable || ! this . writable ;
7887 }
@@ -88,33 +97,56 @@ class JSStreamWrap extends Socket {
8897 }
8998
9099 doShutdown ( req ) {
100+ assert . strictEqual ( this [ kCurrentShutdownRequest ] , null ) ;
101+ this [ kCurrentShutdownRequest ] = req ;
102+
103+ // TODO(addaleax): It might be nice if we could get into a state where
104+ // DoShutdown() is not called on streams while a write is still pending.
105+ //
106+ // Currently, the only part of the code base where that happens is the
107+ // TLS implementation, which calls both DoWrite() and DoShutdown() on the
108+ // underlying network stream inside of its own DoShutdown() method.
109+ // Working around that on the native side is not quite trivial (yet?),
110+ // so for now that is supported here.
111+
112+ if ( this [ kCurrentWriteRequest ] !== null )
113+ return this . on ( 'drain' , ( ) => this . doShutdown ( req ) ) ;
114+ assert . strictEqual ( this [ kCurrentWriteRequest ] , null ) ;
115+
91116 const handle = this . _handle ;
92- const item = this . _enqueue ( 'shutdown' , req ) ;
93117
94118 this . stream . end ( ( ) => {
95119 // Ensure that write was dispatched
96120 setImmediate ( ( ) => {
97- if ( ! this . _dequeue ( item ) )
98- return ;
99-
100- handle . finishShutdown ( req , 0 ) ;
121+ this . finishShutdown ( handle , 0 ) ;
101122 } ) ;
102123 } ) ;
103124 return 0 ;
104125 }
105126
127+ // handle === this._handle except when called from doClose().
128+ finishShutdown ( handle , errCode ) {
129+ // The shutdown request might already have been cancelled.
130+ if ( this [ kCurrentShutdownRequest ] === null )
131+ return ;
132+ const req = this [ kCurrentShutdownRequest ] ;
133+ this [ kCurrentShutdownRequest ] = null ;
134+ handle . finishShutdown ( req , errCode ) ;
135+ }
136+
106137 doWrite ( req , bufs ) {
107- const self = this ;
108- const handle = this . _handle ;
138+ assert . strictEqual ( this [ kCurrentWriteRequest ] , null ) ;
139+ assert . strictEqual ( this [ kCurrentShutdownRequest ] , null ) ;
140+ this [ kCurrentWriteRequest ] = req ;
109141
110- var pending = bufs . length ;
142+ const handle = this . _handle ;
143+ const self = this ;
111144
112- // Queue the request to be able to cancel it
113- const item = this . _enqueue ( 'write' , req ) ;
145+ let pending = bufs . length ;
114146
115147 this . stream . cork ( ) ;
116- for ( var n = 0 ; n < bufs . length ; n ++ )
117- this . stream . write ( bufs [ n ] , done ) ;
148+ for ( var i = 0 ; i < bufs . length ; ++ i )
149+ this . stream . write ( bufs [ i ] , done ) ;
118150 this . stream . uncork ( ) ;
119151
120152 function done ( err ) {
@@ -126,93 +158,42 @@ class JSStreamWrap extends Socket {
126158
127159 let errCode = 0 ;
128160 if ( err ) {
129- const code = uv [ `UV_${ err . code } ` ] ;
130- errCode = ( err . code && code ) ? code : uv . UV_EPIPE ;
161+ errCode = uv [ `UV_${ err . code } ` ] || uv . UV_EPIPE ;
131162 }
132163
133164 // Ensure that write was dispatched
134- setImmediate ( function ( ) {
135- // Do not invoke callback twice
136- if ( ! self . _dequeue ( item ) )
137- return ;
138-
139- handle . finishWrite ( req , errCode ) ;
165+ setImmediate ( ( ) => {
166+ self . finishWrite ( handle , errCode ) ;
140167 } ) ;
141168 }
142169
143170 return 0 ;
144171 }
145172
146- _enqueue ( type , req ) {
147- const item = new QueueItem ( type , req ) ;
148- if ( this . _list === null ) {
149- this . _list = item ;
150- return item ;
151- }
152-
153- item . next = this . _list . next ;
154- item . prev = this . _list ;
155- item . next . prev = item ;
156- item . prev . next = item ;
157-
158- return item ;
159- }
160-
161- _dequeue ( item ) {
162- assert ( item instanceof QueueItem ) ;
163-
164- var next = item . next ;
165- var prev = item . prev ;
166-
167- if ( next === null && prev === null )
168- return false ;
169-
170- item . next = null ;
171- item . prev = null ;
172-
173- if ( next === item ) {
174- prev = null ;
175- next = null ;
176- } else {
177- prev . next = next ;
178- next . prev = prev ;
179- }
180-
181- if ( this . _list === item )
182- this . _list = next ;
173+ // handle === this._handle except when called from doClose().
174+ finishWrite ( handle , errCode ) {
175+ // The write request might already have been cancelled.
176+ if ( this [ kCurrentWriteRequest ] === null )
177+ return ;
178+ const req = this [ kCurrentWriteRequest ] ;
179+ this [ kCurrentWriteRequest ] = null ;
183180
184- return true ;
181+ handle . finishWrite ( req , errCode ) ;
185182 }
186183
187184 doClose ( cb ) {
188185 const handle = this . _handle ;
189186
190187 setImmediate ( ( ) => {
191- while ( this . _list !== null ) {
192- const item = this . _list ;
193- const req = item . req ;
194- this . _dequeue ( item ) ;
195-
196- const errCode = uv . UV_ECANCELED ;
197- if ( item . type === 'write' ) {
198- handle . finishWrite ( req , errCode ) ;
199- } else if ( item . type === 'shutdown' ) {
200- handle . finishShutdown ( req , errCode ) ;
201- }
202- }
203-
204188 // Should be already set by net.js
205189 assert . strictEqual ( this . _handle , null ) ;
190+
191+ this . finishWrite ( handle , uv . UV_ECANCELED ) ;
192+ this . finishShutdown ( handle , uv . UV_ECANCELED ) ;
193+
206194 cb ( ) ;
207195 } ) ;
208196 }
209197}
210198
211- function QueueItem ( type , req ) {
212- this . type = type ;
213- this . req = req ;
214- this . prev = this ;
215- this . next = this ;
216- }
217-
218199module . exports = JSStreamWrap ;
0 commit comments