-
Notifications
You must be signed in to change notification settings - Fork 4.6k
transport: Avoid buffer copies when reading Data frames #8657
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Changes from all commits
9875111
a6f1b72
3f98471
5fa01cd
5ec4a4e
6820fee
00586ba
8ef2d03
a121b15
e7b6d35
5585538
f0e9b13
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -25,7 +25,6 @@ import ( | |
| "fmt" | ||
| "io" | ||
| "math" | ||
| "net" | ||
| "net/http" | ||
| "net/url" | ||
| "strconv" | ||
|
|
@@ -37,6 +36,7 @@ import ( | |
| "golang.org/x/net/http2" | ||
| "golang.org/x/net/http2/hpack" | ||
| "google.golang.org/grpc/codes" | ||
| "google.golang.org/grpc/mem" | ||
| ) | ||
|
|
||
| const ( | ||
|
|
@@ -300,11 +300,11 @@ type bufWriter struct { | |
| buf []byte | ||
| offset int | ||
| batchSize int | ||
| conn net.Conn | ||
| conn io.Writer | ||
| err error | ||
| } | ||
|
|
||
| func newBufWriter(conn net.Conn, batchSize int, pool *sync.Pool) *bufWriter { | ||
| func newBufWriter(conn io.Writer, batchSize int, pool *sync.Pool) *bufWriter { | ||
| w := &bufWriter{ | ||
| batchSize: batchSize, | ||
| conn: conn, | ||
|
|
@@ -388,15 +388,34 @@ func toIOError(err error) error { | |
| return ioError{error: err} | ||
| } | ||
|
|
||
| type parsedDataFrame struct { | ||
| http2.FrameHeader | ||
| data mem.Buffer | ||
| } | ||
|
|
||
| func (df *parsedDataFrame) StreamEnded() bool { | ||
| return df.FrameHeader.Flags.Has(http2.FlagDataEndStream) | ||
| } | ||
|
|
||
| type framer struct { | ||
| writer *bufWriter | ||
| fr *http2.Framer | ||
| writer *bufWriter | ||
| fr *http2.Framer | ||
| reader io.Reader | ||
| dataFrame parsedDataFrame // Cached data frame to avoid heap allocations. | ||
| pool mem.BufferPool | ||
| errDetail error | ||
| } | ||
|
|
||
| var writeBufferPoolMap = make(map[int]*sync.Pool) | ||
| var writeBufferMutex sync.Mutex | ||
|
|
||
| func newFramer(conn net.Conn, writeBufferSize, readBufferSize int, sharedWriteBuffer bool, maxHeaderListSize uint32) *framer { | ||
| func newFramer(conn io.ReadWriter, writeBufferSize, readBufferSize int, sharedWriteBuffer bool, maxHeaderListSize uint32, memPool mem.BufferPool) *framer { | ||
| if memPool == nil { | ||
| // Note that this is only supposed to be nil in tests. Otherwise, stream | ||
| // is always initialized with a BufferPool. | ||
| memPool = mem.DefaultBufferPool() | ||
| } | ||
|
Comment on lines
+413
to
+417
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Is it hard to fix the tests? Maybe we can do that as a follow-up. There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Acknowledged. There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Raised #8688 with the cleanup. |
||
|
|
||
| if writeBufferSize < 0 { | ||
| writeBufferSize = 0 | ||
| } | ||
|
|
@@ -412,6 +431,8 @@ func newFramer(conn net.Conn, writeBufferSize, readBufferSize int, sharedWriteBu | |
| f := &framer{ | ||
| writer: w, | ||
| fr: http2.NewFramer(w, r), | ||
| reader: r, | ||
| pool: memPool, | ||
| } | ||
| f.fr.SetMaxReadFrameSize(http2MaxFrameLen) | ||
| // Opt-in to Frame reuse API on framer to reduce garbage. | ||
|
|
@@ -422,6 +443,111 @@ func newFramer(conn net.Conn, writeBufferSize, readBufferSize int, sharedWriteBu | |
| return f | ||
| } | ||
|
|
||
| // readFrame reads a single frame. The returned Frame is only valid | ||
| // until the next call to readFrame. | ||
| func (f *framer) readFrame() (any, error) { | ||
| f.errDetail = nil | ||
| fh, err := f.fr.ReadFrameHeader() | ||
| if err != nil { | ||
| f.errDetail = f.fr.ErrorDetail() | ||
| return nil, err | ||
| } | ||
| // Read the data frame directly from the underlying io.Reader to avoid | ||
| // copies. | ||
| if fh.Type == http2.FrameData { | ||
| err = f.readDataFrame(fh) | ||
| return &f.dataFrame, err | ||
| } | ||
| fr, err := f.fr.ReadFrameForHeader(fh) | ||
| if err != nil { | ||
| f.errDetail = f.fr.ErrorDetail() | ||
| return nil, err | ||
| } | ||
| return fr, err | ||
| } | ||
|
|
||
| // errorDetail returns a more detailed error of the last error | ||
| // returned by framer.readFrame. For instance, if readFrame | ||
| // returns a StreamError with code PROTOCOL_ERROR, errorDetail | ||
| // will say exactly what was invalid. errorDetail is not guaranteed | ||
| // to return a non-nil value. | ||
| // errorDetail is reset after the next call to readFrame. | ||
| func (f *framer) errorDetail() error { | ||
| return f.errDetail | ||
| } | ||
|
|
||
| func (f *framer) readDataFrame(fh http2.FrameHeader) (err error) { | ||
| if fh.StreamID == 0 { | ||
| // DATA frames MUST be associated with a stream. If a | ||
| // DATA frame is received whose stream identifier | ||
| // field is 0x0, the recipient MUST respond with a | ||
| // connection error (Section 5.4.1) of type | ||
| // PROTOCOL_ERROR. | ||
easwars marked this conversation as resolved.
Show resolved
Hide resolved
|
||
| f.errDetail = errors.New("DATA frame with stream ID 0") | ||
| return http2.ConnectionError(http2.ErrCodeProtocol) | ||
| } | ||
| // Converting a *[]byte to a mem.SliceBuffer incurs a heap allocation. This | ||
| // conversion is performed by mem.NewBuffer. To avoid the extra allocation | ||
| // a []byte is allocated directly if required and cast to a mem.SliceBuffer. | ||
| var buf []byte | ||
| // poolHandle is the pointer returned by the buffer pool (if it's used.). | ||
| var poolHandle *[]byte | ||
| useBufferPool := !mem.IsBelowBufferPoolingThreshold(int(fh.Length)) | ||
easwars marked this conversation as resolved.
Show resolved
Hide resolved
|
||
| if useBufferPool { | ||
| poolHandle = f.pool.Get(int(fh.Length)) | ||
| buf = *poolHandle | ||
| defer func() { | ||
| if err != nil { | ||
| f.pool.Put(poolHandle) | ||
| } | ||
| }() | ||
| } else { | ||
| buf = make([]byte, int(fh.Length)) | ||
| } | ||
| if fh.Flags.Has(http2.FlagDataPadded) { | ||
| if fh.Length == 0 { | ||
| return io.ErrUnexpectedEOF | ||
easwars marked this conversation as resolved.
Show resolved
Hide resolved
|
||
| } | ||
| // This initial 1-byte read can be inefficient for unbuffered readers, | ||
| // but it allows the rest of the payload to be read directly to the | ||
| // start of the destination slice. This makes it easy to return the | ||
| // original slice back to the buffer pool. | ||
| if _, err := io.ReadFull(f.reader, buf[:1]); err != nil { | ||
| return err | ||
| } | ||
| padSize := buf[0] | ||
| buf = buf[:len(buf)-1] | ||
| if int(padSize) > len(buf) { | ||
| // If the length of the padding is greater than the | ||
| // length of the frame payload, the recipient MUST | ||
| // treat this as a connection error. | ||
| // Filed: https://github.com/http2/http2-spec/issues/610 | ||
| f.errDetail = errors.New("pad size larger than data payload") | ||
| return http2.ConnectionError(http2.ErrCodeProtocol) | ||
| } | ||
| if _, err := io.ReadFull(f.reader, buf); err != nil { | ||
| return err | ||
| } | ||
| buf = buf[:len(buf)-int(padSize)] | ||
| } else if _, err := io.ReadFull(f.reader, buf); err != nil { | ||
| return err | ||
| } | ||
|
|
||
| f.dataFrame.FrameHeader = fh | ||
| if useBufferPool { | ||
| // Update the handle to point to the (potentially re-sliced) buf. | ||
| *poolHandle = buf | ||
easwars marked this conversation as resolved.
Show resolved
Hide resolved
|
||
| f.dataFrame.data = mem.NewBuffer(poolHandle, f.pool) | ||
| } else { | ||
| f.dataFrame.data = mem.SliceBuffer(buf) | ||
| } | ||
| return nil | ||
| } | ||
|
|
||
| func (df *parsedDataFrame) Header() http2.FrameHeader { | ||
| return df.FrameHeader | ||
| } | ||
|
|
||
| func getWriteBufferPool(size int) *sync.Pool { | ||
| writeBufferMutex.Lock() | ||
| defer writeBufferMutex.Unlock() | ||
|
|
||
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
🎉