99
1010namespace  node  {
1111
12+ using  v8::ArrayBuffer;
1213using  v8::Boolean;
1314using  v8::Context;
1415using  v8::Float64Array;
@@ -978,7 +979,6 @@ inline int Http2Session::OnStreamClose(nghttp2_session* handle,
978979  //  Intentionally ignore the callback if the stream does not exist or has
979980  //  already been destroyed
980981  if  (stream != nullptr  && !stream->IsDestroyed ()) {
981-     stream->AddChunk (nullptr , 0 );
982982    stream->Close (code);
983983    //  It is possible for the stream close to occur before the stream is
984984    //  ever passed on to the javascript side. If that happens, skip straight
@@ -989,9 +989,8 @@ inline int Http2Session::OnStreamClose(nghttp2_session* handle,
989989        stream->object ()->Get (context, env->onstreamclose_string ())
990990            .ToLocalChecked ();
991991    if  (fn->IsFunction ()) {
992-       Local<Value> argv[2 ] = {
993-         Integer::NewFromUnsigned (isolate, code),
994-         Boolean::New (isolate, stream->HasDataChunks (true ))
992+       Local<Value> argv[] = {
993+         Integer::NewFromUnsigned (isolate, code)
995994      };
996995      stream->MakeCallback (fn.As <Function>(), arraysize (argv), argv);
997996    } else  {
@@ -1028,6 +1027,8 @@ inline int Http2Session::OnDataChunkReceived(nghttp2_session* handle,
10281027  Http2Session* session = static_cast <Http2Session*>(user_data);
10291028  DEBUG_HTTP2SESSION2 (session, " buffering data chunk for stream %d, size: " 
10301029              " %d, flags: %d" 
1030+   Environment* env = session->env ();
1031+   HandleScope scope (env->isolate ());
10311032  //  We should never actually get a 0-length chunk so this check is
10321033  //  only a precaution at this point.
10331034  if  (len > 0 ) {
@@ -1039,8 +1040,25 @@ inline int Http2Session::OnDataChunkReceived(nghttp2_session* handle,
10391040    //  If the stream has been destroyed, ignore this chunk
10401041    if  (stream->IsDestroyed ())
10411042      return  0 ;
1043+ 
10421044    stream->statistics_ .received_bytes  += len;
1043-     stream->AddChunk (data, len);
1045+ 
1046+     //  There is a single large array buffer for the entire data read from the
1047+     //  network; create a slice of that array buffer and emit it as the
1048+     //  received data buffer.
1049+     CHECK (!session->stream_buf_ab_ .IsEmpty ());
1050+     size_t  offset = reinterpret_cast <const  char *>(data) - session->stream_buf_ ;
1051+     //  Verify that the data offset is inside the current read buffer.
1052+     CHECK_LE (offset, session->stream_buf_size_ );
1053+ 
1054+     Local<Object> buf =
1055+         Buffer::New (env, session->stream_buf_ab_ , offset, len).ToLocalChecked ();
1056+ 
1057+     stream->EmitData (len, buf, Local<Object>());
1058+     if  (!stream->IsReading ())
1059+       stream->inbound_consumed_data_while_paused_  += len;
1060+     else 
1061+       nghttp2_session_consume_stream (handle, id, len);
10441062  }
10451063  return  0 ;
10461064}
@@ -1226,9 +1244,8 @@ inline void Http2Session::HandlePriorityFrame(const nghttp2_frame* frame) {
12261244
12271245
12281246//  Called by OnFrameReceived when a complete DATA frame has been received.
1229- //  If we know that this is the last DATA frame (because the END_STREAM flag
1230- //  is set), then we'll terminate the readable side of the StreamBase. If
1231- //  the StreamBase is flowing, we'll push the chunks of data out to JS land.
1247+ //  If we know that this was the last DATA frame (because the END_STREAM flag
1248+ //  is set), then we'll terminate the readable side of the StreamBase.
12321249inline  void  Http2Session::HandleDataFrame (const  nghttp2_frame* frame) {
12331250  int32_t  id = GetFrameID (frame);
12341251  DEBUG_HTTP2SESSION2 (this , " handling data frame for stream %d" 
@@ -1239,11 +1256,8 @@ inline void Http2Session::HandleDataFrame(const nghttp2_frame* frame) {
12391256    return ;
12401257
12411258  if  (frame->hd .flags  & NGHTTP2_FLAG_END_STREAM) {
1242-     stream->AddChunk ( nullptr ,  0 );
1259+     stream->EmitData (UV_EOF, Local<Object>(), Local<Object>() );
12431260  }
1244- 
1245-   if  (stream->IsReading ())
1246-     stream->FlushDataChunks ();
12471261}
12481262
12491263
@@ -1618,45 +1632,67 @@ void Http2Session::OnStreamAllocImpl(size_t suggested_size,
16181632                                     uv_buf_t * buf,
16191633                                     void * ctx) {
16201634  Http2Session* session = static_cast <Http2Session*>(ctx);
1621-   buf->base  = session->stream_alloc ();
1622-   buf->len  = kAllocBufferSize ;
1635+   CHECK_EQ (session->stream_buf_ , nullptr );
1636+   CHECK_EQ (session->stream_buf_size_ , 0 );
1637+   buf->base  = session->stream_buf_  = Malloc (suggested_size);
1638+   buf->len  = session->stream_buf_size_  = suggested_size;
1639+   session->IncrementCurrentSessionMemory (suggested_size);
16231640}
16241641
16251642//  Callback used to receive inbound data from the i/o stream
16261643void  Http2Session::OnStreamReadImpl (ssize_t  nread,
1627-                                     const  uv_buf_t * bufs ,
1644+                                     const  uv_buf_t * buf ,
16281645                                    uv_handle_type pending,
16291646                                    void * ctx) {
16301647  Http2Session* session = static_cast <Http2Session*>(ctx);
16311648  Http2Scope h2scope (session);
16321649  CHECK_NE (session->stream_ , nullptr );
16331650  DEBUG_HTTP2SESSION2 (session, " receiving %d bytes" 
1634-   if  (nread < 0 ) {
1635-     uv_buf_t  tmp_buf;
1636-     tmp_buf.base  = nullptr ;
1637-     tmp_buf.len  = 0 ;
1638-     session->prev_read_cb_ .fn (nread,
1639-                               &tmp_buf,
1640-                               pending,
1641-                               session->prev_read_cb_ .ctx );
1642-     return ;
1643-   }
1644-   if  (bufs->len  > 0 ) {
1651+   if  (nread <= 0 ) {
1652+     free (session->stream_buf_ );
1653+     if  (nread < 0 ) {
1654+       uv_buf_t  tmp_buf = uv_buf_init (nullptr , 0 );
1655+       session->prev_read_cb_ .fn (nread,
1656+                                 &tmp_buf,
1657+                                 pending,
1658+                                 session->prev_read_cb_ .ctx );
1659+     }
1660+   } else  {
16451661    //  Only pass data on if nread > 0
1646-     uv_buf_t  buf[] { uv_buf_init ((*bufs).base , nread) };
1662+ 
1663+     //  Verify that currently: There is memory allocated into which
1664+     //  the data has been read, and that memory buffer is at least as large
1665+     //  as the amount of data we have read, but we have not yet made an
1666+     //  ArrayBuffer out of it.
1667+     CHECK_NE (session->stream_buf_ , nullptr );
1668+     CHECK_EQ (session->stream_buf_ , buf->base );
1669+     CHECK_EQ (session->stream_buf_size_ , buf->len );
1670+     CHECK_GE (session->stream_buf_size_ , static_cast <size_t >(nread));
1671+     CHECK (session->stream_buf_ab_ .IsEmpty ());
1672+ 
1673+     Environment* env = session->env ();
1674+     Isolate* isolate = env->isolate ();
1675+     HandleScope scope (isolate);
1676+     Local<Context> context = env->context ();
1677+     Context::Scope context_scope (context);
1678+ 
1679+     //  Create an array buffer for the read data. DATA frames will be emitted
1680+     //  as slices of this array buffer to avoid having to copy memory.
1681+     session->stream_buf_ab_  =
1682+         ArrayBuffer::New (isolate,
1683+                          session->stream_buf_ ,
1684+                          session->stream_buf_size_ ,
1685+                          v8::ArrayBufferCreationMode::kInternalized );
1686+ 
1687+     uv_buf_t  buf_ = uv_buf_init (buf->base , nread);
16471688    session->statistics_ .data_received  += nread;
1648-     ssize_t  ret = session->Write (buf , 1 );
1689+     ssize_t  ret = session->Write (&buf_ , 1 );
16491690
16501691    //  Note: if ssize_t is not defined (e.g. on Win32), nghttp2 will typedef
16511692    //  ssize_t to int. Cast here so that the < 0 check actually works on
16521693    //  Windows.
16531694    if  (static_cast <int >(ret) < 0 ) {
16541695      DEBUG_HTTP2SESSION2 (session, " fatal error receiving data: %d" 
1655-       Environment* env = session->env ();
1656-       Isolate* isolate = env->isolate ();
1657-       HandleScope scope (isolate);
1658-       Local<Context> context = env->context ();
1659-       Context::Scope context_scope (context);
16601696
16611697      Local<Value> argv[1 ] = {
16621698        Integer::New (isolate, ret),
@@ -1667,6 +1703,13 @@ void Http2Session::OnStreamReadImpl(ssize_t nread,
16671703                          nghttp2_session_want_read (**session));
16681704    }
16691705  }
1706+ 
1707+   //  Since we are finished handling this write, reset the stream buffer.
1708+   //  The memory has either been free()d or was handed over to V8.
1709+   session->DecrementCurrentSessionMemory (session->stream_buf_size_ );
1710+   session->stream_buf_  = nullptr ;
1711+   session->stream_buf_size_  = 0 ;
1712+   session->stream_buf_ab_  = Local<ArrayBuffer>();
16701713}
16711714
16721715void  Http2Session::OnStreamDestructImpl (void * ctx) {
@@ -1781,30 +1824,6 @@ void Http2Stream::OnTrailers(const SubmitTrailers& submit_trailers) {
17811824  }
17821825}
17831826
1784- inline  bool  Http2Stream::HasDataChunks (bool  ignore_eos) {
1785-   return  data_chunks_.size () > (ignore_eos ? 1  : 0 );
1786- }
1787- 
1788- //  Appends a chunk of received DATA frame data to this Http2Streams internal
1789- //  queue. Note that we must memcpy each chunk because of the way that nghttp2
1790- //  handles it's internal memory`.
1791- inline  void  Http2Stream::AddChunk (const  uint8_t * data, size_t  len) {
1792-   CHECK (!this ->IsDestroyed ());
1793-   if  (this ->statistics_ .first_byte  == 0 )
1794-     this ->statistics_ .first_byte  = uv_hrtime ();
1795-   if  (flags_ & NGHTTP2_STREAM_FLAG_EOS)
1796-     return ;
1797-   char * buf = nullptr ;
1798-   if  (len > 0  && data != nullptr ) {
1799-     buf = Malloc<char >(len);
1800-     memcpy (buf, data, len);
1801-   } else  if  (data == nullptr ) {
1802-     flags_ |= NGHTTP2_STREAM_FLAG_EOS;
1803-   }
1804-   data_chunks_.emplace (uv_buf_init (buf, len));
1805- }
1806- 
1807- 
18081827inline  void  Http2Stream::Close (int32_t  code) {
18091828  CHECK (!this ->IsDestroyed ());
18101829  flags_ |= NGHTTP2_STREAM_FLAG_CLOSED;
@@ -1841,13 +1860,6 @@ inline void Http2Stream::Destroy() {
18411860
18421861  DEBUG_HTTP2STREAM (this , " destroying stream" 
18431862
1844-   //  Free any remaining incoming data chunks.
1845-   while  (!data_chunks_.empty ()) {
1846-     uv_buf_t  buf = data_chunks_.front ();
1847-     free (buf.base );
1848-     data_chunks_.pop ();
1849-   }
1850- 
18511863  //  Wait until the start of the next loop to delete because there
18521864  //  may still be some pending operations queued for this stream.
18531865  env ()->SetImmediate ([](Environment* env, void * data) {
@@ -1873,39 +1885,6 @@ inline void Http2Stream::Destroy() {
18731885}
18741886
18751887
1876- //  Uses the StreamBase API to push a single chunk of queued inbound DATA
1877- //  to JS land.
1878- void  Http2Stream::OnDataChunk (uv_buf_t * chunk) {
1879-   CHECK (!this ->IsDestroyed ());
1880-   Isolate* isolate = env ()->isolate ();
1881-   HandleScope scope (isolate);
1882-   ssize_t  len = -1 ;
1883-   Local<Object> buf;
1884-   if  (chunk != nullptr ) {
1885-     len = chunk->len ;
1886-     buf = Buffer::New (isolate, chunk->base , len).ToLocalChecked ();
1887-   }
1888-   EmitData (len, buf, this ->object ());
1889- }
1890- 
1891- 
1892- inline  void  Http2Stream::FlushDataChunks () {
1893-   CHECK (!this ->IsDestroyed ());
1894-   Http2Scope h2scope (this );
1895-   if  (!data_chunks_.empty ()) {
1896-     uv_buf_t  buf = data_chunks_.front ();
1897-     data_chunks_.pop ();
1898-     if  (buf.len  > 0 ) {
1899-       CHECK_EQ (nghttp2_session_consume_stream (session_->session (),
1900-                                               id_, buf.len ), 0 );
1901-       OnDataChunk (&buf);
1902-     } else  {
1903-       OnDataChunk (nullptr );
1904-     }
1905-   }
1906- }
1907- 
1908- 
19091888//  Initiates a response on the Http2Stream using data provided via the
19101889//  StreamBase Streams API.
19111890inline  int  Http2Stream::SubmitResponse (nghttp2_nv* nva,
@@ -2012,13 +1991,20 @@ inline Http2Stream* Http2Stream::SubmitPushPromise(nghttp2_nv* nva,
20121991//  Switch the StreamBase into flowing mode to begin pushing chunks of data
20131992//  out to JS land.
20141993inline  int  Http2Stream::ReadStart () {
1994+   Http2Scope h2scope (this );
20151995  CHECK (!this ->IsDestroyed ());
20161996  flags_ |= NGHTTP2_STREAM_FLAG_READ_START;
20171997  flags_ &= ~NGHTTP2_STREAM_FLAG_READ_PAUSED;
20181998
2019-   //  Flush any queued data chunks immediately out to the JS layer
2020-   FlushDataChunks ();
20211999  DEBUG_HTTP2STREAM (this , " reading starting" 
2000+ 
2001+   //  Tell nghttp2 about our consumption of the data that was handed
2002+   //  off to JS land.
2003+   nghttp2_session_consume_stream (session_->session (),
2004+                                  id_,
2005+                                  inbound_consumed_data_while_paused_);
2006+   inbound_consumed_data_while_paused_ = 0 ;
2007+ 
20222008  return  0 ;
20232009}
20242010
0 commit comments