Skip to content

Commit 960fd68

Browse files
committed
Add more use of CancellationToken in Async methods.
Correctly dispose of CancellationToken and CancellationTokenRegistration instances. Renames. Refactor Connection.Close to use async internally. Pass cancellation token around a bit more. Try to figure out what is keeping net472 tests from running. Fix test by adding WaitAsync that also takes a timeout. * Add `ConfigureAwait` where it was missing * Always create cancellation token source for recovery, and dispose it * Modify WaitAsync extension to see if task has already completed Don't swallow exceptions unless `abort` is specified. Add TaskCreationOptions to two spots Add `SetSessionClosingAsync` Remove debugging, use cancellation token to stop receieve loop Pass the main loop cancellation token into HardProtocolExceptionHandlerAsync Pass cancellation token to IFrameHandler.CloseAsync
1 parent 220f5a5 commit 960fd68

Some content is hidden

Large Commits have some content hidden by default. Use the searchbox below for content that may be hidden.

42 files changed

+928
-662
lines changed

projects/RabbitMQ.Client/FrameworkExtension/Interlocked.cs

Lines changed: 0 additions & 29 deletions
This file was deleted.

projects/RabbitMQ.Client/PublicAPI.Unshipped.txt

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -918,7 +918,7 @@ virtual RabbitMQ.Client.TcpClientAdapter.ReceiveTimeout.set -> void
918918
~RabbitMQ.Client.IChannel.TxCommitAsync() -> System.Threading.Tasks.Task
919919
~RabbitMQ.Client.IChannel.TxRollbackAsync() -> System.Threading.Tasks.Task
920920
~RabbitMQ.Client.IChannel.TxSelectAsync() -> System.Threading.Tasks.Task
921-
~RabbitMQ.Client.IConnection.CloseAsync(ushort reasonCode, string reasonText, System.TimeSpan timeout, bool abort) -> System.Threading.Tasks.Task
921+
~RabbitMQ.Client.IConnection.CloseAsync(ushort reasonCode, string reasonText, System.TimeSpan timeout, bool abort, System.Threading.CancellationToken cancellationToken = default(System.Threading.CancellationToken)) -> System.Threading.Tasks.Task
922922
~RabbitMQ.Client.IConnection.CreateChannelAsync() -> System.Threading.Tasks.Task<RabbitMQ.Client.IChannel>
923923
~RabbitMQ.Client.IConnection.UpdateSecretAsync(string newSecret, string reason) -> System.Threading.Tasks.Task
924924
~RabbitMQ.Client.IConnectionFactory.CreateConnectionAsync(string clientProvidedName, System.Threading.CancellationToken cancellationToken = default(System.Threading.CancellationToken)) -> System.Threading.Tasks.Task<RabbitMQ.Client.IConnection>

projects/RabbitMQ.Client/client/TaskExtensions.cs

Lines changed: 63 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -53,14 +53,67 @@ public static bool IsCompletedSuccessfully(this Task task)
5353
private static readonly TaskContinuationOptions s_tco = TaskContinuationOptions.OnlyOnFaulted | TaskContinuationOptions.ExecuteSynchronously;
5454
private static void IgnoreTaskContinuation(Task t, object s) => t.Exception.Handle(e => true);
5555

