Skip to content

Commit 84e838b

Browse files
authored
Merge pull request #128 from zlepper/rabbitmq-client-v7-support
Add support for v7 of the Rabbitmq client using pure async-await
2 parents ff754ec + 75f706d commit 84e838b

29 files changed

+497
-581
lines changed

Rebus.RabbitMq.Tests/Bugs/AutomaticallyCreatesErrorQueue.cs

Lines changed: 6 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,5 @@
1-
using NUnit.Framework;
1+
using System.Threading.Tasks;
2+
using NUnit.Framework;
23
using Rebus.Activation;
34
using Rebus.Config;
45
using Rebus.Retry.Simple;
@@ -12,13 +13,13 @@ public class AutomaticallyCreatesErrorQueue : FixtureBase
1213
[TestCase("error")]
1314
[TestCase("error_customized")]
1415
[Description("Tried without success to reproduce an error where error queues would, for some reason, not be there after starting")]
15-
public void WhatTheFixtureSays(string errorQueueName)
16+
public async Task WhatTheFixtureSays(string errorQueueName)
1617
{
1718
var inputQueueName = TestConfig.GetName("input");
1819

1920
// ensure the queues do not exist beforehand
20-
RabbitMqTransportFactory.DeleteQueue(inputQueueName);
21-
RabbitMqTransportFactory.DeleteQueue(errorQueueName);
21+
await RabbitMqTransportFactory.DeleteQueue(inputQueueName);
22+
await RabbitMqTransportFactory.DeleteQueue(errorQueueName);
2223

2324
// ensure they're cleaned up afterwards too
2425
Using(new QueueDeleter(inputQueueName));
@@ -42,7 +43,7 @@ public void WhatTheFixtureSays(string errorQueueName)
4243
})
4344
.Start();
4445

45-
Assert.That(RabbitMqTransportFactory.QueueExists(errorQueueName), Is.True,
46+
Assert.That(await RabbitMqTransportFactory.QueueExists(errorQueueName), Is.True,
4647
$"The error queue '{errorQueueName}' was not found as expected");
4748
}
4849
}

Rebus.RabbitMq.Tests/Examples/CanSendDelayedMessageWithDelayedMessageExchangePlugin.cs

