Skip to content

Commit b14bd95

Browse files
authored
Merge pull request #1873 from rabbitmq/gh-1865
Do not handle publisher confirms when disposed
2 parents 37c1018 + a748041 commit b14bd95

File tree

5 files changed

+207
-18
lines changed

5 files changed

+207
-18
lines changed

RabbitMQDotNetClient.sln

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -48,6 +48,8 @@ Project("{9A19103F-16F7-4668-BE54-9A1E7A4F7556}") = "PublisherConfirms", "projec
4848
EndProject
4949
Project("{FAE04EC0-301F-11D3-BF4B-00C04F79EFBC}") = "GH-1749", "projects\Applications\GH-1749\GH-1749.csproj", "{B3F17265-91A8-4BE1-AE64-132CB8BB6CDF}"
5050
EndProject
51+
Project("{FAE04EC0-301F-11D3-BF4B-00C04F79EFBC}") = "GH-1865", "projects\Applications\GH-1865\GH-1865.csproj", "{38CE721E-2801-AED1-DDF8-DC5F888C6C05}"
52+
EndProject
5153
Global
5254
GlobalSection(SolutionConfigurationPlatforms) = preSolution
5355
Debug|Any CPU = Debug|Any CPU
@@ -114,6 +116,10 @@ Global
114116
{B3F17265-91A8-4BE1-AE64-132CB8BB6CDF}.Debug|Any CPU.Build.0 = Debug|Any CPU
115117
{B3F17265-91A8-4BE1-AE64-132CB8BB6CDF}.Release|Any CPU.ActiveCfg = Release|Any CPU
116118
{B3F17265-91A8-4BE1-AE64-132CB8BB6CDF}.Release|Any CPU.Build.0 = Release|Any CPU
119+
{38CE721E-2801-AED1-DDF8-DC5F888C6C05}.Debug|Any CPU.ActiveCfg = Debug|Any CPU
120+
{38CE721E-2801-AED1-DDF8-DC5F888C6C05}.Debug|Any CPU.Build.0 = Debug|Any CPU
121+
{38CE721E-2801-AED1-DDF8-DC5F888C6C05}.Release|Any CPU.ActiveCfg = Release|Any CPU
122+
{38CE721E-2801-AED1-DDF8-DC5F888C6C05}.Release|Any CPU.Build.0 = Release|Any CPU
117123
EndGlobalSection
118124
GlobalSection(SolutionProperties) = preSolution
119125
HideSolutionNode = FALSE
@@ -130,6 +136,7 @@ Global
130136
{64ED07BF-4D77-47CD-AF4F-5B4525686FA1} = {D21B282C-49E6-4A30-887B-9626D94B8D69}
131137
{13149F73-2CDB-4ECF-BF2C-403860045751} = {D21B282C-49E6-4A30-887B-9626D94B8D69}
132138
{B3F17265-91A8-4BE1-AE64-132CB8BB6CDF} = {D21B282C-49E6-4A30-887B-9626D94B8D69}
139+
{38CE721E-2801-AED1-DDF8-DC5F888C6C05} = {D21B282C-49E6-4A30-887B-9626D94B8D69}
133140
EndGlobalSection
134141
GlobalSection(ExtensibilityGlobals) = postSolution
135142
SolutionGuid = {3C6A0C44-FA63-4101-BBF9-2598641167D1}
Lines changed: 15 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,15 @@
1+
<Project Sdk="Microsoft.NET.Sdk">
2+
3+
<PropertyGroup>
4+
<OutputType>Exe</OutputType>
5+
<TargetFramework>net8.0</TargetFramework>
6+
<RootNamespace>GH_1865</RootNamespace>
7+
<ImplicitUsings>enable</ImplicitUsings>
8+
<Nullable>enable</Nullable>
9+
</PropertyGroup>
10+
11+
<ItemGroup>
12+
<ProjectReference Include="../../RabbitMQ.Client/RabbitMQ.Client.csproj" />
13+
</ItemGroup>
14+
15+
</Project>
Lines changed: 148 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,148 @@
1+
// This source code is dual-licensed under the Apache License, version
2+
// 2.0, and the Mozilla Public License, version 2.0.
3+
//
4+
// The APL v2.0:
5+
//
6+
//---------------------------------------------------------------------------
7+
// Licensed under the Apache License, Version 2.0 (the "License");
8+
// you may not use this file except in compliance with the License.
9+
// You may obtain a copy of the License at
10+
//
11+
// https://www.apache.org/licenses/LICENSE-2.0
12+
//
13+
// Unless required by applicable law or agreed to in writing, software
14+
// distributed under the License is distributed on an "AS IS" BASIS,
15+
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
16+
// See the License for the specific language governing permissions and
17+
// limitations under the License.
18+
//---------------------------------------------------------------------------
19+
//
20+
// The MPL v2.0:
21+
//
22+
//---------------------------------------------------------------------------
23+
// This Source Code Form is subject to the terms of the Mozilla Public
24+
// License, v. 2.0. If a copy of the MPL was not distributed with this
25+
// file, You can obtain one at https://mozilla.org/MPL/2.0/.
26+
//---------------------------------------------------------------------------
27+
28+
#pragma warning disable CA2007 // Consider calling ConfigureAwait on the awaited task
29+
30+
using System.Diagnostics;
31+
using System.Globalization;
32+
using RabbitMQ.Client;
33+
34+
class Program
35+
{
36+
static int _channelsProcessed;
37+
static readonly TaskCompletionSource<bool> s_tcs = new();
38+
static readonly ThreadLocal<Random> s_rng = new(() => new Random());
39+
40+
private static string Now => DateTime.UtcNow.ToString("s", CultureInfo.InvariantCulture);
41+
42+
static async Task Main(string[] args)
43+
{
44+
const int Repeats = 3;
45+
const int ChannelsToOpen = 20;
46+
47+
var connectionFactory = new ConnectionFactory
48+
{
49+
HostName = "localhost",
50+
Port = 5672,
51+
UserName = "guest",
52+
Password = "guest",
53+
VirtualHost = "/",
54+
RequestedConnectionTimeout = TimeSpan.FromMilliseconds(60000),
55+
RequestedHeartbeat = TimeSpan.FromSeconds(600),
56+
AutomaticRecoveryEnabled = false,
57+
TopologyRecoveryEnabled = false,
58+
ContinuationTimeout = TimeSpan.FromMilliseconds(1000)
59+
};
60+
await using IConnection connection = await connectionFactory.CreateConnectionAsync();
61+
62+
var watch = Stopwatch.StartNew();
63+
_ = Task.Run(async () =>
64+
{
65+
for (int i = 0; i < Repeats; i++)
66+
{
67+
try
68+
{
69+
var tasks = new Task[ChannelsToOpen];
70+
for (int j = 0; j < ChannelsToOpen; j++)
71+
{
72+
tasks[j] = Task.Run(async () =>
73+
{
74+
try
75+
{
76+
IChannel channel = await connection.CreateChannelAsync(
77+
new CreateChannelOptions(true, true));
78+
var cts = new CancellationTokenSource();
79+
int cancelAfterMs = s_rng.Value!.Next(1, 10000); // upper bound exclusive
80+
cts.CancelAfter(cancelAfterMs);
81+
var tcs = new TaskCompletionSource<int>();
82+
channel.ChannelShutdownAsync += async (sender, args) =>
83+
{
84+
await Task.Delay(100);
85+
tcs.TrySetResult(1);
86+
};
87+
try
88+
{
89+
await channel.CloseAsync();
90+
}
91+
catch (TaskCanceledException ex)
92+
{
93+
Console.WriteLine(
94+
$"{Now} CloseAsync canceled after {cancelAfterMs} ms " +
95+
$"{ex.Message}");
96+
}
97+
catch (OperationCanceledException ex)
98+
{
99+
Console.WriteLine(
100+
$"{Now} CloseAsync canceled after {cancelAfterMs} ms" +
101+
$"{ex.Message}");
102+
}
103+
catch (Exception exClose)
104+
{
105+
Console.WriteLine($"{Now} CloseAsync error: {exClose.GetType().Name} {exClose.Message}");
106+
}
107+
108+
// Wait a bit for the ChannelShutdown event to fire
109+
var delayTask = Task.Delay(15000);
110+
await Task.WhenAny(tcs.Task, delayTask);
111+
await channel.DisposeAsync();
112+
cts.Dispose();
113+
}
114+
catch (Exception exOuter)
115+
{
116+
Console.WriteLine($"{Now} outer error: {exOuter.GetType().Name} {exOuter.Message}");
117+
}
118+
finally
119+
{
120+
Interlocked.Increment(ref _channelsProcessed);
121+
}
122+
});
123+
}
124+
await Task.WhenAll(tasks);
125+
}
126+
catch (Exception ex)
127+
{
128+
Console.WriteLine($"{Now} connection error: {ex.GetType().Name} {ex.Message}");
129+
}
130+
}
131+
132+
s_tcs.SetResult(true);
133+
});
134+
135+
Console.WriteLine($"{Repeats} times opening {ChannelsToOpen} channels on a connection. => Total channel open/close: {Repeats * ChannelsToOpen}");
136+
Console.WriteLine();
137+
Console.WriteLine("Opened");
138+
while (false == s_tcs.Task.IsCompleted)
139+
{
140+
await Task.Delay(500);
141+
Console.WriteLine($"{_channelsProcessed,5}");
142+
}
143+
watch.Stop();
144+
Console.WriteLine($"{_channelsProcessed,5}");
145+
Console.WriteLine();
146+
Console.WriteLine($"Took {watch.Elapsed.TotalMilliseconds} ms");
147+
}
148+
}

