From 7a270a19f2b9d53cddb83aaf093a676a143decb9 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Stef=C3=A1n=20J=C3=B6kull=20Sigur=C3=B0arson?= Date: Tue, 19 May 2020 23:15:02 +0000 Subject: [PATCH] Making sure AsyncEventHandlers are all run. Fixes issue #838. --- .../client/api/AsyncDefaultBasicConsumer.cs | 6 +----- .../client/events/AsyncEventHandler.cs | 16 +++++++++++++++- .../client/events/AsyncEventingBasicConsumer.cs | 10 +++++----- 3 files changed, 21 insertions(+), 11 deletions(-) diff --git a/projects/RabbitMQ.Client/client/api/AsyncDefaultBasicConsumer.cs b/projects/RabbitMQ.Client/client/api/AsyncDefaultBasicConsumer.cs index 3d778736a3..be2d522d23 100644 --- a/projects/RabbitMQ.Client/client/api/AsyncDefaultBasicConsumer.cs +++ b/projects/RabbitMQ.Client/client/api/AsyncDefaultBasicConsumer.cs @@ -140,11 +140,7 @@ public virtual Task HandleModelShutdown(object model, ShutdownEventArgs reason) public virtual async Task OnCancel(params string[] consumerTags) { IsRunning = false; - foreach (AsyncEventHandler h in ConsumerCancelled?.GetInvocationList() ?? Array.Empty()) - { - await h(this, new ConsumerEventArgs(consumerTags)).ConfigureAwait(false); - } - + await ConsumerCancelled.InvokeAsync(this, new ConsumerEventArgs(consumerTags)).ConfigureAwait(false); foreach (string consumerTag in consumerTags) { _consumerTags.Remove(consumerTag); diff --git a/projects/RabbitMQ.Client/client/events/AsyncEventHandler.cs b/projects/RabbitMQ.Client/client/events/AsyncEventHandler.cs index 7ab70fbefc..877779b15d 100644 --- a/projects/RabbitMQ.Client/client/events/AsyncEventHandler.cs +++ b/projects/RabbitMQ.Client/client/events/AsyncEventHandler.cs @@ -4,4 +4,18 @@ namespace RabbitMQ.Client.Events { public delegate Task AsyncEventHandler(object sender, TEvent @event) where TEvent : EventArgs; -} \ No newline at end of file + + internal static class AsyncEventHandlerExtensions + { + public static async Task InvokeAsync(this AsyncEventHandler eventHandler, object sender, TEvent @event) where TEvent : EventArgs + { + if(eventHandler != null) + { + foreach(AsyncEventHandler handlerInstance in eventHandler.GetInvocationList()) + { + await handlerInstance(sender, @event).ConfigureAwait(false); + } + } + } + } +} diff --git a/projects/RabbitMQ.Client/client/events/AsyncEventingBasicConsumer.cs b/projects/RabbitMQ.Client/client/events/AsyncEventingBasicConsumer.cs index 59660a1226..b0e62ce611 100644 --- a/projects/RabbitMQ.Client/client/events/AsyncEventingBasicConsumer.cs +++ b/projects/RabbitMQ.Client/client/events/AsyncEventingBasicConsumer.cs @@ -34,28 +34,28 @@ public AsyncEventingBasicConsumer(IModel model) : base(model) public override async Task HandleBasicCancelOk(string consumerTag) { await base.HandleBasicCancelOk(consumerTag).ConfigureAwait(false); - await (Unregistered?.Invoke(this, new ConsumerEventArgs(new[] { consumerTag })) ?? Task.CompletedTask).ConfigureAwait(false); + await Unregistered.InvokeAsync(this, new ConsumerEventArgs(new[] { consumerTag })).ConfigureAwait(false); } ///Fires when the server confirms successful consumer registration. public override async Task HandleBasicConsumeOk(string consumerTag) { await base.HandleBasicConsumeOk(consumerTag).ConfigureAwait(false); - await (Registered?.Invoke(this, new ConsumerEventArgs(new[] { consumerTag })) ?? Task.CompletedTask).ConfigureAwait(false); + await Registered.InvokeAsync(this, new ConsumerEventArgs(new[] { consumerTag })).ConfigureAwait(false); } ///Fires the Received event. public override async Task HandleBasicDeliver(string consumerTag, ulong deliveryTag, bool redelivered, string exchange, string routingKey, IBasicProperties properties, ReadOnlyMemory body) { - await base.HandleBasicDeliver(consumerTag, deliveryTag, redelivered, exchange, routingKey, properties, body); - await (Received?.Invoke(this, new BasicDeliverEventArgs(consumerTag, deliveryTag, redelivered, exchange, routingKey, properties, body)) ?? Task.CompletedTask).ConfigureAwait(false); + await base.HandleBasicDeliver(consumerTag, deliveryTag, redelivered, exchange, routingKey, properties, body).ConfigureAwait(false); + await Received.InvokeAsync(this, new BasicDeliverEventArgs(consumerTag, deliveryTag, redelivered, exchange, routingKey, properties, body)).ConfigureAwait(false); } ///Fires the Shutdown event. public override async Task HandleModelShutdown(object model, ShutdownEventArgs reason) { await base.HandleModelShutdown(model, reason).ConfigureAwait(false); - await (Shutdown?.Invoke(this, reason) ?? Task.CompletedTask).ConfigureAwait(false); + await Shutdown.InvokeAsync(this, reason).ConfigureAwait(false); } } }