Lines changed: 6 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -69,7 +69,7 @@ public async Task ShowHowItIsDone_TimeoutManager_AutomaticDeclaration()
6969
[Test]
7070
public async Task ShowHowItIsDone_TimeoutManager()
7171
{
72-
DeclareDelayedMessageExchange("RebusDelayed");
72+
await DeclareDelayedMessageExchange("RebusDelayed");
7373

7474
using var gotTheMessage = new ManualResetEvent(initialState: false);
7575

@@ -103,7 +103,7 @@ public async Task ShowHowItIsDone_TimeoutManager()
103103
[Test]
104104
public async Task ShowHowItIsDone_Manual()
105105
{
106-
DeclareDelayedMessageExchange("RebusDelayed");
106+
await DeclareDelayedMessageExchange("RebusDelayed");
107107

108108
using var gotTheMessage = new ManualResetEvent(initialState: false);
109109

@@ -133,13 +133,13 @@ public async Task ShowHowItIsDone_Manual()
133133
Assert.That(elapsed, Is.GreaterThan(TimeSpan.FromSeconds(5)));
134134
}
135135

136-
void DeclareDelayedMessageExchange(string exchangeName)
136+
async Task DeclareDelayedMessageExchange(string exchangeName)
137137
{
138138
var connectionFactory = new ConnectionFactory { Uri = new(_connectionString) };
139-
using var connection = connectionFactory.CreateConnection();
140-
using var model = connection.CreateModel();
139+
await using var connection = await connectionFactory.CreateConnectionAsync();
140+
await using var model = await connection.CreateChannelAsync();
141141

142-
model.ExchangeDeclare(
142+
await model.ExchangeDeclareAsync(
143143
exchange: exchangeName,
144144
type: "x-delayed-message",
145145
durable: true,

Rebus.RabbitMq.Tests/QueueDeleter.cs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -16,7 +16,7 @@ public void Dispose()
1616
{
1717
try
1818
{
19-
RabbitMqTransportFactory.DeleteQueue(_queueName);
19+
RabbitMqTransportFactory.DeleteQueue(_queueName).GetAwaiter().GetResult();
2020

2121
Console.WriteLine($"Queue '{_queueName}' deleted");
2222
}

Rebus.RabbitMq.Tests/RabbitMqCreateQueueTest.cs

Lines changed: 7 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -5,6 +5,8 @@
55
using System;
66
using System.Collections.Generic;
77
using System.Threading;
8+
using System.Threading.Tasks;
9+
810
// ReSharper disable AccessToDisposedClosure
911
// ReSharper disable UnusedVariable
1012

@@ -14,7 +16,7 @@ namespace Rebus.RabbitMq.Tests;
1416
public class RabbitMqCreateQueueTest : FixtureBase
1517
{
1618
[Test]
17-
public void Test_CreateQueue_WHEN_InputQueueOptions_AutoDelete_False_AND_TTL0_THEN_BusCanStart_()
19+
public async Task Test_CreateQueue_WHEN_InputQueueOptions_AutoDelete_False_AND_TTL0_THEN_BusCanStart_()
1820
{
1921
using var testScope = new QueueNameTestScope();
2022
using var activator = new BuiltinHandlerActivator();
@@ -35,7 +37,7 @@ public void Test_CreateQueue_WHEN_InputQueueOptions_AutoDelete_False_AND_TTL0_TH
3537
}
3638

3739
Thread.Sleep(5000);
38-
Assert.That(RabbitMqTransportFactory.QueueExists(testScope.QueueName), Is.True, $"The queue '{testScope.QueueName}' does not exist");
40+
Assert.That(await RabbitMqTransportFactory.QueueExists(testScope.QueueName), Is.True, $"The queue '{testScope.QueueName}' does not exist");
3941
}
4042

4143
[Test]
@@ -84,7 +86,7 @@ void InitializeWithZeroTtl()
8486
}
8587

8688
[Test]
87-
public void Test_CreateQueue_WHEN_InputQueueOptions_SetQueueTTL_5000_THEN_QueueIsDeleted_WHEN_5000msAfterConnectionClosed()
89+
public async Task Test_CreateQueue_WHEN_InputQueueOptions_SetQueueTTL_5000_THEN_QueueIsDeleted_WHEN_5000msAfterConnectionClosed()
8890
{
8991
using var testScope = new QueueNameTestScope();
9092

@@ -108,13 +110,13 @@ public void Test_CreateQueue_WHEN_InputQueueOptions_SetQueueTTL_5000_THEN_QueueI
108110

109111

110112
Thread.Sleep(5000);
111-
Assert.That(RabbitMqTransportFactory.QueueExists(testScope.QueueName), Is.False, $"The queue '{testScope.QueueName}' was still there");
113+
Assert.That(await RabbitMqTransportFactory.QueueExists(testScope.QueueName), Is.False, $"The queue '{testScope.QueueName}' was still there");
112114
}
113115

114116
class QueueNameTestScope : IDisposable
115117
{
116118
public string QueueName { get; } = Guid.NewGuid().ToString();
117119

118-
public void Dispose() => RabbitMqTransportFactory.DeleteQueue(QueueName);
120+
public void Dispose() => RabbitMqTransportFactory.DeleteQueue(QueueName).GetAwaiter().GetResult();
119121
}
120122
}

Rebus.RabbitMq.Tests/RabbitMqCustomExchangeNamesTest.cs

Lines changed: 8 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -24,10 +24,10 @@ public async Task CanUseCustomExchangeName()
2424
const string customDirectExchangeName = "Dingo";
2525
const string customTopicExchangeName = "Topico";
2626

27-
RabbitMqTransportFactory.DeleteExchange(RabbitMqOptionsBuilder.DefaultDirectExchangeName);
28-
RabbitMqTransportFactory.DeleteExchange(RabbitMqOptionsBuilder.DefaultTopicExchangeName);
29-
RabbitMqTransportFactory.DeleteExchange(customDirectExchangeName);
30-
RabbitMqTransportFactory.DeleteExchange(customTopicExchangeName);
27+
await RabbitMqTransportFactory.DeleteExchange(RabbitMqOptionsBuilder.DefaultDirectExchangeName);
28+
await RabbitMqTransportFactory.DeleteExchange(RabbitMqOptionsBuilder.DefaultTopicExchangeName);
29+
await RabbitMqTransportFactory.DeleteExchange(customDirectExchangeName);
30+
await RabbitMqTransportFactory.DeleteExchange(customTopicExchangeName);
3131

3232
using (var activator = new BuiltinHandlerActivator())
3333
{
@@ -49,10 +49,10 @@ public async Task CanUseCustomExchangeName()
4949
gotString.WaitOrDie(TimeSpan.FromSeconds(3));
5050
}
5151

52-
Assert.That(RabbitMqTransportFactory.ExchangeExists(RabbitMqOptionsBuilder.DefaultDirectExchangeName), Is.False);
53-
Assert.That(RabbitMqTransportFactory.ExchangeExists(RabbitMqOptionsBuilder.DefaultTopicExchangeName), Is.False);
54-
Assert.That(RabbitMqTransportFactory.ExchangeExists(customDirectExchangeName), Is.True);
55-
Assert.That(RabbitMqTransportFactory.ExchangeExists(customTopicExchangeName), Is.True);
52+
Assert.That(await RabbitMqTransportFactory.ExchangeExists(RabbitMqOptionsBuilder.DefaultDirectExchangeName), Is.False);
53+
Assert.That(await RabbitMqTransportFactory.ExchangeExists(RabbitMqOptionsBuilder.DefaultTopicExchangeName), Is.False);
54+
Assert.That(await RabbitMqTransportFactory.ExchangeExists(customDirectExchangeName), Is.True);
55+
Assert.That(await RabbitMqTransportFactory.ExchangeExists(customTopicExchangeName), Is.True);
5656
}
5757

5858
[Test]

Rebus.RabbitMq.Tests/RabbitMqHeaderTest.cs

Lines changed: 9 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -7,6 +7,7 @@
77
using Rebus.Activation;
88
using Rebus.Bus;
99
using Rebus.Config;
10+
using Rebus.Exceptions;
1011
using Rebus.Extensions;
1112
using Rebus.Logging;
1213
using Rebus.Tests.Contracts;
@@ -22,7 +23,7 @@ public class RabbitMqHeaderTest : FixtureBase
2223

2324
protected override void SetUp()
2425
{
25-
RabbitMqTransportFactory.DeleteQueue(_noneExistingQueueName);
26+
RabbitMqTransportFactory.DeleteQueue(_noneExistingQueueName).GetAwaiter().GetResult();
2627
}
2728

2829
[Test]
@@ -61,7 +62,13 @@ void Callback(object sender, BasicReturnEventArgs eventArgs)
6162
};
6263

6364
var bus = StartOneWayClient(Callback);
64-
await bus.Advanced.Routing.Send(_noneExistingQueueName, "I have headers", headers);
65+
try
66+
{
67+
await bus.Advanced.Routing.Send(_noneExistingQueueName, "I have headers", headers);
68+
}
69+
catch (RebusApplicationException)
70+
{
71+
}
6572

6673
gotCallback.WaitOrDie(TimeSpan.FromSeconds(2));
6774

Rebus.RabbitMq.Tests/RabbitMqMandatoryDeliveryTest.cs

Lines changed: 9 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -22,8 +22,8 @@ public class RabbitMqMandatoryDeliveryTest : FixtureBase
2222

2323
protected override void SetUp()
2424
{
25-
RabbitMqTransportFactory.DeleteQueue(_noneExistingQueueName);
26-
RabbitMqTransportFactory.DeleteQueue(_mandatoryQueue);
25+
RabbitMqTransportFactory.DeleteQueue(_noneExistingQueueName).GetAwaiter().GetResult();
26+
RabbitMqTransportFactory.DeleteQueue(_mandatoryQueue).GetAwaiter().GetResult();
2727
}
2828

2929
[Test]
@@ -56,11 +56,14 @@ void Callback(object sender, BasicReturnEventArgs eventArgs)
5656

5757
var bus = StartOneWayClient(Callback);
5858

59-
await bus.Advanced.Routing.Send(_noneExistingQueueName, "I'm mandatory", new Dictionary<string, string>
59+
try
6060
{
61-
[RabbitMqHeaders.MessageId] = messageId.ToString(),
62-
[RabbitMqHeaders.Mandatory] = bool.TrueString,
63-
});
61+
await bus.Advanced.Routing.Send(_noneExistingQueueName, "I'm mandatory", new Dictionary<string, string>
62+
{
63+
[RabbitMqHeaders.MessageId] = messageId.ToString(),
64+
[RabbitMqHeaders.Mandatory] = bool.TrueString,
65+
});
66+
} catch(RebusApplicationException){}
6467

