Skip to content

Commit 7af7c06

Browse files
authored
Improve bindings recovery (#49)
* improve bindings recovery. Remove the bindings when a queue or an exchange is removed --------- Signed-off-by: Gabriele Santomaggio <[email protected]>
1 parent 9d8c535 commit 7af7c06

File tree

8 files changed

+252
-29
lines changed

8 files changed

+252
-29
lines changed

README.md

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -31,8 +31,8 @@ The client is distributed via [NuGet](https://www.nuget.org/packages/RabbitMQ.AM
3131
- [x] Recovery consumers on connection lost
3232
- [x] Implement Environment to manage the connections
3333
- [x] Complete the consumer part with `pause` and `unpause`
34-
- [ ] Complete the binding/unbinding with the special characters
35-
- [ ] Complete the queues/exchanges name with the special characters
34+
- [x] Complete the binding/unbinding with the special characters
35+
- [x] Complete the queues/exchanges name with the special characters
3636
- [ ] Implement metrics ( See `System.Diagnostics.DiagnosticSource` [Link](https://learn.microsoft.com/en-us/dotnet/core/diagnostics/metrics-instrumentation) )
3737
- [x] Recovery exchanges on connection lost
3838
- [x] Recovery bindings on connection lost

RabbitMQ.AMQP.Client/IEntities.cs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -152,7 +152,7 @@ public interface IExchangeSpecification : IEntitySpecification
152152
public interface IBindingSpecification
153153
{
154154
IBindingSpecification SourceExchange(IExchangeSpecification exchangeSpec);
155-
155+
156156
IBindingSpecification SourceExchange(string exchangeName);
157157
string SourceExchangeName();
158158

RabbitMQ.AMQP.Client/ITopologyListener.cs

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -9,17 +9,17 @@ public interface ITopologyListener
99
void ExchangeDeclared(IExchangeSpecification specification);
1010

1111
void ExchangeDeleted(string name);
12-
13-
12+
13+
1414
void BindingDeclared(IBindingSpecification specification);
15-
15+
1616
void BindingDeleted(string path);
1717

1818
void Clear();
1919

2020
int QueueCount();
2121

2222
int ExchangeCount();
23-
23+
2424
int BindingCount();
2525
}

RabbitMQ.AMQP.Client/Impl/AmqpConnection.cs

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -567,8 +567,8 @@ await Management.Exchange(spec).DeclareAsync()
567567
}
568568
}
569569
}
570-
571-
570+
571+
572572
public async Task VisitBindingsAsync(IEnumerable<BindingSpec> bindingSpec)
573573
{
574574
// TODO this could be done in parallel

RabbitMQ.AMQP.Client/Impl/AmqpManagement.cs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -106,7 +106,7 @@ public IBindingSpecification Binding()
106106
{
107107
return new AmqpBindingSpecification(this);
108108
}
109-
109+
110110
public IBindingSpecification Binding(BindingSpec spec)
111111
{
112112
return Binding()

RabbitMQ.AMQP.Client/Impl/RecordingTopologyListener.cs

Lines changed: 25 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -24,6 +24,29 @@ public class RecordingTopologyListener : ITopologyListener
2424

2525
private readonly ConcurrentDictionary<string, BindingSpec> _bindingSpecifications = new();
2626

27+
private void RemoveBindingsSpecificationFromQueue(string queueName)
28+
{
29+
foreach (var binding in _bindingSpecifications.Values)
30+
{
31+
if (binding.DestinationQueue == queueName)
32+
{
33+
_bindingSpecifications.TryRemove(binding.Path, out _);
34+
}
35+
}
36+
}
37+
38+
private void RemoveBindingsSpecificationFromExchange(string exchangeName)
39+
{
40+
foreach (var binding in _bindingSpecifications.Values)
41+
{
42+
if (binding.SourceExchange == exchangeName
43+
|| binding.DestinationExchange == exchangeName)
44+
{
45+
_bindingSpecifications.TryRemove(binding.Path, out _);
46+
}
47+
}
48+
}
49+
2750

2851
public void QueueDeclared(IQueueSpecification specification)
2952
{
@@ -33,6 +56,7 @@ public void QueueDeclared(IQueueSpecification specification)
3356
public void QueueDeleted(string name)
3457
{
3558
_queueSpecifications.TryRemove(name, out _);
59+
RemoveBindingsSpecificationFromQueue(name);
3660
}
3761

3862
public void ExchangeDeclared(IExchangeSpecification specification)
@@ -43,6 +67,7 @@ public void ExchangeDeclared(IExchangeSpecification specification)
4367
public void ExchangeDeleted(string name)
4468
{
4569
_exchangeSpecifications.TryRemove(name, out _);
70+
RemoveBindingsSpecificationFromExchange(name);
4671
}
4772

4873
public void BindingDeclared(IBindingSpecification specification)

Tests/BindingsTests.cs

Lines changed: 27 additions & 19 deletions
Original file line numberDiff line numberDiff line change
@@ -41,7 +41,8 @@ public async Task SimpleBindingsBetweenExchangeAndQueue(string sourceExchange, s
4141

4242
await bindingSpec.UnbindAsync();
4343

44-
await SystemUtils.WaitUntilBindingsBetweenExchangeAndQueueDontExistAsync(sourceExchangeSpec, destinationQueueSpec);
44+
await SystemUtils.WaitUntilBindingsBetweenExchangeAndQueueDontExistAsync(sourceExchangeSpec,
45+
destinationQueueSpec);
4546

4647
/*
4748
* TODO dispose assertions?
@@ -105,7 +106,12 @@ public async Task BindBetweenExchangeAndQueueTwoTimes()
105106
"[7][8][9] 他被广泛认为是理论计算机科学和人工智能之父。")]
106107
[InlineData("ήταν Άγγλος μαθηματικός, επιστήμονας υπολογιστών",
107108
"ήταν Άγγλος μαθηματικός, επιστήμονας", "επι")]
108-
public async Task SimpleBindingsBetweenExchangeAndExchange(string sourceExchangeName, string destinationExchangeName,
109+
[InlineData("(~~~!!++@~./.,€€#!!±§##§¶¡€#¢)",
110+
"~~~!!++@----.", "==`£!-=+")]
111+
112+
113+
public async Task SimpleBindingsBetweenExchangeAndExchange(string sourceExchangeName,
114+
string destinationExchangeName,
109115
string key)
110116
{
111117
Assert.NotNull(_connection);
@@ -127,11 +133,13 @@ await WhenAllComplete(
127133
await bindingSpecification.BindAsync();
128134

129135
await SystemUtils.WaitUntilExchangeExistsAsync(sourceExchangeSpec);
130-
await SystemUtils.WaitUntilBindingsBetweenExchangeAndExchangeExistAsync(sourceExchangeSpec, destinationExchangeSpec);
136+
await SystemUtils.WaitUntilBindingsBetweenExchangeAndExchangeExistAsync(sourceExchangeSpec,
137+
destinationExchangeSpec);
131138

132139
await bindingSpecification.UnbindAsync();
133140

134-
await SystemUtils.WaitUntilBindingsBetweenExchangeAndExchangeDontExistAsync(sourceExchangeSpec, destinationExchangeSpec);
141+
await SystemUtils.WaitUntilBindingsBetweenExchangeAndExchangeDontExistAsync(sourceExchangeSpec,
142+
destinationExchangeSpec);
135143

136144
await sourceExchangeSpec.DeleteAsync();
137145
await destinationExchangeSpec.DeleteAsync();
@@ -146,6 +154,7 @@ await WhenAllComplete(
146154
[InlineData("B", 10000L, "H", 0.0001)]
147155
[InlineData("是英国", 10000.32, "W", 3.0001)]
148156
[InlineData("是英国", "是英国23", "W", 3.0001)]
157+
[InlineData("(~~~!!++@----./.,€€#####§¶¡€#¢)", "~~~!!++@----", "==`£!-=+", "===£!-=+")]
149158
public async Task BindingsBetweenExchangeAndQueuesWithArgumentsDifferentValues(string key1, object value1,
150159
string key2, object value2)
151160
{
@@ -196,8 +205,7 @@ await SystemUtils.WaitUntilBindingsBetweenExchangeAndQueueDontExistWithArgsAsync
196205
[Theory]
197206
[InlineData("my_source_exchange_multi_123", "my_destination_789", "myKey")]
198207
[InlineData("是英国v_", "destination_是英国v_", "μαθηματικός")]
199-
// TODO: to validate. Atm it seems there is a server side problem
200-
// [InlineData("(~~~!!++@----./.,€€#####§¶¡€#¢)", ",,~~~!!++@----./.,€€#####§¶¡€#¢@@@", "===£!-=+")]
208+
[InlineData("(~~~!!++@----./.,€€#####§¶¡€#¢)", ",,~~~!!++@----./.,€€#####§¶¡€#¢@@@", "===£!-=+")]
201209
public async Task MultiBindingsBetweenExchangeAndQueuesWithArgumentsDifferentValues(string source,
202210
string destination, string key)
203211
{
@@ -210,7 +218,6 @@ public async Task MultiBindingsBetweenExchangeAndQueuesWithArgumentsDifferentVal
210218
await WhenAllComplete(exchangeSpec.DeclareAsync(), queueSpec.DeclareAsync());
211219

212220
// add 10 bindings to have a list of bindings to find
213-
var bindingSpecs = new List<IBindingSpecification>();
214221
var bindingSpecTasks = new List<Task>();
215222
for (int i = 0; i < 10; i++)
216223
{
@@ -219,31 +226,35 @@ public async Task MultiBindingsBetweenExchangeAndQueuesWithArgumentsDifferentVal
219226
.DestinationQueue(queueSpec)
220227
.Key(key) // single key to use different args
221228
.Arguments(new Dictionary<string, object>() { { $"是英国v_{i}", $"p_{i}" } });
222-
bindingSpecs.Add(bindingSpec);
223229
bindingSpecTasks.Add(bindingSpec.BindAsync());
224230
}
231+
225232
await WhenAllComplete(bindingSpecTasks);
226233
bindingSpecTasks.Clear();
227234

228-
var specialBindArgs = new Dictionary<string, object>() { { $"v_8", $"p_8" }, { $"v_1", 1 }, { $"v_r", 1000L }, };
235+
var specialBindArgs =
236+
new Dictionary<string, object>() { { $"v_8", $"p_8" }, { $"v_1", 1 }, { $"v_r", 1000L }, };
229237
IBindingSpecification specialBindSpec = _management.Binding()
230238
.SourceExchange(exchangeSpec)
231239
.DestinationQueue(queueSpec)
232240
.Key(key) // single key to use different args
233241
.Arguments(specialBindArgs);
234242
await specialBindSpec.BindAsync();
235243

236-
await SystemUtils.WaitUntilBindingsBetweenExchangeAndQueueExistWithArgsAsync(exchangeSpec, queueSpec, specialBindArgs);
244+
await SystemUtils.WaitUntilBindingsBetweenExchangeAndQueueExistWithArgsAsync(exchangeSpec, queueSpec,
245+
specialBindArgs);
237246

238247
await specialBindSpec.UnbindAsync();
239248

240-
await SystemUtils.WaitUntilBindingsBetweenExchangeAndQueueDontExistWithArgsAsync(exchangeSpec, queueSpec, specialBindArgs);
249+
await SystemUtils.WaitUntilBindingsBetweenExchangeAndQueueDontExistWithArgsAsync(exchangeSpec, queueSpec,
250+
specialBindArgs);
241251

242252
for (int i = 0; i < 10; i++)
243253
{
244254
var bindArgs = new Dictionary<string, object>() { { $"是英国v_{i}", $"p_{i}" } };
245255

246-
await SystemUtils.WaitUntilBindingsBetweenExchangeAndQueueExistWithArgsAsync(exchangeSpec, queueSpec, bindArgs);
256+
await SystemUtils.WaitUntilBindingsBetweenExchangeAndQueueExistWithArgsAsync(exchangeSpec, queueSpec,
257+
bindArgs);
247258

248259
await _management.Binding()
249260
.SourceExchange(exchangeSpec)
@@ -252,14 +263,11 @@ await _management.Binding()
252263
.Arguments(bindArgs)
253264
.UnbindAsync();
254265

255-
await SystemUtils.WaitUntilBindingsBetweenExchangeAndQueueDontExistWithArgsAsync(exchangeSpec, queueSpec, bindArgs);
266+
await SystemUtils.WaitUntilBindingsBetweenExchangeAndQueueDontExistWithArgsAsync(exchangeSpec, queueSpec,
267+
bindArgs);
256268
}
257-
258-
/*
259-
* NB: test DisposeAsync will do this.
260-
await _management.ExchangeDeletion().Delete(source);
261-
await _management.QueueDeletion().Delete(destination);
269+
await exchangeSpec.DeleteAsync();
270+
await queueSpec.DeleteAsync();
262271
await _connection.CloseAsync();
263-
*/
264272
}
265273
}

0 commit comments

Comments
 (0)