@@ -119,6 +119,7 @@ internal MsQuicStream(MsQuicConnection.State connectionState, SafeMsQuicStreamHa
119119 _state . StateGCHandle = GCHandle . Alloc ( _state ) ;
120120 try
121121 {
122+ Debug . Assert ( ! Monitor . IsEntered ( _state ) , "!Monitor.IsEntered(_state)" ) ;
122123 MsQuicApi . Api . SetCallbackHandlerDelegate (
123124 _state . Handle ,
124125 s_streamDelegate ,
@@ -164,6 +165,7 @@ internal MsQuicStream(MsQuicConnection.State connectionState, QUIC_STREAM_OPEN_F
164165
165166 try
166167 {
168+ Debug . Assert ( ! Monitor . IsEntered ( _state ) , "!Monitor.IsEntered(_state)" ) ;
167169 uint status = MsQuicApi . Api . StreamOpenDelegate (
168170 connectionState . Handle ,
169171 flags ,
@@ -173,6 +175,7 @@ internal MsQuicStream(MsQuicConnection.State connectionState, QUIC_STREAM_OPEN_F
173175
174176 QuicExceptionHelpers . ThrowIfFailed ( status , "Failed to open stream to peer." ) ;
175177
178+ Debug . Assert ( ! Monitor . IsEntered ( _state ) , "!Monitor.IsEntered(_state)" ) ;
176179 status = MsQuicApi . Api . StreamStartDelegate ( _state . Handle , QUIC_STREAM_START_FLAGS . FAIL_BLOCKED ) ;
177180 QuicExceptionHelpers . ThrowIfFailed ( status , "Could not start stream." ) ;
178181 }
@@ -227,7 +230,7 @@ internal override int WriteTimeout
227230 get
228231 {
229232 ThrowIfDisposed ( ) ;
230- return _writeTimeout ;
233+ return _writeTimeout ;
231234 }
232235 set
233236 {
@@ -420,6 +423,8 @@ internal override ValueTask<int> ReadAsync(Memory<byte> destination, Cancellatio
420423 long abortError ;
421424 bool preCanceled = false ;
422425
426+ int bytesRead = - 1 ;
427+ bool reenableReceive = false ;
423428 lock ( _state )
424429 {
425430 initialReadState = _state . ReadState ;
@@ -482,22 +487,32 @@ internal override ValueTask<int> ReadAsync(Memory<byte> destination, Cancellatio
482487 {
483488 _state . ReadState = ReadState . None ;
484489
485- int taken = CopyMsQuicBuffersToUserBuffer ( _state . ReceiveQuicBuffers . AsSpan ( 0 , _state . ReceiveQuicBuffersCount ) , destination . Span ) ;
486- ReceiveComplete ( taken ) ;
490+ bytesRead = CopyMsQuicBuffersToUserBuffer ( _state . ReceiveQuicBuffers . AsSpan ( 0 , _state . ReceiveQuicBuffersCount ) , destination . Span ) ;
487491
488- if ( taken != _state . ReceiveQuicBuffersTotalBytes )
492+ if ( bytesRead != _state . ReceiveQuicBuffersTotalBytes )
489493 {
490494 // Need to re-enable receives because MsQuic will pause them when we don't consume the entire buffer.
491- EnableReceive ( ) ;
495+ reenableReceive = true ;
492496 }
493497 else if ( _state . ReceiveIsFinal )
494498 {
495499 // This was a final message and we've consumed everything. We can complete the state without waiting for PEER_SEND_SHUTDOWN
496500 _state . ReadState = ReadState . ReadsCompleted ;
497501 }
502+ }
503+ }
504+
505+ // methods below need to be called outside of the lock
506+ if ( bytesRead > - 1 )
507+ {
508+ ReceiveComplete ( bytesRead ) ;
498509
499- return new ValueTask < int > ( taken ) ;
510+ if ( reenableReceive )
511+ {
512+ EnableReceive ( ) ;
500513 }
514+
515+ return new ValueTask < int > ( bytesRead ) ;
501516 }
502517
503518 // All success scenarios returned at this point. Failure scenarios below:
@@ -510,7 +525,7 @@ internal override ValueTask<int> ReadAsync(Memory<byte> destination, Cancellatio
510525 ex = new InvalidOperationException ( "Only one read is supported at a time." ) ;
511526 break ;
512527 case ReadState . Aborted :
513- ex = preCanceled ? new OperationCanceledException ( cancellationToken ) :
528+ ex = preCanceled ? new OperationCanceledException ( cancellationToken ) :
514529 ThrowHelper . GetStreamAbortedException ( abortError ) ;
515530 break ;
516531 case ReadState . ConnectionClosed :
@@ -609,6 +624,7 @@ internal override void AbortWrite(long errorCode)
609624
610625 private void StartShutdown ( QUIC_STREAM_SHUTDOWN_FLAGS flags , long errorCode )
611626 {
627+ Debug . Assert ( ! Monitor . IsEntered ( _state ) , "!Monitor.IsEntered(_state)" ) ;
612628 uint status = MsQuicApi . Api . StreamShutdownDelegate ( _state . Handle , flags , errorCode ) ;
613629 QuicExceptionHelpers . ThrowIfFailed ( status , "StreamShutdown failed." ) ;
614630 }
@@ -818,15 +834,17 @@ private void Dispose(bool disposing)
818834 {
819835 // Handle race condition when stream can be closed handling SHUTDOWN_COMPLETE.
820836 StartShutdown ( QUIC_STREAM_SHUTDOWN_FLAGS . GRACEFUL , errorCode : 0 ) ;
821- } catch ( ObjectDisposedException ) { } ;
837+ }
838+ catch ( ObjectDisposedException ) { } ;
822839 }
823840
824841 if ( abortRead )
825842 {
826843 try
827844 {
828845 StartShutdown ( QUIC_STREAM_SHUTDOWN_FLAGS . ABORT_RECEIVE , 0xffffffff ) ;
829- } catch ( ObjectDisposedException ) { } ;
846+ }
847+ catch ( ObjectDisposedException ) { } ;
830848 }
831849
832850 if ( completeRead )
@@ -845,6 +863,7 @@ private void Dispose(bool disposing)
845863
846864 private void EnableReceive ( )
847865 {
866+ Debug . Assert ( ! Monitor . IsEntered ( _state ) , "!Monitor.IsEntered(_state)" ) ;
848867 uint status = MsQuicApi . Api . StreamReceiveSetEnabledDelegate ( _state . Handle , enabled : true ) ;
849868 QuicExceptionHelpers . ThrowIfFailed ( status , "StreamReceiveSetEnabled failed." ) ;
850869 }
@@ -1289,6 +1308,7 @@ private unsafe ValueTask SendReadOnlyMemoryAsync(
12891308 _state . BufferArrays [ 0 ] = handle ;
12901309 _state . SendBufferCount = 1 ;
12911310
1311+ Debug . Assert ( ! Monitor . IsEntered ( _state ) , "!Monitor.IsEntered(_state)" ) ;
12921312 uint status = MsQuicApi . Api . StreamSendDelegate (
12931313 _state . Handle ,
12941314 quicBuffers ,
@@ -1352,6 +1372,7 @@ private unsafe ValueTask SendReadOnlySequenceAsync(
13521372 ++ count ;
13531373 }
13541374
1375+ Debug . Assert ( ! Monitor . IsEntered ( _state ) , "!Monitor.IsEntered(_state)" ) ;
13551376 uint status = MsQuicApi . Api . StreamSendDelegate (
13561377 _state . Handle ,
13571378 quicBuffers ,
@@ -1412,6 +1433,7 @@ private unsafe ValueTask SendReadOnlyMemoryListAsync(
14121433 _state . BufferArrays [ i ] = handle ;
14131434 }
14141435
1436+ Debug . Assert ( ! Monitor . IsEntered ( _state ) , "!Monitor.IsEntered(_state)" ) ;
14151437 uint status = MsQuicApi . Api . StreamSendDelegate (
14161438 _state . Handle ,
14171439 quicBuffers ,
@@ -1434,6 +1456,7 @@ private unsafe ValueTask SendReadOnlyMemoryListAsync(
14341456
14351457 private void ReceiveComplete ( int bufferLength )
14361458 {
1459+ Debug . Assert ( ! Monitor . IsEntered ( _state ) , "!Monitor.IsEntered(_state)" ) ;
14371460 uint status = MsQuicApi . Api . StreamReceiveCompleteDelegate ( _state . Handle , ( ulong ) bufferLength ) ;
14381461 QuicExceptionHelpers . ThrowIfFailed ( status , "Could not complete receive call." ) ;
14391462 }
0 commit comments