Skip to content

Commit c809043

Browse files
committed
* Add a default consumer to see if perhaps that's why these tests sometimes fail.
1 parent 58026c3 commit c809043

File tree

2 files changed

+57
-2
lines changed

2 files changed

+57
-2
lines changed

projects/RabbitMQ.Client/client/impl/ConsumerDispatching/ConsumerDispatcherBase.cs

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -7,7 +7,7 @@ namespace RabbitMQ.Client.ConsumerDispatching
77
#nullable enable
88
internal abstract class ConsumerDispatcherBase
99
{
10-
private static readonly FallbackConsumer fallbackConsumer = new FallbackConsumer();
10+
private static readonly FallbackConsumer s_fallbackConsumer = new FallbackConsumer();
1111
private readonly Dictionary<string, IBasicConsumer> _consumers = new Dictionary<string, IBasicConsumer>();
1212

1313
public IBasicConsumer? DefaultConsumer { get; set; }
@@ -74,7 +74,7 @@ private void DoShutdownConsumers(ShutdownEventArgs reason)
7474
[MethodImpl(MethodImplOptions.NoInlining)]
7575
private IBasicConsumer GetDefaultOrFallbackConsumer()
7676
{
77-
return DefaultConsumer ?? fallbackConsumer;
77+
return DefaultConsumer ?? s_fallbackConsumer;
7878
}
7979
}
8080
}

projects/Test/Integration/TestAsyncConsumer.cs

Lines changed: 55 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -52,6 +52,8 @@ public TestAsyncConsumer(ITestOutputHelper output)
5252
[Fact]
5353
public async Task TestBasicRoundtripConcurrent()
5454
{
55+
_channel.DefaultConsumer = new DefaultAsyncConsumer("_channel,", _output);
56+
5557
QueueDeclareOk q = await _channel.QueueDeclareAsync();
5658

5759
const int length = 4096;
@@ -129,6 +131,8 @@ public async Task TestBasicRoundtripConcurrent()
129131
[Fact]
130132
public async Task TestBasicRoundtripConcurrentManyMessages()
131133
{
134+
_channel.DefaultConsumer = new DefaultAsyncConsumer("_channel,", _output);
135+
132136
const int publish_total = 4096;
133137
const int length = 512;
134138
string queueName = GenerateQueueName();
@@ -185,6 +189,7 @@ public async Task TestBasicRoundtripConcurrentManyMessages()
185189
};
186190
using (IChannel publishChannel = await publishConn.CreateChannelAsync())
187191
{
192+
publishChannel.DefaultConsumer = new DefaultAsyncConsumer("publishChannel,", _output);
188193
publishChannel.ChannelShutdown += (o, ea) =>
189194
{
190195
HandleChannelShutdown(publishChannel, ea, (args) =>
@@ -221,6 +226,7 @@ public async Task TestBasicRoundtripConcurrentManyMessages()
221226
};
222227
using (IChannel consumeChannel = await consumeConn.CreateChannelAsync())
223228
{
229+
consumeChannel.DefaultConsumer = new DefaultAsyncConsumer("consumeChannel,", _output);
224230
consumeChannel.ChannelShutdown += (o, ea) =>
225231
{
226232
HandleChannelShutdown(consumeChannel, ea, (args) =>
@@ -592,5 +598,54 @@ private static bool ByteArraysEqual(ReadOnlySpan<byte> a1, ReadOnlySpan<byte> a2
592598
{
593599
return a1.SequenceEqual(a2);
594600
}
601+
602+
private class DefaultAsyncConsumer : AsyncDefaultBasicConsumer
603+
{
604+
private readonly string _logPrefix;
605+
private readonly ITestOutputHelper _output;
606+
607+
public DefaultAsyncConsumer(string logPrefix, ITestOutputHelper output)
608+
{
609+
_logPrefix = logPrefix;
610+
_output = output;
611+
}
612+
613+
public override Task HandleBasicCancel(string consumerTag)
614+
{
615+
_output.WriteLine("[ERROR] {0} HandleBasicCancel {1}", _logPrefix, consumerTag);
616+
return base.HandleBasicCancel(consumerTag);
617+
}
618+
619+
public override Task HandleBasicCancelOk(string consumerTag)
620+
{
621+
_output.WriteLine("[ERROR] {0} HandleBasicCancelOk {1}", _logPrefix, consumerTag);
622+
return base.HandleBasicCancelOk(consumerTag);
623+
}
624+
625+
public override Task HandleBasicConsumeOk(string consumerTag)
626+
{
627+
_output.WriteLine("[ERROR] {0} HandleBasicConsumeOk {1}", _logPrefix, consumerTag);
628+
return base.HandleBasicConsumeOk(consumerTag);
629+
}
630+
631+
public override Task HandleBasicDeliver(string consumerTag, ulong deliveryTag, bool redelivered,
632+
string exchange, string routingKey, in ReadOnlyBasicProperties properties, ReadOnlyMemory<byte> body)
633+
{
634+
_output.WriteLine("[ERROR] {0} HandleBasicDeliver {1}", _logPrefix, consumerTag);
635+
return base.HandleBasicDeliver(consumerTag, deliveryTag, redelivered, exchange, routingKey, properties, body);
636+
}
637+
638+
public override Task HandleChannelShutdown(object channel, ShutdownEventArgs reason)
639+
{
640+
_output.WriteLine("[ERROR] {0} HandleChannelShutdown", _logPrefix);
641+
return base.HandleChannelShutdown(channel, reason);
642+
}
643+
644+
public override Task OnCancel(params string[] consumerTags)
645+
{
646+
_output.WriteLine("[ERROR] {0} OnCancel {1}", _logPrefix, consumerTags[0]);
647+
return base.OnCancel(consumerTags);
648+
}
649+
}
595650
}
596651
}

0 commit comments

Comments
 (0)