-
Notifications
You must be signed in to change notification settings - Fork 815
Fix unobserved exceptions with retries #2255
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Changes from all commits
2918d27
ad14100
eb02e6c
3f93a6b
ab2c2f1
7bfd25e
8ef308a
6d36d76
c5779fd
8bd2b5a
7b25b72
14cca32
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -33,6 +33,8 @@ internal abstract partial class RetryCallBase<TRequest, TResponse> : IGrpcCall<T | |
| private readonly TaskCompletionSource<IGrpcCall<TRequest, TResponse>> _commitedCallTcs; | ||
| private RetryCallBaseClientStreamReader<TRequest, TResponse>? _retryBaseClientStreamReader; | ||
| private RetryCallBaseClientStreamWriter<TRequest, TResponse>? _retryBaseClientStreamWriter; | ||
| private Task<TResponse>? _responseTask; | ||
| private Task<Metadata>? _responseHeadersTask; | ||
|
|
||
| // Internal for unit testing. | ||
| internal CancellationTokenRegistration? _ctsRegistration; | ||
|
|
@@ -111,13 +113,34 @@ protected RetryCallBase(GrpcChannel channel, Method<TRequest, TResponse> method, | |
| } | ||
| } | ||
|
|
||
| public async Task<TResponse> GetResponseAsync() | ||
| public Task<TResponse> GetResponseAsync() => _responseTask ??= GetResponseCoreAsync(); | ||
|
|
||
| private async Task<TResponse> GetResponseCoreAsync() | ||
| { | ||
| var call = await CommitedCallTask.ConfigureAwait(false); | ||
| return await call.GetResponseAsync().ConfigureAwait(false); | ||
| } | ||
|
|
||
| public async Task<Metadata> GetResponseHeadersAsync() | ||
| public Task<Metadata> GetResponseHeadersAsync() | ||
| { | ||
| if (_responseHeadersTask == null) | ||
| { | ||
| _responseHeadersTask = GetResponseHeadersCoreAsync(); | ||
|
|
||
| // ResponseHeadersAsync could be called inside a client interceptor when a call is wrapped. | ||
| // Most people won't use the headers result. Observed exception to avoid unobserved exception event. | ||
| _responseHeadersTask.ObserveException(); | ||
|
|
||
| // If there was an error fetching response headers then it's likely the same error is reported | ||
| // by response TCS. The user is unlikely to observe both errors. | ||
| // Observed exception to avoid unobserved exception event. | ||
| _responseTask?.ObserveException(); | ||
|
Contributor
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. The comment makes it seem like the desirability of this behavior depends on the outcome of the header task, but that's not obviously reflected in the code. Will this observe/suppress the response exception if there's no header exception?
Member
Author
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more.
Potentially. No error getting headers should mean there isn't an error getting the body. Observing like this is a pragmatic balance. |
||
| } | ||
|
|
||
| return _responseHeadersTask; | ||
| } | ||
|
|
||
| private async Task<Metadata> GetResponseHeadersCoreAsync() | ||
| { | ||
| var call = await CommitedCallTask.ConfigureAwait(false); | ||
| return await call.GetResponseHeadersAsync().ConfigureAwait(false); | ||
|
|
@@ -369,7 +392,7 @@ protected void CommitCall(IGrpcCall<TRequest, TResponse> call, CommitReason comm | |
| // A commited call that has already cleaned up is likely a StatusGrpcCall. | ||
| if (call.Disposed) | ||
| { | ||
| Cleanup(); | ||
| Cleanup(observeExceptions: false); | ||
| } | ||
| } | ||
| } | ||
|
|
@@ -382,6 +405,11 @@ protected bool HasClientStream() | |
| return Method.Type == MethodType.ClientStreaming || Method.Type == MethodType.DuplexStreaming; | ||
| } | ||
|
|
||
| protected bool HasResponseStream() | ||
| { | ||
| return Method.Type == MethodType.ServerStreaming || Method.Type == MethodType.DuplexStreaming; | ||
| } | ||
|
|
||
| protected void SetNewActiveCallUnsynchronized(IGrpcCall<TRequest, TResponse> call) | ||
| { | ||
| Debug.Assert(Monitor.IsEntered(Lock), "Should be called with lock."); | ||
|
|
@@ -436,11 +464,11 @@ protected virtual void Dispose(bool disposing) | |
| CommitedCallTask.Result.Dispose(); | ||
| } | ||
|
|
||
| Cleanup(); | ||
| Cleanup(observeExceptions: true); | ||
| } | ||
| } | ||
|
|
||
| protected void Cleanup() | ||
| protected void Cleanup(bool observeExceptions) | ||
| { | ||
| Channel.FinishActiveCall(this); | ||
|
|
||
|
|
@@ -449,6 +477,12 @@ protected void Cleanup() | |
| CancellationTokenSource.Cancel(); | ||
|
|
||
| ClearRetryBuffer(); | ||
|
|
||
| if (observeExceptions) | ||
| { | ||
| _responseTask?.ObserveException(); | ||
| _responseHeadersTask?.ObserveException(); | ||
| } | ||
| } | ||
|
|
||
| internal bool TryAddToRetryBuffer(ReadOnlyMemory<byte> message) | ||
|
|
||
Uh oh!
There was an error while loading. Please reload this page.