diff --git a/src/libraries/System.IO.Pipelines/src/System/IO/Pipelines/BufferSegment.cs b/src/libraries/System.IO.Pipelines/src/System/IO/Pipelines/BufferSegment.cs index 65379ef68f9a30..8019e17984ae9a 100644 --- a/src/libraries/System.IO.Pipelines/src/System/IO/Pipelines/BufferSegment.cs +++ b/src/libraries/System.IO.Pipelines/src/System/IO/Pipelines/BufferSegment.cs @@ -59,6 +59,17 @@ public void SetOwnedMemory(byte[] arrayPoolBuffer) AvailableMemory = arrayPoolBuffer; } + // Resets memory and internal state, should be called when removing the segment from the linked list + public void Reset() + { + ResetMemory(); + + Next = null; + RunningIndex = 0; + _next = null; + } + + // Resets memory only, should be called when keeping the BufferSegment in the linked list and only swapping out the memory public void ResetMemory() { IMemoryOwner? memoryOwner = _memoryOwner; @@ -74,10 +85,8 @@ public void ResetMemory() _array = null; } - Next = null; - RunningIndex = 0; + Memory = default; - _next = null; _end = 0; AvailableMemory = default; } diff --git a/src/libraries/System.IO.Pipelines/src/System/IO/Pipelines/Pipe.cs b/src/libraries/System.IO.Pipelines/src/System/IO/Pipelines/Pipe.cs index 0c743f4c74a678..85dbc333ebe9d8 100644 --- a/src/libraries/System.IO.Pipelines/src/System/IO/Pipelines/Pipe.cs +++ b/src/libraries/System.IO.Pipelines/src/System/IO/Pipelines/Pipe.cs @@ -197,10 +197,21 @@ private void AllocateWriteHeadSynchronized(int sizeHint) _writingHeadBytesBuffered = 0; } - BufferSegment newSegment = AllocateSegment(sizeHint); + if (_writingHead.Length == 0) + { + // If we got here that means Advance was called with 0 bytes or GetMemory was called again without any writes occurring + // And, the newly requested memory size is greater than our unused segments internal memory buffer + // So we should reuse the BufferSegment and replace the memory it's holding, this way ReadAsync will not receive a buffer with one segment being empty + _writingHead.ResetMemory(); + RentMemory(_writingHead, sizeHint); + } + else + { + BufferSegment newSegment = AllocateSegment(sizeHint); - _writingHead.SetNext(newSegment); - _writingHead = newSegment; + _writingHead.SetNext(newSegment); + _writingHead = newSegment; + } } } } @@ -208,9 +219,19 @@ private void AllocateWriteHeadSynchronized(int sizeHint) private BufferSegment AllocateSegment(int sizeHint) { - Debug.Assert(sizeHint >= 0); BufferSegment newSegment = CreateSegmentUnsynchronized(); + RentMemory(newSegment, sizeHint); + + return newSegment; + } + + private void RentMemory(BufferSegment segment, int sizeHint) + { + // Segment should be new or reset, otherwise a memory leak could occur + Debug.Assert(segment.MemoryOwner is null); + Debug.Assert(sizeHint >= 0); + MemoryPool? pool = null; int maxSize = -1; @@ -223,18 +244,16 @@ private BufferSegment AllocateSegment(int sizeHint) if (sizeHint <= maxSize) { // Use the specified pool as it fits. Specified pool is not null as maxSize == -1 if _pool is null. - newSegment.SetOwnedMemory(pool!.Rent(GetSegmentSize(sizeHint, maxSize))); + segment.SetOwnedMemory(pool!.Rent(GetSegmentSize(sizeHint, maxSize))); } else { // Use the array pool int sizeToRequest = GetSegmentSize(sizeHint); - newSegment.SetOwnedMemory(ArrayPool.Shared.Rent(sizeToRequest)); + segment.SetOwnedMemory(ArrayPool.Shared.Rent(sizeToRequest)); } - _writingHeadMemory = newSegment.AvailableMemory; - - return newSegment; + _writingHeadMemory = segment.AvailableMemory; } private int GetSegmentSize(int sizeHint, int maxBufferSize = int.MaxValue) @@ -553,7 +572,7 @@ void MoveReturnEndToNextBlock() while (returnStart != null && returnStart != returnEnd) { BufferSegment? next = returnStart.NextSegment; - returnStart.ResetMemory(); + returnStart.Reset(); ReturnSegmentUnsynchronized(returnStart); returnStart = next; } @@ -866,7 +885,7 @@ private void CompletePipe() BufferSegment returnSegment = segment; segment = segment.NextSegment; - returnSegment.ResetMemory(); + returnSegment.Reset(); } _writingHead = null; diff --git a/src/libraries/System.IO.Pipelines/src/System/IO/Pipelines/StreamPipeReader.cs b/src/libraries/System.IO.Pipelines/src/System/IO/Pipelines/StreamPipeReader.cs index 3ec9cdd6d7f103..b911295f493345 100644 --- a/src/libraries/System.IO.Pipelines/src/System/IO/Pipelines/StreamPipeReader.cs +++ b/src/libraries/System.IO.Pipelines/src/System/IO/Pipelines/StreamPipeReader.cs @@ -151,7 +151,6 @@ private void AdvanceTo(BufferSegment? consumedSegment, int consumedIndex, Buffer while (returnStart != returnEnd) { BufferSegment next = returnStart.NextSegment!; - returnStart.ResetMemory(); ReturnSegmentUnsynchronized(returnStart); returnStart = next; } @@ -192,7 +191,7 @@ private bool CompleteAndGetNeedsDispose() BufferSegment returnSegment = segment; segment = segment.NextSegment; - returnSegment.ResetMemory(); + returnSegment.Reset(); } return !LeaveOpen; @@ -624,6 +623,8 @@ private void ReturnSegmentUnsynchronized(BufferSegment segment) Debug.Assert(segment != _readHead, "Returning _readHead segment that's in use!"); Debug.Assert(segment != _readTail, "Returning _readTail segment that's in use!"); + segment.Reset(); + if (_bufferSegmentPool.Count < MaxSegmentPoolSize) { _bufferSegmentPool.Push(segment); diff --git a/src/libraries/System.IO.Pipelines/src/System/IO/Pipelines/StreamPipeWriter.cs b/src/libraries/System.IO.Pipelines/src/System/IO/Pipelines/StreamPipeWriter.cs index ce83c11733b6e1..c065df5cd1a10b 100644 --- a/src/libraries/System.IO.Pipelines/src/System/IO/Pipelines/StreamPipeWriter.cs +++ b/src/libraries/System.IO.Pipelines/src/System/IO/Pipelines/StreamPipeWriter.cs @@ -193,6 +193,7 @@ private BufferSegment CreateSegmentUnsynchronized() private void ReturnSegmentUnsynchronized(BufferSegment segment) { + segment.Reset(); if (_bufferSegmentPool.Count < MaxSegmentPoolSize) { _bufferSegmentPool.Push(segment); @@ -323,7 +324,6 @@ private async ValueTask FlushAsyncInternal(bool writeToStream, Read await InnerStream.WriteAsync(returnSegment.Memory, localToken).ConfigureAwait(false); } - returnSegment.ResetMemory(); ReturnSegmentUnsynchronized(returnSegment); // Update the head segment after we return the current segment @@ -400,7 +400,6 @@ private void FlushInternal(bool writeToStream) #endif } - returnSegment.ResetMemory(); ReturnSegmentUnsynchronized(returnSegment); // Update the head segment after we return the current segment diff --git a/src/libraries/System.IO.Pipelines/tests/PipeWriterTests.cs b/src/libraries/System.IO.Pipelines/tests/PipeWriterTests.cs index 5dbbeeef1bb7eb..7cc75ef4ef424f 100644 --- a/src/libraries/System.IO.Pipelines/tests/PipeWriterTests.cs +++ b/src/libraries/System.IO.Pipelines/tests/PipeWriterTests.cs @@ -162,6 +162,71 @@ public async Task CanWriteNothingToBuffer() await buffer.FlushAsync(); } + [Fact] + public async Task WriteNothingThenWriteToNewSegment() + { + // Regression test: write nothing to force a segment to be created, then do a large write that's larger than the currently empty segment to force another new segment + // Verify that no 0 length segments are returned from the Reader. + PipeWriter buffer = Pipe.Writer; + Memory memory = buffer.GetMemory(); + buffer.Advance(0); // doing nothing, the hard way + await buffer.FlushAsync(); + + memory = buffer.GetMemory(memory.Length + 1); + buffer.Advance(memory.Length); + await buffer.FlushAsync(); + + var res = await Pipe.Reader.ReadAsync(); + Assert.True(res.Buffer.IsSingleSegment); + Assert.Equal(memory.Length, res.Buffer.Length); + } + + [Fact] + public async Task WriteNothingBetweenTwoFullWrites() + { + int totalWrittenLength = 0; + PipeWriter buffer = Pipe.Writer; + Memory memory = buffer.GetMemory(); + buffer.Advance(memory.Length); // doing nothing, the hard way + totalWrittenLength += memory.Length; + await buffer.FlushAsync(); + + memory = buffer.GetMemory(); + buffer.Advance(0); // doing nothing, the hard way + await buffer.FlushAsync(); + + memory = buffer.GetMemory(memory.Length + 1); + buffer.Advance(memory.Length); + totalWrittenLength += memory.Length; + await buffer.FlushAsync(); + + var res = await Pipe.Reader.ReadAsync(); + var segmentCount = 0; + foreach (ReadOnlyMemory _ in res.Buffer) + { + segmentCount++; + } + Assert.Equal(2, segmentCount); + Assert.Equal(totalWrittenLength, res.Buffer.Length); + } + + [Fact] + public async Task WriteNothingThenWriteSomeBytes() + { + PipeWriter buffer = Pipe.Writer; + _ = buffer.GetMemory(); + buffer.Advance(0); // doing nothing, the hard way + await buffer.FlushAsync(); + + var memory = buffer.GetMemory(); + buffer.Advance(memory.Length); + await buffer.FlushAsync(); + + var res = await Pipe.Reader.ReadAsync(); + Assert.True(res.Buffer.IsSingleSegment); + Assert.Equal(memory.Length, res.Buffer.Length); + } + [Fact] public void EmptyWriteDoesNotThrow() {