Skip to content
This repository was archived by the owner on Nov 17, 2023. It is now read-only.

Commit 167bb16

Browse files
authored
Merge pull request #987 from dotnet-architecture/fix/888-rabbitmq-message-processing-problem
Use AsyncEventingBasicConsumer in RabbitMQ
2 parents 3f7307a + 396d33f commit 167bb16

File tree

10 files changed

+51
-23
lines changed

10 files changed

+51
-23
lines changed

src/BuildingBlocks/EventBus/EventBusRabbitMQ/EventBusRabbitMQ.cs

Lines changed: 33 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -178,27 +178,46 @@ private void StartBasicConsume()
178178
{
179179
if (_consumerChannel != null)
180180
{
181-
var consumer = new EventingBasicConsumer(_consumerChannel);
182-
consumer.Received += async (model, ea) =>
183-
{
184-
var eventName = ea.RoutingKey;
185-
var message = Encoding.UTF8.GetString(ea.Body);
186-
187-
await ProcessEvent(eventName, message);
181+
var consumer = new AsyncEventingBasicConsumer(_consumerChannel);
188182

189-
_consumerChannel.BasicAck(ea.DeliveryTag, multiple: false);
190-
};
183+
consumer.Received += Consumer_Received;
191184

192-
_consumerChannel.BasicConsume(queue: _queueName,
193-
autoAck: false,
194-
consumer: consumer);
185+
_consumerChannel.BasicConsume(
186+
queue: _queueName,
187+
autoAck: false,
188+
consumer: consumer);
195189
}
196190
else
197191
{
198-
_logger.LogError("StartBasicConsume can not call on _consumerChannelCreated == false");
192+
_logger.LogError("StartBasicConsume can't call on _consumerChannel == null");
199193
}
200194
}
201195

196+
private async Task Consumer_Received(object sender, BasicDeliverEventArgs eventArgs)
197+
{
198+
var eventName = eventArgs.RoutingKey;
199+
var message = Encoding.UTF8.GetString(eventArgs.Body);
200+
201+
try
202+
{
203+
if (message.ToLowerInvariant().Contains("throw-fake-exception"))
204+
{
205+
throw new InvalidOperationException($"Fake exception requested: \"{message}\"");
206+
}
207+
208+
await ProcessEvent(eventName, message);
209+
}
210+
catch (Exception ex)
211+
{
212+
_logger.LogWarning(ex, "----- ERROR Processing message \"{Message}\"", message);
213+
}
214+
215+
// Even on exception we take the message off the queue.
216+
// in a REAL WORLD app this should be handled with a Dead Letter Exchange (DLX).
217+
// For more information see: https://www.rabbitmq.com/dlx.html
218+
_consumerChannel.BasicAck(eventArgs.DeliveryTag, multiple: false);
219+
}
220+
202221
private IModel CreateConsumerChannel()
203222
{
204223
if (!_persistentConnection.IsConnected)
@@ -209,7 +228,7 @@ private IModel CreateConsumerChannel()
209228
var channel = _persistentConnection.CreateModel();
210229

211230
channel.ExchangeDeclare(exchange: BROKER_NAME,
212-
type: "direct");
231+
type: "direct");
213232

