From 19aa5fef224a7ddea53fc46c4b5dcdb99476dd97 Mon Sep 17 00:00:00 2001 From: Peter Chapman Date: Thu, 9 Oct 2025 14:48:18 +1300 Subject: [PATCH] Allow the outbox delivery service to reconnect on database timeout --- .../Services/OutboxDeliveryService.cs | 20 +++++++++++++++---- 1 file changed, 16 insertions(+), 4 deletions(-) diff --git a/src/ServiceToolkit/src/SIL.ServiceToolkit/Services/OutboxDeliveryService.cs b/src/ServiceToolkit/src/SIL.ServiceToolkit/Services/OutboxDeliveryService.cs index 38b999c2..590294dc 100644 --- a/src/ServiceToolkit/src/SIL.ServiceToolkit/Services/OutboxDeliveryService.cs +++ b/src/ServiceToolkit/src/SIL.ServiceToolkit/Services/OutboxDeliveryService.cs @@ -22,12 +22,24 @@ protected override async Task ExecuteAsync(CancellationToken stoppingToken) .ToDictionary(o => (o.OutboxId, o.Method)); await ProcessMessagesAsync(consumers, messages, stoppingToken); using ISubscription subscription = await messages.SubscribeAsync(e => true, stoppingToken); - while (true) + while (!stoppingToken.IsCancellationRequested) { - await subscription.WaitForChangeAsync(cancellationToken: stoppingToken); - if (stoppingToken.IsCancellationRequested) + try + { + await subscription.WaitForChangeAsync(cancellationToken: stoppingToken); + stoppingToken.ThrowIfCancellationRequested(); + await ProcessMessagesAsync(consumers, messages, stoppingToken); + } + catch (TimeoutException e) + { + _logger.LogWarning(e, "Change stream interrupted, trying again..."); + await Task.Delay(TimeSpan.FromSeconds(10), stoppingToken); + } + catch (OperationCanceledException e) + { + _logger.LogInformation(e, "Cancellation requested, service is stopping..."); break; - await ProcessMessagesAsync(consumers, messages, stoppingToken); + } } }