Skip to content

Commit d9d9166

Browse files
committed
Pass cancellation token to IFrameHandler.CloseAsync
1 parent 83645ca commit d9d9166

File tree

10 files changed

+76
-33
lines changed

10 files changed

+76
-33
lines changed

projects/RabbitMQ.Client/client/impl/AutorecoveringConnection.Recovery.cs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -94,7 +94,7 @@ await Task.Delay(_config.NetworkRecoveryInterval, token)
9494
.ConfigureAwait(false);
9595
success = await TryPerformAutomaticRecoveryAsync(token)
9696
.ConfigureAwait(false);
97-
} while (!success && !token.IsCancellationRequested);
97+
} while (false == success && false == token.IsCancellationRequested);
9898
}
9999
catch (OperationCanceledException)
100100
{

projects/RabbitMQ.Client/client/impl/AutorecoveringConnection.cs

Lines changed: 35 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -196,7 +196,7 @@ public override string ToString()
196196

197197
internal Task CloseFrameHandlerAsync()
198198
{
199-
return InnerConnection.FrameHandler.CloseAsync();
199+
return InnerConnection.FrameHandler.CloseAsync(CancellationToken.None);
200200
}
201201

202202
///<summary>API-side invocation of updating the secret.</summary>
@@ -208,17 +208,44 @@ public Task UpdateSecretAsync(string newSecret, string reason)
208208
}
209209

210210
///<summary>Asynchronous API-side invocation of connection.close with timeout.</summary>
211-
public async Task CloseAsync(ushort reasonCode, string reasonText, TimeSpan timeout, bool abort, CancellationToken cancellationToken = default)
211+
public async Task CloseAsync(ushort reasonCode, string reasonText, TimeSpan timeout, bool abort,
212+
CancellationToken cancellationToken = default)
212213
{
213214
ThrowIfDisposed();
214-
// TODO
215-
// what to do if this method throws OperationCanceledException? We still need to close the connection.
216-
await StopRecoveryLoopAsync(cancellationToken)
217-
.ConfigureAwait(false);
218-
if (_innerConnection.IsOpen)
215+
216+
Task CloseInnerConnectionAsync()
217+
{
218+
if (_innerConnection.IsOpen)
219+
{
220+
return _innerConnection.CloseAsync(reasonCode, reasonText, timeout, abort, cancellationToken);
221+
}
222+
else
223+
{
224+
return Task.CompletedTask;
225+
}
226+
}
227+
228+
try
219229
{
220-
await _innerConnection.CloseAsync(reasonCode, reasonText, timeout, abort, cancellationToken)
230+
await StopRecoveryLoopAsync(cancellationToken)
221231
.ConfigureAwait(false);
232+
233+
await CloseInnerConnectionAsync()
234+
.ConfigureAwait(false);
235+
}
236+
catch (Exception ex)
237+
{
238+
try
239+
{
240+
await CloseInnerConnectionAsync()
241+
.ConfigureAwait(false);
242+
}
243+
catch (Exception innerConnectionException)
244+
{
245+
throw new AggregateException(ex, innerConnectionException);
246+
}
247+
248+
throw;
222249
}
223250
}
224251

projects/RabbitMQ.Client/client/impl/Connection.Commands.cs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -114,7 +114,7 @@ await _frameHandler.SendProtocolHeaderAsync(cancellationToken)
114114
{
115115
TerminateMainloop();
116116
// TODO hmmm
117-
FinishCloseAsync().EnsureCompleted();
117+
FinishCloseAsync(CancellationToken.None).EnsureCompleted();
118118
throw new ProtocolVersionMismatchException(Protocol.MajorVersion, Protocol.MinorVersion, serverVersion.Major, serverVersion.Minor);
119119
}
120120

