Skip to content
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -447,7 +447,30 @@ internal void CompleteWriter(Exception? exception)

internal void AdvanceReader(in SequencePosition consumed)
{
AdvanceReader(consumed, consumed);
// If the reader is completed
if (_readerCompletion.IsCompleted)
{
ThrowHelper.ThrowInvalidOperationException_NoReadingAllowed();
}

long examinedIndex = consumed.GetInteger();
BufferSegment? examinedSegment = (BufferSegment?)consumed.GetObject();
if (examinedSegment != null &&
// Avoid the lock if we're examining the entire segment, don't need to look at the last examined index in that case
examinedSegment.Length - examinedIndex > 0)
{
lock (SyncObj)
Copy link
Member Author

@BrennanConroy BrennanConroy Sep 4, 2024

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think it would technically be ok to access _lastExaminedIndex outside of the lock as it should only be modified by read calls or the Pipe completing, both of which should be synchronized (in user code) with the advance from the current read.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I agree that this lock isn't necessary considering it's only comparing values supplied by the read loop.

I think we need to be able to advance examinedSegment and examinedIndex to subsequent BufferSegments if the goal is to not throw for unexamining. If we want to go that route, I think it makes sense to store the entire _lastExaminedSequencePosition rather than just the _lastExaminedIndex and use that if it's greater than consumed which could also probably be done outside a lock.

{
// If the last examined index is further than the consumed pointer, let's use the last examined index

// _lastExaminedIndex includes the RunningIndex so we remove that to calculate how many bytes we're examining
examinedIndex = Math.Max(examinedIndex, _lastExaminedIndex - examinedSegment.RunningIndex);
}
}

// TODO: Use new SequenceMarshal.TryGetReadOnlySequenceSegment to get the correct data
// directly casting only works because the type value in ReadOnlySequenceSegment is 0
AdvanceReader((BufferSegment?)consumed.GetObject(), consumed.GetInteger(), examinedSegment, (int)examinedIndex);
}

internal void AdvanceReader(in SequencePosition consumed, in SequencePosition examined)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -84,9 +84,10 @@ protected virtual async ValueTask<ReadResult> ReadAtLeastAsyncCore(int minimumSi
/// <param name="consumed">Marks the extent of the data that has been successfully processed.</param>
/// <remarks>The memory for the consumed data will be released and no longer available.
/// The <see cref="System.IO.Pipelines.ReadResult.Buffer" /> previously returned from <see cref="System.IO.Pipelines.PipeReader.ReadAsync(System.Threading.CancellationToken)" /> must not be accessed after this call.
/// This is equivalent to calling <see cref="System.IO.Pipelines.PipeReader.AdvanceTo(System.SequencePosition,System.SequencePosition)" /> with identical examined and consumed positions.
/// This is equivalent to calling <see cref="System.IO.Pipelines.PipeReader.AdvanceTo(System.SequencePosition,System.SequencePosition)" /> with identical examined and consumed positions,
/// unless a previous <see cref="PipeReader.AdvanceTo(SequencePosition, SequencePosition)"/> call was made with a further examined index, then it will use the further examined index.
/// The examined data communicates to the pipeline when it should signal more data is available.
/// Because the consumed parameter doubles as the examined parameter, the consumed parameter should be greater than or equal to the examined position in the previous call to `AdvanceTo`. Otherwise, an <see cref="System.InvalidOperationException" /> is thrown.</remarks>
/// </remarks>
public abstract void AdvanceTo(SequencePosition consumed);

/// <summary>Moves forward the pipeline's read cursor to after the consumed data, marking the data as processed, read and examined.</summary>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,9 @@ public async Task DisposingPipeReaderStreamCompletesPipeReader(bool dataInPipe)
for (int i = 0; i < 2; i++)
{
s.Dispose();
#if NETCOREAPP
await s.DisposeAsync();
#endif
}

// Make sure OnReaderCompleted was invoked.
Expand Down Expand Up @@ -296,6 +298,32 @@ public void AsStreamDoNotCompleteReader()
pipeReader.AsStream(leaveOpen: true).Dispose();
}