projects/RabbitMQ.Client/Impl/Channel.PublisherConfirms.cs

Lines changed: 28 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -268,22 +268,18 @@ private void HandleReturn(BasicReturnEventArgs basicReturnEvent)
268268
[MethodImpl(MethodImplOptions.AggressiveInlining)]
269269
private async Task MaybeHandlePublisherConfirmationTcsOnChannelShutdownAsync(ShutdownEventArgs reason)
270270
{
271+
if (_disposed)
272+
{
273+
return;
274+
}
275+
271276
if (_publisherConfirmationsEnabled)
272277
{
273278
await _confirmSemaphore.WaitAsync(reason.CancellationToken)
274279
.ConfigureAwait(false);
275280
try
276281
{
277-
if (!_confirmsTaskCompletionSources.IsEmpty)
278-
{
279-
var exception = new AlreadyClosedException(reason);
280-
foreach (TaskCompletionSource<bool> confirmsTaskCompletionSource in _confirmsTaskCompletionSources.Values)
281-
{
282-
confirmsTaskCompletionSource.TrySetException(exception);
283-
}
284-
285-
_confirmsTaskCompletionSources.Clear();
286-
}
282+
MaybeSetExceptionOnConfirmsTcs(reason);
287283
}
288284
finally
289285
{
@@ -404,5 +400,27 @@ await publisherConfirmationInfo.MaybeWaitForConfirmationAsync(cancellationToken)
404400
}
405401
}
406402
}
403+
404+
[MethodImpl(MethodImplOptions.AggressiveInlining)]
405+
private void MaybeSetExceptionOnConfirmsTcs(ShutdownEventArgs? reason = null)
406+
{
407+
if (!_confirmsTaskCompletionSources.IsEmpty)
408+
{
409+
Exception ex;
410+
if (reason is not null)
411+
{
412+
ex = new AlreadyClosedException(reason);
413+
}
414+
else
415+
{
416+
ex = new OperationInterruptedException();
417+
}
418+
foreach (TaskCompletionSource<bool> confirmsTaskCompletionSource in _confirmsTaskCompletionSources.Values)
419+
{
420+
confirmsTaskCompletionSource.TrySetException(ex);
421+
}
422+
_confirmsTaskCompletionSources.Clear();
423+
}
424+
}
407425
}
408426
}

