@@ -8,6 +8,15 @@ const uv = process.binding('uv');
88const  debug  =  util . debuglog ( 'stream_wrap' ) ; 
99const  errors  =  require ( 'internal/errors' ) ; 
1010
11+ const  kCurrentWriteRequest  =  Symbol ( 'kCurrentWriteRequest' ) ; 
12+ const  kCurrentShutdownRequest  =  Symbol ( 'kCurrentShutdownRequest' ) ; 
13+ 
14+ function  isClosing ( )  {  return  this . owner . isClosing ( ) ;  } 
15+ function  onreadstart ( )  {  return  this . owner . readStart ( ) ;  } 
16+ function  onreadstop ( )  {  return  this . owner . readStop ( ) ;  } 
17+ function  onshutdown ( req )  {  return  this . owner . doShutdown ( req ) ;  } 
18+ function  onwrite ( req ,  bufs )  {  return  this . owner . doWrite ( req ,  bufs ) ;  } 
19+ 
1120/* This class serves as a wrapper for when the C++ side of Node wants access 
1221 * to a standard JS stream. For example, TLS or HTTP do not operate on network 
1322 * resources conceptually, although that is the common case and what we are 
@@ -27,12 +36,13 @@ class JSStreamWrap extends Socket {
2736      debug ( 'close' ) ; 
2837      this . doClose ( cb ) ; 
2938    } ; 
30-     handle . isAlive  =  ( )  =>  this . isAlive ( ) ; 
31-     handle . isClosing  =  ( )  =>  this . isClosing ( ) ; 
32-     handle . onreadstart  =  ( )  =>  this . readStart ( ) ; 
33-     handle . onreadstop  =  ( )  =>  this . readStop ( ) ; 
34-     handle . onshutdown  =  ( req )  =>  this . doShutdown ( req ) ; 
35-     handle . onwrite  =  ( req ,  bufs )  =>  this . doWrite ( req ,  bufs ) ; 
39+     // Inside of the following functions, `this` refers to the handle 
40+     // and `this.owner` refers to this JSStreamWrap instance. 
41+     handle . isClosing  =  isClosing ; 
42+     handle . onreadstart  =  onreadstart ; 
43+     handle . onreadstop  =  onreadstop ; 
44+     handle . onshutdown  =  onshutdown ; 
45+     handle . onwrite  =  onwrite ; 
3646
3747    stream . pause ( ) ; 
3848    stream . on ( 'error' ,  ( err )  =>  this . emit ( 'error' ,  err ) ) ; 
@@ -60,7 +70,10 @@ class JSStreamWrap extends Socket {
6070
6171    super ( {  handle,  manualStart : true  } ) ; 
6272    this . stream  =  stream ; 
63-     this . _list  =  null ; 
73+     this [ kCurrentWriteRequest ]  =  null ; 
74+     this [ kCurrentShutdownRequest ]  =  null ; 
75+ 
76+     // Start reading. 
6477    this . read ( 0 ) ; 
6578  } 
6679
@@ -69,10 +82,6 @@ class JSStreamWrap extends Socket {
6982    return  JSStreamWrap ; 
7083  } 
7184
72-   isAlive ( )  { 
73-     return  true ; 
74-   } 
75- 
7685  isClosing ( )  { 
7786    return  ! this . readable  ||  ! this . writable ; 
7887  } 
@@ -88,33 +97,56 @@ class JSStreamWrap extends Socket {
8897  } 
8998
9099  doShutdown ( req )  { 
100+     assert . strictEqual ( this [ kCurrentShutdownRequest ] ,  null ) ; 
101+     this [ kCurrentShutdownRequest ]  =  req ; 
102+ 
103+     // TODO(addaleax): It might be nice if we could get into a state where 
104+     // DoShutdown() is not called on streams while a write is still pending. 
105+     // 
106+     // Currently, the only part of the code base where that happens is the 
107+     // TLS implementation, which calls both DoWrite() and DoShutdown() on the 
108+     // underlying network stream inside of its own DoShutdown() method. 
109+     // Working around that on the native side is not quite trivial (yet?), 
110+     // so for now that is supported here. 
111+ 
112+     if  ( this [ kCurrentWriteRequest ]  !==  null ) 
113+       return  this . on ( 'drain' ,  ( )  =>  this . doShutdown ( req ) ) ; 
114+     assert . strictEqual ( this [ kCurrentWriteRequest ] ,  null ) ; 
115+ 
91116    const  handle  =  this . _handle ; 
92-     const  item  =  this . _enqueue ( 'shutdown' ,  req ) ; 
93117
94118    this . stream . end ( ( )  =>  { 
95119      // Ensure that write was dispatched 
96120      setImmediate ( ( )  =>  { 
97-         if  ( ! this . _dequeue ( item ) ) 
98-           return ; 
99- 
100-         handle . finishShutdown ( req ,  0 ) ; 
121+         this . finishShutdown ( handle ,  0 ) ; 
101122      } ) ; 
102123    } ) ; 
103124    return  0 ; 
104125  } 
105126
127+   // handle === this._handle except when called from doClose(). 
128+   finishShutdown ( handle ,  errCode )  { 
129+     // The shutdown request might already have been cancelled. 
130+     if  ( this [ kCurrentShutdownRequest ]  ===  null ) 
131+       return ; 
132+     const  req  =  this [ kCurrentShutdownRequest ] ; 
133+     this [ kCurrentShutdownRequest ]  =  null ; 
134+     handle . finishShutdown ( req ,  errCode ) ; 
135+   } 
136+ 
106137  doWrite ( req ,  bufs )  { 
107-     const  self  =  this ; 
108-     const  handle  =  this . _handle ; 
138+     assert . strictEqual ( this [ kCurrentWriteRequest ] ,  null ) ; 
139+     assert . strictEqual ( this [ kCurrentShutdownRequest ] ,  null ) ; 
140+     this [ kCurrentWriteRequest ]  =  req ; 
109141
110-     var  pending  =  bufs . length ; 
142+     const  handle  =  this . _handle ; 
143+     const  self  =  this ; 
111144
112-     // Queue the request to be able to cancel it 
113-     const  item  =  this . _enqueue ( 'write' ,  req ) ; 
145+     let  pending  =  bufs . length ; 
114146
115147    this . stream . cork ( ) ; 
116-     for  ( var  n  =  0 ;  n  <  bufs . length ;  n ++ ) 
117-       this . stream . write ( bufs [ n ] ,  done ) ; 
148+     for  ( var  i  =  0 ;  i  <  bufs . length ;  ++ i ) 
149+       this . stream . write ( bufs [ i ] ,  done ) ; 
118150    this . stream . uncork ( ) ; 
119151
120152    function  done ( err )  { 
@@ -126,93 +158,42 @@ class JSStreamWrap extends Socket {
126158
127159      let  errCode  =  0 ; 
128160      if  ( err )  { 
129-         const  code  =  uv [ `UV_${ err . code }  ] ; 
130-         errCode  =  ( err . code  &&  code )  ? code  : uv . UV_EPIPE ; 
161+         errCode  =  uv [ `UV_${ err . code }  ]  ||  uv . UV_EPIPE ; 
131162      } 
132163
133164      // Ensure that write was dispatched 
134-       setImmediate ( function ( )  { 
135-         // Do not invoke callback twice 
136-         if  ( ! self . _dequeue ( item ) ) 
137-           return ; 
138- 
139-         handle . finishWrite ( req ,  errCode ) ; 
165+       setImmediate ( ( )  =>  { 
166+         self . finishWrite ( handle ,  errCode ) ; 
140167      } ) ; 
141168    } 
142169
143170    return  0 ; 
144171  } 
145172
146-   _enqueue ( type ,  req )  { 
147-     const  item  =  new  QueueItem ( type ,  req ) ; 
148-     if  ( this . _list  ===  null )  { 
149-       this . _list  =  item ; 
150-       return  item ; 
151-     } 
152- 
153-     item . next  =  this . _list . next ; 
154-     item . prev  =  this . _list ; 
155-     item . next . prev  =  item ; 
156-     item . prev . next  =  item ; 
157- 
158-     return  item ; 
159-   } 
160- 
161-   _dequeue ( item )  { 
162-     assert ( item  instanceof  QueueItem ) ; 
163- 
164-     var  next  =  item . next ; 
165-     var  prev  =  item . prev ; 
166- 
167-     if  ( next  ===  null  &&  prev  ===  null ) 
168-       return  false ; 
169- 
170-     item . next  =  null ; 
171-     item . prev  =  null ; 
172- 
173-     if  ( next  ===  item )  { 
174-       prev  =  null ; 
175-       next  =  null ; 
176-     }  else  { 
177-       prev . next  =  next ; 
178-       next . prev  =  prev ; 
179-     } 
180- 
181-     if  ( this . _list  ===  item ) 
182-       this . _list  =  next ; 
173+   // handle === this._handle except when called from doClose(). 
174+   finishWrite ( handle ,  errCode )  { 
175+     // The write request might already have been cancelled. 
176+     if  ( this [ kCurrentWriteRequest ]  ===  null ) 
177+       return ; 
178+     const  req  =  this [ kCurrentWriteRequest ] ; 
179+     this [ kCurrentWriteRequest ]  =  null ; 
183180
184-     return   true ; 
181+     handle . finishWrite ( req ,   errCode ) ; 
185182  } 
186183
187184  doClose ( cb )  { 
188185    const  handle  =  this . _handle ; 
189186
190187    setImmediate ( ( )  =>  { 
191-       while  ( this . _list  !==  null )  { 
192-         const  item  =  this . _list ; 
193-         const  req  =  item . req ; 
194-         this . _dequeue ( item ) ; 
195- 
196-         const  errCode  =  uv . UV_ECANCELED ; 
197-         if  ( item . type  ===  'write' )  { 
198-           handle . finishWrite ( req ,  errCode ) ; 
199-         }  else  if  ( item . type  ===  'shutdown' )  { 
200-           handle . finishShutdown ( req ,  errCode ) ; 
201-         } 
202-       } 
203- 
204188      // Should be already set by net.js 
205189      assert . strictEqual ( this . _handle ,  null ) ; 
190+ 
191+       this . finishWrite ( handle ,  uv . UV_ECANCELED ) ; 
192+       this . finishShutdown ( handle ,  uv . UV_ECANCELED ) ; 
193+ 
206194      cb ( ) ; 
207195    } ) ; 
208196  } 
209197} 
210198
211- function  QueueItem ( type ,  req )  { 
212-   this . type  =  type ; 
213-   this . req  =  req ; 
214-   this . prev  =  this ; 
215-   this . next  =  this ; 
216- } 
217- 
218199module . exports  =  JSStreamWrap ; 
0 commit comments