projects/RabbitMQ.Client/client/impl/Connection.Heartbeat.cs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -120,7 +120,7 @@ private void HeartbeatReadTimerCallback(object? state)
120120
{
121121
TerminateMainloop();
122122
// TODO hmmm
123-
FinishCloseAsync().EnsureCompleted();
123+
FinishCloseAsync(CancellationToken.None).EnsureCompleted();
124124
}
125125
else
126126
{

projects/RabbitMQ.Client/client/impl/Connection.Receive.cs

Lines changed: 7 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -45,18 +45,18 @@ internal sealed partial class Connection
4545
private readonly IFrameHandler _frameHandler;
4646
private Task _mainLoopTask;
4747

48-
// TODO cancellation token
4948
private async Task MainLoop()
5049
{
51-
CancellationToken _mainLoopToken = _mainLoopCts.Token;
50+
CancellationToken mainLoopToken = _mainLoopCts.Token;
5251
try
5352
{
54-
await ReceiveLoopAsync(_mainLoopToken)
53+
await ReceiveLoopAsync(mainLoopToken)
5554
.ConfigureAwait(false);
5655
}
5756
catch (OperationCanceledException)
5857
{
5958
// TODO what to do here?
59+
// Debug log?
6060
}
6161
catch (EndOfStreamException eose)
6262
{
@@ -68,7 +68,7 @@ await ReceiveLoopAsync(_mainLoopToken)
6868
}
6969
catch (HardProtocolException hpe)
7070
{
71-
await HardProtocolExceptionHandlerAsync(hpe, _mainLoopToken)
71+
await HardProtocolExceptionHandlerAsync(hpe, mainLoopToken)
7272
.ConfigureAwait(false);
7373
}
7474
catch (FileLoadException fileLoadException)
@@ -91,7 +91,9 @@ await HardProtocolExceptionHandlerAsync(hpe, _mainLoopToken)
9191
HandleMainLoopException(ea);
9292
}
9393

94-
await FinishCloseAsync();
94+
// TODO is this the best way?
95+
using var cts = new CancellationTokenSource(InternalConstants.DefaultConnectionCloseTimeout);
96+
await FinishCloseAsync(cts.Token);
9597
}
9698

9799
private async Task ReceiveLoopAsync(CancellationToken mainLoopCancelllationToken)

projects/RabbitMQ.Client/client/impl/Connection.cs