214233
channel.QueueDeclare(queue: _queueName,
215234
durable: true,

src/Services/Basket/Basket.API/Startup.cs

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -105,7 +105,8 @@ public IServiceProvider ConfigureServices(IServiceCollection services)
105105

106106
var factory = new ConnectionFactory()
107107
{
108-
HostName = Configuration["EventBusConnection"]
108+
HostName = Configuration["EventBusConnection"],
109+
DispatchConsumersAsync = true
109110
};
110111

111112
if (!string.IsNullOrEmpty(Configuration["EventBusUserName"]))

src/Services/Catalog/Catalog.API/Startup.cs

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -297,7 +297,8 @@ public static IServiceCollection AddIntegrationServices(this IServiceCollection
297297

298298
var factory = new ConnectionFactory()
299299
{
300-
HostName = configuration["EventBusConnection"]
300+
HostName = configuration["EventBusConnection"],
301+
DispatchConsumersAsync = true
301302
};
302303

303304
if (!string.IsNullOrEmpty(configuration["EventBusUserName"]))

src/Services/Location/Locations.API/Startup.cs

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -77,7 +77,8 @@ public IServiceProvider ConfigureServices(IServiceCollection services)
7777

7878
var factory = new ConnectionFactory()
7979
{
80-
HostName = Configuration["EventBusConnection"]
80+
HostName = Configuration["EventBusConnection"],
81+
DispatchConsumersAsync = true
8182
};
8283

8384
if (!string.IsNullOrEmpty(Configuration["EventBusUserName"]))

src/Services/Marketing/Marketing.API/Startup.cs

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -101,7 +101,8 @@ public IServiceProvider ConfigureServices(IServiceCollection services)
101101

102102
var factory = new ConnectionFactory()
103103
{
104-
HostName = Configuration["EventBusConnection"]
104+
HostName = Configuration["EventBusConnection"],
105+
DispatchConsumersAsync = true
105106
};
106107

107108
if (!string.IsNullOrEmpty(Configuration["EventBusUserName"]))

src/Services/Ordering/Ordering.API/Startup.cs

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -305,7 +305,8 @@ public static IServiceCollection AddCustomIntegrations(this IServiceCollection s
305305

306306
var factory = new ConnectionFactory()
307307
{
308-
HostName = configuration["EventBusConnection"]
308+
HostName = configuration["EventBusConnection"],
309+
DispatchConsumersAsync = true
309310
};
310311

311312
if (!string.IsNullOrEmpty(configuration["EventBusUserName"]))

src/Services/Ordering/Ordering.BackgroundTasks/Startup.cs

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -68,7 +68,8 @@ public IServiceProvider ConfigureServices(IServiceCollection services)
6868

6969
var factory = new ConnectionFactory()
7070
{
71-
HostName = Configuration["EventBusConnection"]
71+
HostName = Configuration["EventBusConnection"],
72+
DispatchConsumersAsync = true
7273
};
7374

7475
if (!string.IsNullOrEmpty(Configuration["EventBusUserName"]))

src/Services/Ordering/Ordering.SignalrHub/Startup.cs

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -80,7 +80,8 @@ public IServiceProvider ConfigureServices(IServiceCollection services)
8080

8181
var factory = new ConnectionFactory()
8282
{
83-
HostName = Configuration["EventBusConnection"]
83+
HostName = Configuration["EventBusConnection"],
84+
DispatchConsumersAsync = true
8485
};
8586

8687
if (!string.IsNullOrEmpty(Configuration["EventBusUserName"]))

src/Services/Payment/Payment.API/Startup.cs

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -58,7 +58,8 @@ public IServiceProvider ConfigureServices(IServiceCollection services)
5858
var logger = sp.GetRequiredService<ILogger<DefaultRabbitMQPersistentConnection>>();
5959
var factory = new ConnectionFactory()
6060
{
61-
HostName = Configuration["EventBusConnection"]
61+
HostName = Configuration["EventBusConnection"],
62+
DispatchConsumersAsync = true
6263
};
6364

6465
if (!string.IsNullOrEmpty(Configuration["EventBusUserName"]))

src/Services/Webhooks/Webhooks.API/Startup.cs

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -320,7 +320,8 @@ public static IServiceCollection AddIntegrationServices(this IServiceCollection
320320

321321
var factory = new ConnectionFactory()
322322
{
323-
HostName = configuration["EventBusConnection"]
323+
HostName = configuration["EventBusConnection"],
324+
DispatchConsumersAsync = true
324325
};
325326

326327
if (!string.IsNullOrEmpty(configuration["EventBusUserName"]))

0 commit comments

Comments
 (0)