diff --git a/src/contrib/cluster/Akka.Cluster.Sharding/Delivery/Internal/ShardingConsumerControllerImpl.cs b/src/contrib/cluster/Akka.Cluster.Sharding/Delivery/Internal/ShardingConsumerControllerImpl.cs index fc4ad86254c..57ab5ca612d 100644 --- a/src/contrib/cluster/Akka.Cluster.Sharding/Delivery/Internal/ShardingConsumerControllerImpl.cs +++ b/src/contrib/cluster/Akka.Cluster.Sharding/Delivery/Internal/ShardingConsumerControllerImpl.cs @@ -20,8 +20,16 @@ namespace Akka.Cluster.Sharding.Delivery.Internal; /// INTERNAL API /// /// The types of messages handled by the ConsumerController -internal class ShardingConsumerController : ReceiveActor, IWithStash +internal class ShardingConsumerController : ReceiveActor, IWithStash, IWithTimers { + private const string ShutdownTimeoutTimerKey = nameof(ShutdownTimeoutTimerKey); + + private sealed class ShutdownTimeout + { + public static readonly ShutdownTimeout Instance = new (); + private ShutdownTimeout() { } + } + public ShardingConsumerController(Func consumerProps, ShardingConsumerController.Settings settings) { @@ -115,7 +123,17 @@ private void Active() Receive(t => t.ActorRef.Equals(_consumer), _ => { _log.Debug("Consumer terminated."); - Context.Stop(Self); + + // Short-circuit shutdown process, just shut down immediately if there's nothing to clean. + if (ProducerControllers.Count == 0 && ConsumerControllers.Count == 0) + { + _log.Debug("ShardingConsumerController terminated."); + Context.Stop(Self); + } + else + { + Become(ShuttingDown()); + } }); Receive(t => @@ -166,6 +184,62 @@ private void Active() }); } + // Shutdown state after `_consumer` actor is downed. + private Action ShuttingDown() + { + // start a 3-seconds shutdown timeout timer + Timers.StartSingleTimer(ShutdownTimeoutTimerKey, ShutdownTimeout.Instance, TimeSpan.FromSeconds(3), Self); + + _log.Debug("Shutting down child controllers"); + + foreach (var p in ProducerControllers.Keys) + Context.Unwatch(p); + ProducerControllers = ImmutableDictionary.Empty; + + foreach (var c in ConsumerControllers.Values.Distinct()) + Context.Stop(c); + + return () => + { + Receive>(seqMsg => + { + var messageType = seqMsg.Message.Chunk.HasValue + ? $"Manifest: {seqMsg.Message.Chunk.Value.Manifest}, SerializerId: {seqMsg.Message.Chunk.Value.SerializerId}" + : seqMsg.Message.Message?.GetType().FullName ?? "Unknown type"; + _log.Warning("Message [{0}] from [{1}] is being ignored because ShardingConsumerController is shutting down.", messageType, seqMsg.ProducerId); + }); + + Receive(_ => + { + // We somehow could not terminate cleanly within 3 seconds, shutdown immediately + _log.Warning("ShardingConsumerController cleanup timed out, force terminating."); + Context.Stop(Self); + }); + + Receive(t => + { + var removeList = ConsumerControllers + .Where(kv => kv.Value.Equals(t.ActorRef)) + .Select(kv => kv.Key) + .ToArray(); + + if(removeList.Length > 0) + { + foreach (var key in removeList) + _log.Debug("ConsumerController for producerId [{0}] terminated.", key); + + ConsumerControllers = ConsumerControllers.RemoveRange(removeList); + } + + if (ProducerControllers.Count > 0 || ConsumerControllers.Count > 0) + return; + + _log.Debug("ShardingConsumerController terminated."); + Context.Stop(Self); + }); + }; + } + private ImmutableDictionary UpdatedProducerControllers(IActorRef producerController, string producer) { @@ -183,4 +257,5 @@ protected override void PreStart() } public IStash Stash { get; set; } = null!; + public ITimerScheduler Timers { get; set; } = null!; }