6568
gotCallback.WaitOrDie(TimeSpan.FromSeconds(2));
6669
}

Rebus.RabbitMq.Tests/RabbitMqPriorityQueueTest.cs

Lines changed: 11 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -24,7 +24,7 @@ public class RabbitMqPriorityQueueTest : FixtureBase
2424

2525
protected override void SetUp()
2626
{
27-
RabbitMqTransportFactory.DeleteQueue(_priorityQueueName);
27+
RabbitMqTransportFactory.DeleteQueue(_priorityQueueName).GetAwaiter().GetResult();
2828
}
2929

3030
[Test]
@@ -34,24 +34,24 @@ public void PriorityInputQueueCreate()
3434
Using(StartServer(_priorityQueueName, 10));
3535

3636
// Check if queues exists
37-
Assert.DoesNotThrow(() =>
37+
Assert.That(async () =>
3838
{
3939
var connectionFactory = new ConnectionFactory { Uri = new Uri(RabbitMqTransportFactory.ConnectionString) };
4040

41-
using var connection = connectionFactory.CreateConnection();
41+
await using var connection = await connectionFactory.CreateConnectionAsync();
4242

43-
using var model = connection.CreateModel();
43+
await using var model = await connection.CreateChannelAsync();
4444

4545
// Throws exception if queue paramters differ
46-
model.QueueDeclare(_priorityQueueName,
46+
await model.QueueDeclareAsync(_priorityQueueName,
4747
exclusive: false,
4848
durable: true,
4949
autoDelete: false,
5050
arguments: new Dictionary<string, object>
5151
{
5252
{"x-max-priority", 10}
5353
});
54-
});
54+
}, Throws.Nothing);
5555
}
5656