56-
public static async Task WithCancellation(this Task task, CancellationToken cancellationToken)
56+
// https://devblogs.microsoft.com/pfxteam/how-do-i-cancel-non-cancelable-async-operations/
57+
public static Task WaitAsync(this Task task, TimeSpan timeout, CancellationToken cancellationToken)
5758
{
58-
var tcs = new TaskCompletionSource<bool>();
59+
if (task.IsCompletedSuccessfully())
60+
{
61+
return task;
62+
}
63+
else
64+
{
65+
return DoWaitWithTimeoutAsync(task, timeout, cancellationToken);
66+
}
67+
}
68+
69+
private static async Task DoWaitWithTimeoutAsync(this Task task, TimeSpan timeout, CancellationToken cancellationToken)
70+
{
71+
using var timeoutTokenCts = new CancellationTokenSource(timeout);
72+
CancellationToken timeoutToken = timeoutTokenCts.Token;
73+
74+
var linkedTokenTcs = new TaskCompletionSource<bool>(TaskCreationOptions.RunContinuationsAsynchronously);
75+
using var linkedCts = CancellationTokenSource.CreateLinkedTokenSource(timeoutToken, cancellationToken);
76+
using CancellationTokenRegistration cancellationTokenRegistration =
77+
linkedCts.Token.Register(s => ((TaskCompletionSource<bool>)s).TrySetResult(true),
78+
state: linkedTokenTcs, useSynchronizationContext: false);
79+
80+
if (task != await Task.WhenAny(task, linkedTokenTcs.Task).ConfigureAwait(false))
81+
{
82+
task.Ignore();
83+
if (timeoutToken.IsCancellationRequested)
84+
{
85+
throw new OperationCanceledException($"Operation timed out after {timeout}");
86+
}
87+
else
88+
{
89+
throw new OperationCanceledException(cancellationToken);
90+
}
91+
}
5992

60-
// https://devblogs.microsoft.com/pfxteam/how-do-i-cancel-non-cancelable-async-operations/
61-
using (cancellationToken.Register(s => ((TaskCompletionSource<bool>)s).TrySetResult(true), tcs))
93+
await task.ConfigureAwait(false);
94+
}
95+
96+
// https://devblogs.microsoft.com/pfxteam/how-do-i-cancel-non-cancelable-async-operations/
97+
public static Task WaitAsync(this Task task, CancellationToken cancellationToken)
98+
{
99+
if (task.IsCompletedSuccessfully())
62100
{
63-
if (task != await Task.WhenAny(task, tcs.Task))
101+
return task;
102+
}
103+
else
104+
{
105+
return DoWaitAsync(task, cancellationToken);
106+
}
107+
}
108+
109+
private static async Task DoWaitAsync(this Task task, CancellationToken cancellationToken)
110+
{
111+
var cancellationTokenTcs = new TaskCompletionSource<bool>(TaskCreationOptions.RunContinuationsAsynchronously);
112+
113+
using (cancellationToken.Register(s => ((TaskCompletionSource<bool>)s).TrySetResult(true),
114+
state: cancellationTokenTcs, useSynchronizationContext: false))
115+
{
116+
if (task != await Task.WhenAny(task, cancellationTokenTcs.Task).ConfigureAwait(false))
64117
{
65118
task.Ignore();
66119
throw new OperationCanceledException(cancellationToken);
@@ -172,10 +225,13 @@ public static T EnsureCompleted<T>(this ValueTask<T> task)
172225

173226
public static void EnsureCompleted(this ValueTask task)
174227
{
175-
task.GetAwaiter().GetResult();
228+
if (false == task.IsCompletedSuccessfully)
229+
{
230+
task.GetAwaiter().GetResult();
231+
}
176232
}
177233

178-
#if !NET6_0_OR_GREATER
234+
#if NETSTANDARD
179235
// https://github.com/dotnet/runtime/issues/23878
180236
// https://github.com/dotnet/runtime/issues/23878#issuecomment-1398958645
181237
public static void Ignore(this Task task)

projects/RabbitMQ.Client/client/api/ConnectionFactory.cs

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -618,6 +618,7 @@ private ConnectionConfig CreateConfig(string clientProvidedName)
618618
internal async Task<IFrameHandler> CreateFrameHandlerAsync(
619619
AmqpTcpEndpoint endpoint, CancellationToken cancellationToken)
620620
{
621+
cancellationToken.ThrowIfCancellationRequested();
621622
IFrameHandler fh = new SocketFrameHandler(endpoint, SocketFactory, RequestedConnectionTimeout, SocketReadTimeout, SocketWriteTimeout);
622623
await fh.ConnectAsync(cancellationToken)
623624
.ConfigureAwait(false);

projects/RabbitMQ.Client/client/api/IConnection.cs

Lines changed: 4 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -31,6 +31,7 @@
3131

3232
using System;
3333
using System.Collections.Generic;
34+
using System.Threading;
3435
using System.Threading.Tasks;
3536
using RabbitMQ.Client.Events;
3637
using RabbitMQ.Client.Exceptions;
@@ -222,9 +223,10 @@ public interface IConnection : INetworkConnection, IDisposable
222223
/// </summary>
223224
/// <param name="reasonCode">The close code (See under "Reply Codes" in the AMQP 0-9-1 specification).</param>
224225
/// <param name="reasonText">A message indicating the reason for closing the connection.</param>
225-
/// <param name="timeout">Operation timeout.</param>
226+
/// <param name="timeout"></param>
226227
/// <param name="abort">Whether or not this close is an abort (ignores certain exceptions).</param>
227-
Task CloseAsync(ushort reasonCode, string reasonText, TimeSpan timeout, bool abort);
228+
/// <param name="cancellationToken"></param>
229+
Task CloseAsync(ushort reasonCode, string reasonText, TimeSpan timeout, bool abort, CancellationToken cancellationToken = default);
228230

229231
/// <summary>
230232
/// Asynchronously create and return a fresh channel, session, and channel.

projects/RabbitMQ.Client/client/api/IConnectionExtensions.cs

Lines changed: 16 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -20,7 +20,8 @@ public static class IConnectionExtensions
2020
/// </remarks>
2121
public static Task CloseAsync(this IConnection connection)
2222
{
23-
return connection.CloseAsync(Constants.ReplySuccess, "Goodbye", InternalConstants.DefaultConnectionCloseTimeout, false);
23+
return connection.CloseAsync(Constants.ReplySuccess, "Goodbye", InternalConstants.DefaultConnectionCloseTimeout, false,
24+
CancellationToken.None);
2425
}
2526

2627
/// <summary>
@@ -38,7 +39,8 @@ public static Task CloseAsync(this IConnection connection)
3839
/// </remarks>
3940
public static Task CloseAsync(this IConnection connection, ushort reasonCode, string reasonText)
4041
{
41-
return connection.CloseAsync(reasonCode, reasonText, InternalConstants.DefaultConnectionCloseTimeout, false);
42+
return connection.CloseAsync(reasonCode, reasonText, InternalConstants.DefaultConnectionCloseTimeout, false,
43+
CancellationToken.None);
4244
}
4345

4446
/// <summary>
@@ -58,7 +60,8 @@ public static Task CloseAsync(this IConnection connection, ushort reasonCode, st
5860
/// </remarks>
5961
public static Task CloseAsync(this IConnection connection, TimeSpan timeout)
6062
{
61-
return connection.CloseAsync(Constants.ReplySuccess, "Goodbye", timeout, false);
63+
return connection.CloseAsync(Constants.ReplySuccess, "Goodbye", timeout, false,
64+
CancellationToken.None);
6265
}
6366

6467
/// <summary>
@@ -80,7 +83,8 @@ public static Task CloseAsync(this IConnection connection, TimeSpan timeout)
8083
/// </remarks>
8184
public static Task CloseAsync(this IConnection connection, ushort reasonCode, string reasonText, TimeSpan timeout)
8285
{
83-
return connection.CloseAsync(reasonCode, reasonText, timeout, false);
86+
return connection.CloseAsync(reasonCode, reasonText, timeout, false,
87+
CancellationToken.None);
8488
}
8589

8690
/// <summary>
@@ -94,7 +98,8 @@ public static Task CloseAsync(this IConnection connection, ushort reasonCode, st
9498
/// </remarks>
9599
public static Task AbortAsync(this IConnection connection)
96100
{
97-
return connection.CloseAsync(Constants.ReplySuccess, "Connection close forced", InternalConstants.DefaultConnectionAbortTimeout, true);
101+
return connection.CloseAsync(Constants.ReplySuccess, "Connection close forced", InternalConstants.DefaultConnectionAbortTimeout, true,
102+
CancellationToken.None);
98103
}
99104

100105
/// <summary>
@@ -112,7 +117,8 @@ public static Task AbortAsync(this IConnection connection)
112117
/// </remarks>
113118
public static Task AbortAsync(this IConnection connection, ushort reasonCode, string reasonText)
114119
{
115-
return connection.CloseAsync(reasonCode, reasonText, InternalConstants.DefaultConnectionAbortTimeout, true);
120+
return connection.CloseAsync(reasonCode, reasonText, InternalConstants.DefaultConnectionAbortTimeout, true,
121+
CancellationToken.None);
116122
}
117123

118124
/// <summary>
@@ -130,7 +136,8 @@ public static Task AbortAsync(this IConnection connection, ushort reasonCode, st
130136
/// </remarks>
131137
public static Task AbortAsync(this IConnection connection, TimeSpan timeout)
132138
{
133-
return connection.CloseAsync(Constants.ReplySuccess, "Connection close forced", timeout, true);
139+
return connection.CloseAsync(Constants.ReplySuccess, "Connection close forced", timeout, true,
140+
CancellationToken.None);
134141
}
135142

136143
/// <summary>
@@ -149,7 +156,8 @@ public static Task AbortAsync(this IConnection connection, TimeSpan timeout)
149156
/// </remarks>
150157
public static Task AbortAsync(this IConnection connection, ushort reasonCode, string reasonText, TimeSpan timeout)
151158
{
152-
return connection.CloseAsync(reasonCode, reasonText, timeout, true);
159+
return connection.CloseAsync(reasonCode, reasonText, timeout, true,
160+
CancellationToken.None);
153161
}
154162
}
155163
}

projects/RabbitMQ.Client/client/api/IEndpointResolverExtensions.cs

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -45,6 +45,7 @@ public static async Task<T> SelectOneAsync<T>(this IEndpointResolver resolver,
4545
var exceptions = new List<Exception>();
4646
foreach (AmqpTcpEndpoint ep in resolver.All())
4747
{
48+
cancellationToken.ThrowIfCancellationRequested();
4849
try
4950
{
5051
t = await selector(ep, cancellationToken).ConfigureAwait(false);

projects/RabbitMQ.Client/client/api/TcpClientAdapter.cs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -29,7 +29,7 @@ public virtual Task ConnectAsync(IPAddress ep, int port, CancellationToken cance
2929
#else
3030
public virtual Task ConnectAsync(IPAddress ep, int port, CancellationToken cancellationToken = default)
3131
{
32-
return _sock.ConnectAsync(ep, port).WithCancellation(cancellationToken);
32+
return _sock.ConnectAsync(ep, port).WaitAsync(cancellationToken);
3333
}
3434
#endif
3535

projects/RabbitMQ.Client/client/events/EventingBasicConsumer.cs

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -92,7 +92,8 @@ public override async Task HandleBasicDeliverAsync(string consumerTag, ulong del
9292
BasicDeliverEventArgs eventArgs = new BasicDeliverEventArgs(consumerTag, deliveryTag, redelivered, exchange, routingKey, properties, body);
9393
using (Activity activity = RabbitMQActivitySource.SubscriberHasListeners ? RabbitMQActivitySource.Deliver(eventArgs) : default)
9494
{
95-
await base.HandleBasicDeliverAsync(consumerTag, deliveryTag, redelivered, exchange, routingKey, properties, body);
95+
await base.HandleBasicDeliverAsync(consumerTag, deliveryTag, redelivered, exchange, routingKey, properties, body)
96+
.ConfigureAwait(false);
9697
Received?.Invoke(this, eventArgs);
9798
}
9899
}

projects/RabbitMQ.Client/client/framing/Channel.cs

Lines changed: 8 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -29,6 +29,8 @@
2929
// Copyright (c) 2007-2020 VMware, Inc. All rights reserved.
3030
//---------------------------------------------------------------------------
3131

32+
using System.Collections.Generic;
33+
using System.Threading;
3234
using System.Threading.Tasks;
3335
using RabbitMQ.Client.client.framing;
3436
using RabbitMQ.Client.Impl;
@@ -69,19 +71,22 @@ public override void _Private_ConnectionCloseOk()
6971
public override ValueTask BasicAckAsync(ulong deliveryTag, bool multiple)
7072
{
7173
var method = new BasicAck(deliveryTag, multiple);
72-
return ModelSendAsync(method);
74+
// TODO cancellation token?
75+
return ModelSendAsync(method, CancellationToken.None);
7376
}
7477

7578
public override ValueTask BasicNackAsync(ulong deliveryTag, bool multiple, bool requeue)
7679
{
7780
var method = new BasicNack(deliveryTag, multiple, requeue);
78-
return ModelSendAsync(method);
81+
// TODO use cancellation token
82+
return ModelSendAsync(method, CancellationToken.None);
7983
}
8084

8185
public override Task BasicRejectAsync(ulong deliveryTag, bool requeue)
8286
{
8387
var method = new BasicReject(deliveryTag, requeue);
84-
return ModelSendAsync(method).AsTask();
88+
// TODO cancellation token?
89+
return ModelSendAsync(method, CancellationToken.None).AsTask();
8590
}
8691

8792
protected override bool DispatchAsynchronous(in IncomingCommand cmd)

0 commit comments

Comments
 (0)