projects/RabbitMQ.Client/Impl/Channel.cs

Lines changed: 9 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -591,6 +591,7 @@ protected virtual void Dispose(bool disposing)
591591
{
592592
_rpcSemaphore.Dispose();
593593
_confirmSemaphore.Dispose();
594+
MaybeSetExceptionOnConfirmsTcs();
594595
}
595596
catch
596597
{
@@ -608,13 +609,13 @@ public async ValueTask DisposeAsync()
608609
return;
609610
}
610611

611-
await DisposeAsyncCore()
612+
await DisposeAsyncCoreAsync()
612613
.ConfigureAwait(false);
613614

614615
Dispose(false);
615616
}
616617

617-
protected virtual async ValueTask DisposeAsyncCore()
618+
protected virtual async ValueTask DisposeAsyncCoreAsync()
618619
{
619620
if (_disposed)
620621
{
@@ -669,7 +670,7 @@ public Task ConnectionTuneOkAsync(ushort channelMax, uint frameMax, ushort heart
669670
return ModelSendAsync(in method, cancellationToken).AsTask();
670671
}
671672

672-
protected async Task<bool> HandleBasicAck(IncomingCommand cmd,
673+
protected async Task<bool> HandleBasicAckAsync(IncomingCommand cmd,
673674
CancellationToken cancellationToken = default)
674675
{
675676
var ack = new BasicAck(cmd.MethodSpan);
@@ -685,7 +686,7 @@ await _basicAcksAsyncWrapper.InvokeAsync(this, args)
685686
return true;
686687
}
687688

688-
protected async Task<bool> HandleBasicNack(IncomingCommand cmd,
689+
protected async Task<bool> HandleBasicNackAsync(IncomingCommand cmd,
689690
CancellationToken cancellationToken = default)
690691
{
691692
var nack = new BasicNack(cmd.MethodSpan);
@@ -702,7 +703,7 @@ await _basicNacksAsyncWrapper.InvokeAsync(this, args)
702703
return true;
703704
}
704705

705-
protected async Task<bool> HandleBasicReturn(IncomingCommand cmd, CancellationToken cancellationToken)
706+
protected async Task<bool> HandleBasicReturnAsync(IncomingCommand cmd, CancellationToken cancellationToken)
706707
{
707708
var basicReturn = new BasicReturn(cmd.MethodSpan);
708709

@@ -1750,16 +1751,16 @@ private Task<bool> DispatchCommandAsync(IncomingCommand cmd, CancellationToken c
17501751
}
17511752
case ProtocolCommandId.BasicAck:
17521753
{
1753-
return HandleBasicAck(cmd, cancellationToken);
1754+
return HandleBasicAckAsync(cmd, cancellationToken);
17541755
}
17551756
case ProtocolCommandId.BasicNack:
17561757
{
1757-
return HandleBasicNack(cmd, cancellationToken);
1758+
return HandleBasicNackAsync(cmd, cancellationToken);
17581759
}
17591760
case ProtocolCommandId.BasicReturn:
17601761
{
17611762
// Note: always returns true
1762-
return HandleBasicReturn(cmd, cancellationToken);
1763+
return HandleBasicReturnAsync(cmd, cancellationToken);
17631764
}
17641765
case ProtocolCommandId.ChannelClose:
17651766
{

0 commit comments

Comments
 (0)