5757
[Test]
@@ -60,23 +60,23 @@ public void PriorityInputQueueCreateThrowsOnDifferentPriority()
6060
// Start a server with priority
6161
Using(StartServer(_priorityQueueName, 10));
6262

63-
Assert.Throws<OperationInterruptedException>(() =>
63+
Assert.That(async () =>
6464
{
6565
var connectionFactory = new ConnectionFactory { Uri = new Uri(RabbitMqTransportFactory.ConnectionString) };
6666

67-
using var connection = connectionFactory.CreateConnection();
67+
await using var connection = await connectionFactory.CreateConnectionAsync();
6868

69-
using var model = connection.CreateModel();
69+
await using var model = await connection.CreateChannelAsync();
7070

71-
model.QueueDeclare(_priorityQueueName,
71+
await model.QueueDeclareAsync(_priorityQueueName,
7272
exclusive: false,
7373
durable: true,
7474
autoDelete: false,
7575
arguments: new Dictionary<string, object>
7676
{
7777
{"x-max-priority", 1}
7878
});
79-
});
79+
}, Throws.TypeOf<OperationInterruptedException>());
8080
}
8181

8282
[Test]

Rebus.RabbitMq.Tests/RabbitMqPubSubTest.cs

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -22,9 +22,9 @@ public class RabbitMqPubSubTest : FixtureBase
2222

2323
protected override void SetUp()
2424
{
25-
RabbitMqTransportFactory.DeleteQueue(_publisherQueueName);
26-
RabbitMqTransportFactory.DeleteQueue(_subscriber1QueueName);
27-
RabbitMqTransportFactory.DeleteQueue(_subscriber2QueueName);
25+
RabbitMqTransportFactory.DeleteQueue(_publisherQueueName).GetAwaiter().GetResult();
26+
RabbitMqTransportFactory.DeleteQueue(_subscriber1QueueName).GetAwaiter().GetResult();
27+
RabbitMqTransportFactory.DeleteQueue(_subscriber2QueueName).GetAwaiter().GetResult();
2828

2929
_publisher = GetBus(_publisherQueueName);
3030
}

Rebus.RabbitMq.Tests/RabbitMqQueueExistsTest.cs

Lines changed: 5 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,5 @@
1-
using NUnit.Framework;
1+
using System.Threading.Tasks;
2+
using NUnit.Framework;
23
using Rebus.Activation;
34
using Rebus.Bus;
45
using Rebus.Config;
@@ -23,11 +24,12 @@ protected override void SetUp()
2324
.Start();
2425
}
2526

27+
2628
[Test]
27-
public void ThrowExceptionWhenQueueDoesNotExist()
29+
public async Task ThrowExceptionWhenQueueDoesNotExist()
2830
{
2931
var queueName = TestConfig.GetName("non-existing-queue");
30-
RabbitMqTransportFactory.DeleteQueue(queueName);
32+
await RabbitMqTransportFactory.DeleteQueue(queueName);
3133

3234
Assert.ThrowsAsync<RebusApplicationException>(() => _bus.Advanced.Routing.Send(queueName, "hej"));
3335
}

0 commit comments

Comments
 (0)