// Regression test: https://github.com/dotnet/runtime/issues/107213
[Fact]
public async Task ZeroByteReadWorksWhenExaminedDoesNotEqualConsumed()
{
Pipe pipe = new Pipe();
Stream stream = pipe.Reader.AsStream();

await pipe.Writer.WriteAsync(new byte[2]);

ReadResult readResult = await pipe.Reader.ReadAsync();
// Consume: 0, Advance: 2
pipe.Reader.AdvanceTo(readResult.Buffer.Start, readResult.Buffer.End);

// Write more so that the next read will see unexamined bytes available and not block
await pipe.Writer.WriteAsync(new byte[2]);

// Zero-byte read to test that advancing (via PipeReader.AdvanceTo(consumed)) doesn't throw due to examined being less than
// the last examined position
int result = await stream.ReadAsync(Memory<byte>.Empty);
Assert.Equal(0, result);

// Real read to make sure data is immediately available
result = await stream.ReadAsync(new byte[100]);
Assert.Equal(4, result);
}

public class BuggyPipeReader : PipeReader
{
public override void AdvanceTo(SequencePosition consumed)
Expand Down Expand Up @@ -405,7 +433,7 @@ public static IEnumerable<object[]> ReadCalls

ReadAsyncDelegate readSpanSync = (stream, data) =>
{
return Task.FromResult(stream.Read(data));
return Task.FromResult(stream.Read(data, 0, data.Length));
};

yield return new object[] { readArrayAsync };
Expand Down
32 changes: 32 additions & 0 deletions src/libraries/System.IO.Pipelines/tests/PipeReaderWriterFacts.cs
Original file line number Diff line number Diff line change
Expand Up @@ -757,6 +757,38 @@ public async Task ReadAsyncReturnsDataAfterCanceledRead()
pipe.Reader.AdvanceTo(readResult.Buffer.End);
}

// Regression test: https://github.com/dotnet/runtime/issues/107213
[Fact]
public async Task AdvanceToWithoutExaminedUsesFurthestExaminedIndex()
{
PipeWriter buffer = _pipe.Writer;
buffer.Write("Hello Worl"u8.ToArray());
await buffer.FlushAsync();

bool gotData = _pipe.Reader.TryRead(out ReadResult result);
Assert.True(gotData);

Assert.Equal("Hello Worl", Encoding.ASCII.GetString(result.Buffer.ToArray()));

// Advance past 'Hello ' and examine everything else
_pipe.Reader.AdvanceTo(result.Buffer.GetPosition(6), result.Buffer.End);

// Write so that the next ReadAsync will be unblocked
buffer.Write("d"u8.ToArray());
await buffer.FlushAsync();

result = await _pipe.Reader.ReadAsync();

Assert.Equal("World", Encoding.ASCII.GetString(result.Buffer.ToArray()));

// Previous examine is at the end of 'Worl', calling AdvanceTo without passing examined will honor the previous examined index
_pipe.Reader.AdvanceTo(result.Buffer.Start);

// Double check that ReadAsync is still unblocked and works
result = await _pipe.Reader.ReadAsync();
Assert.Equal("World", Encoding.ASCII.GetString(result.Buffer.ToArray()));
}

private bool IsTaskWithResult<T>(ValueTask<T> task)
{
return task == new ValueTask<T>(task.Result);
Expand Down
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
<Project Sdk="Microsoft.NET.Sdk">
<Project Sdk="Microsoft.NET.Sdk">
<PropertyGroup>
<AllowUnsafeBlocks>true</AllowUnsafeBlocks>
<TargetFrameworks>$(NetCoreAppCurrent);$(NetFrameworkMinimum)</TargetFrameworks>
Expand All @@ -25,6 +25,7 @@
<Compile Include="PipeCompletionCallbacksTests.cs" />
<Compile Include="PipeOptionsTests.cs" />
<Compile Include="PipeReaderWriterFacts.cs" />
<Compile Include="PipeReaderStreamTests.cs" />
<Compile Include="PipePoolTests.cs" />
<Compile Include="PipeResetTests.cs" />
<Compile Include="PipeTest.cs" />
Expand All @@ -51,7 +52,6 @@
<Compile Include="PipeResetTests.nonnetstandard.cs" />
<Compile Include="PipePoolTests.nonnetstandard.cs" />
<Compile Include="PipeWriterStreamTests.nonnetstandard.cs" />
<Compile Include="PipeReaderStreamTests.nonnetstandard.cs" />
<Compile Include="PipeReaderWriterStreamTests.nonnetstandard.cs" />
</ItemGroup>
<ItemGroup>
Expand Down