Lines changed: 15 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -151,7 +151,7 @@ public event EventHandler<ShutdownEventArgs> ConnectionShutdown
151151
add
152152
{
153153
ThrowIfDisposed();
154-
var reason = CloseReason;
154+
ShutdownEventArgs? reason = CloseReason;
155155
if (reason is null)
156156
{
157157
_connectionShutdownWrapper.AddHandler(value);
@@ -379,8 +379,7 @@ await _mainLoopTask.WaitAsync(timeout, cancellationToken)
379379
{
380380
try
381381
{
382-
// TODO cancellation token?
383-
await _frameHandler.CloseAsync()
382+
await _frameHandler.CloseAsync(cancellationToken)
384383
.ConfigureAwait(false);
385384
}
386385
catch
@@ -411,13 +410,13 @@ internal void ClosedViaPeer(ShutdownEventArgs reason)
411410
}
412411

413412
// Only call at the end of the Mainloop or HeartbeatLoop
414-
private async Task FinishCloseAsync()
413+
private async Task FinishCloseAsync(CancellationToken cancellationToken)
415414
{
416415
_mainLoopCts.Cancel();
417416
_closed = true;
418417
MaybeStopHeartbeatTimers();
419418

420-
await _frameHandler.CloseAsync();
419+
await _frameHandler.CloseAsync(cancellationToken);
421420
_channel0.SetCloseReason(CloseReason);
422421
_channel0.FinishClose();
423422
RabbitMqClientEventSource.Log.ConnectionClosed();
@@ -483,6 +482,17 @@ public void Dispose()
483482
{
484483
throw new InvalidOperationException("Connection must be closed before calling Dispose!");
485484
}
485+
486+
_session0.Dispose();
487+
/*
488+
* TODO dispose channel?
489+
if (_channel0 != null && _channel0.IsOpen)
490+
{
491+
IChannel ch0 = _channel0 as IChannel;
492+
ch0?.Dispose();
493+
}
494+
*/
495+
_mainLoopCts.Dispose();
486496
}
487497
catch (OperationInterruptedException)
488498
{

projects/RabbitMQ.Client/client/impl/IFrameHandler.cs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -56,7 +56,7 @@ internal interface IFrameHandler
5656
///<summary>Socket write timeout. System.Threading.Timeout.InfiniteTimeSpan signals "infinity".</summary>
5757
TimeSpan WriteTimeout { set; }
5858

59-
Task CloseAsync();
59+
Task CloseAsync(CancellationToken cancellationToken);
6060

6161
///<summary>Read a frame from the underlying
6262
///transport. Returns null if the read operation timed out

projects/RabbitMQ.Client/client/impl/MainSession.cs

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -43,11 +43,10 @@
4343
namespace RabbitMQ.Client.Impl
4444
{
4545
///<summary>Small ISession implementation used only for channel 0.</summary>
46-
internal sealed class MainSession : Session
46+
internal sealed class MainSession : Session, IDisposable
4747
{
4848
private volatile bool _closeIsServerInitiated;
4949
private volatile bool _closing;
50-
// TODO dispose
5150
private readonly SemaphoreSlim _closingSemaphore = new SemaphoreSlim(1, 1);
5251

5352
public MainSession(Connection connection) : base(connection, 0)
@@ -167,5 +166,7 @@ public override ValueTask TransmitAsync<T>(in T cmd, CancellationToken cancellat
167166

168167
return base.TransmitAsync(in cmd, cancellationToken);
169168
}
169+
170+
public void Dispose() => ((IDisposable)_closingSemaphore).Dispose();
170171
}
171172
}

projects/RabbitMQ.Client/client/impl/SocketFrameHandler.cs

Lines changed: 9 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -218,7 +218,7 @@ public async Task ConnectAsync(CancellationToken cancellationToken)
218218
}
219219
catch
220220
{
221-
await CloseAsync()
221+
await CloseAsync(cancellationToken)
222222
.ConfigureAwait(false);
223223
throw;
224224
}
@@ -231,18 +231,17 @@ await CloseAsync()
231231
_connected = true;
232232
}
233233

234-
// TODO cancellationToken
235-
public async Task CloseAsync()
234+
public async Task CloseAsync(CancellationToken cancellationToken)
236235
{
237236
if (_closed || _socket == null)
238237
{
239238
return;
240239
}
241240

242-
await _closingSemaphore.WaitAsync()
243-
.ConfigureAwait(false);
244241
try
245242
{
243+
await _closingSemaphore.WaitAsync(cancellationToken)
244+
.ConfigureAwait(false);
246245
try
247246
{
248247
_channelWriter.Complete();
@@ -269,9 +268,13 @@ await _pipeReader.CompleteAsync()
269268
// ignore, we are closing anyway
270269
}
271270
}
271+
catch
272+
{
273+
}
272274
finally
273275
{
274276
_closingSemaphore.Release();
277+
_closingSemaphore.Dispose();
275278
_closed = true;
276279
}
277280
}
@@ -441,7 +444,7 @@ await tcpClient.ConnectAsync(endpoint.Address, endpoint.Port, linkedTokenSource.
441444
{
442445
if (timeoutTokenSource.Token.IsCancellationRequested)
443446
{
444-
// TODO do not use System.TimeoutException here
447+
// TODO maybe do not use System.TimeoutException here
445448
var timeoutException = new TimeoutException(msg, e);
446449
throw new ConnectFailureException(msg, timeoutException);
447450
}

projects/Test/Common/IntegrationFixtureBase.cs

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -390,11 +390,11 @@ protected static async Task WaitAsync(TaskCompletionSource<bool> tcs, TimeSpan t
390390
await tcs.Task.WaitAsync(timeSpan);
391391
bool result = await tcs.Task;
392392
Assert.True((true == result) && (tcs.Task.IsCompletedSuccessfully()),
393-
$"waiting {timeSpan.TotalSeconds} seconds on a tcs for '{desc}' timed out");
393+
$"waiting {timeSpan.TotalSeconds} seconds on a tcs for '{desc}' failed");
394394
}
395-
catch (TimeoutException)
395+
catch (TimeoutException ex)
396396
{
397-
Assert.Fail($"waiting {timeSpan.TotalSeconds} seconds on a tcs for '{desc}' timed out");
397+
Assert.Fail($"waiting {timeSpan.TotalSeconds} seconds on a tcs for '{desc}' timed out, ex: {ex}");
398398
}
399399
}
400400

0 commit comments

Comments
 (0)