From 919df89f41d53c6460d92dfbd63c35c786edaf2d Mon Sep 17 00:00:00 2001 From: Aaron Stannard Date: Thu, 13 Mar 2025 23:07:37 -0500 Subject: [PATCH 1/2] Akka.Remote: `Endpoint` actor cleanup Removing lots of TBDs and applying some refactoring suggestions --- src/core/Akka.Remote/Endpoint.cs | 359 ++++++++++--------------------- 1 file changed, 111 insertions(+), 248 deletions(-) diff --git a/src/core/Akka.Remote/Endpoint.cs b/src/core/Akka.Remote/Endpoint.cs index 99045039ebb..38b191fe4ce 100644 --- a/src/core/Akka.Remote/Endpoint.cs +++ b/src/core/Akka.Remote/Endpoint.cs @@ -1009,18 +1009,6 @@ private void TryPublish(RemotingLifecycleEvent ev) /// internal sealed class EndpointWriter : EndpointActor { - /// - /// TBD - /// - /// TBD - /// TBD - /// TBD - /// TBD - /// TBD - /// TBD - /// TBD - /// TBD - /// TBD public EndpointWriter( AkkaProtocolHandle handleOrActive, Address localAddress, @@ -1088,11 +1076,7 @@ public EndpointWriter( private readonly IRemoteMetrics _remoteMetrics; #region ActorBase methods - - /// - /// TBD - /// - /// TBD + protected override SupervisorStrategy SupervisorStrategy() { return new OneForOneStrategy(ex => @@ -1101,20 +1085,12 @@ protected override SupervisorStrategy SupervisorStrategy() return Directive.Escalate; }); } - - /// - /// TBD - /// - /// TBD - /// TBD + protected override void PostRestart(Exception reason) { throw new IllegalActorStateException("EndpointWriter must not be restarted"); } - - /// - /// TBD - /// + protected override void PreStart() { if (_handle == null) @@ -1142,10 +1118,7 @@ private async Task AssociateAsync() return new Status.Failure(e.InnerException ?? e); } } - - /// - /// TBD - /// + protected override void PostStop() { _ackIdleTimerCancelable.CancelIfNotNull(); @@ -1185,6 +1158,7 @@ private void Initializing() PublishAndThrow(new InvalidAssociation($"Association failed with {RemoteAddress}", LocalAddress, RemoteAddress, failure.Cause), LogLevel.WarningLevel); }); + Receive(handle => { // Assert handle == None? @@ -1341,7 +1315,7 @@ private IActorRef StartReadEndpoint(AkkaProtocolHandle handle) EndpointReader.ReaderProps(LocalAddress, RemoteAddress, Transport, Settings, _codec, _msgDispatcher, Inbound, (int)handle.HandshakeInfo.Uid, _receiveBuffers, _reliableDeliverySupervisor) .WithDeploy(Deploy.Local)), - string.Format("endpointReader-{0}-{1}", AddressUrlEncoder.Encode(RemoteAddress), _readerId.Next())); + $"endpointReader-{AddressUrlEncoder.Encode(RemoteAddress)}-{_readerId.Next()}"); Context.Watch(newReader); handle.ReadHandlerSource.SetResult(new ActorHandleEventListener(newReader)); return newReader; @@ -1431,10 +1405,9 @@ private void TrySendPureAck() private void EnqueueInBuffer(object message) { var send = message as EndpointManager.Send; - if (send != null && send.Message is IPriorityMessage) + if (send is { Message: IPriorityMessage }) _prioBuffer.AddLast(send); - else if (send != null && send.Message is ActorSelectionMessage actorSelectionMessage && - actorSelectionMessage.Message is IPriorityMessage) + else if (send is { Message: ActorSelectionMessage { Message: IPriorityMessage } }) { _prioBuffer.AddLast(send); } @@ -1469,6 +1442,10 @@ internal static string LogPossiblyWrappedMessageType(object failedMsg) { if (failedMsg is IWrappedMessage wrappedMessage) { + var builder = new StringBuilder(); + LogWrapped(builder, wrappedMessage); + return builder.ToString(); + static void LogWrapped(StringBuilder builder, IWrappedMessage nextMsg) { builder.Append($"{nextMsg.GetType()}-->"); @@ -1483,10 +1460,6 @@ static void LogWrapped(StringBuilder builder, IWrappedMessage nextMsg) builder.Append(nextMsg.Message.GetType()); } } - - var builder = new StringBuilder(); - LogWrapped(builder, wrappedMessage); - return builder.ToString(); } return failedMsg.GetType().ToString(); @@ -1565,50 +1538,6 @@ private bool WriteSend(EndpointManager.Send send) private void SendBufferedMessages() { - bool SendDelegate(object msg) - { - switch (msg) - { - case EndpointManager.Send s: - return WriteSend(s); - case FlushAndStop f: - DoFlushAndStop(); - return false; - case StopReading stop: - _reader?.Tell(stop, stop.ReplyTo); - return true; - default: - return true; - } - } - - bool WriteLoop(int count) - { - if (count > 0 && _buffer.Any()) - { - if (SendDelegate(_buffer.First.Value)) - { - _buffer.RemoveFirst(); - _writeCount += 1; - return WriteLoop(count - 1); - } - return false; - } - - return true; - } - - bool WritePrioLoop() - { - if (!_prioBuffer.Any()) return true; - if (WriteSend(_prioBuffer.First.Value)) - { - _prioBuffer.RemoveFirst(); - return WritePrioLoop(); - } - return false; - } - var size = _buffer.Count; var ok = WritePrioLoop() && WriteLoop(SendBufferBatchSize); @@ -1648,6 +1577,55 @@ bool WritePrioLoop() AdjustAdaptiveBackup(); ScheduleBackoffTimer(); + return; + + bool SendDelegate(object msg) + { + switch (msg) + { + case EndpointManager.Send s: + return WriteSend(s); + case FlushAndStop f: + DoFlushAndStop(); + return false; + case StopReading stop: + _reader?.Tell(stop, stop.ReplyTo); + return true; + default: + return true; + } + } + + bool WriteLoop(int count) + { + while (true) + { + if (count <= 0 || !_buffer.Any()) return true; + if (!SendDelegate(_buffer.First!.Value)) return false; + _buffer.RemoveFirst(); + _writeCount += 1; + count = count - 1; + continue; + + break; + } + } + + bool WritePrioLoop() + { + while (true) + { + if (!_prioBuffer.Any()) return true; + if (WriteSend(_prioBuffer.First!.Value)) + { + _prioBuffer.RemoveFirst(); + continue; + } + + return false; + break; + } + } } #endregion @@ -1660,20 +1638,7 @@ bool WritePrioLoop() private const long MaxAdaptiveBackoffNanos = 2000000L; // 2 ms private const long LogBufferSizeInterval = 5000000000L; // 5 s, in nanoseconds private const int MaxWriteCount = 50; - - /// - /// TBD - /// - /// TBD - /// TBD - /// TBD - /// TBD - /// TBD - /// TBD - /// TBD - /// TBD - /// TBD - /// TBD + public static Props EndpointWriterProps(AkkaProtocolHandle handleOrActive, Address localAddress, Address remoteAddress, int? refuseUid, AkkaProtocolTransport transport, RemoteSettings settings, AkkaPduCodec codec, ConcurrentDictionary receiveBuffers, IActorRef reliableDeliverySupervisor = null) @@ -1696,72 +1661,43 @@ public sealed class TakeOver : INoSerializationVerificationNeeded /// Create a new TakeOver command /// /// The handle of the new association - /// TBD + /// The local actor to reply to once the takeover is complete public TakeOver(AkkaProtocolHandle protocolHandle, IActorRef replyTo) { ProtocolHandle = protocolHandle; ReplyTo = replyTo; } - - /// - /// TBD - /// - public AkkaProtocolHandle ProtocolHandle { get; private set; } - - /// - /// TBD - /// - public IActorRef ReplyTo { get; private set; } + + public AkkaProtocolHandle ProtocolHandle { get; } + + public IActorRef ReplyTo { get; } } - - /// - /// TBD - /// + public sealed class TookOver : INoSerializationVerificationNeeded { - /// - /// TBD - /// - /// TBD - /// TBD public TookOver(IActorRef writer, AkkaProtocolHandle protocolHandle) { ProtocolHandle = protocolHandle; Writer = writer; } - - /// - /// TBD - /// + public IActorRef Writer { get; private set; } - - /// - /// TBD - /// + public AkkaProtocolHandle ProtocolHandle { get; private set; } } - - /// - /// TBD - /// + public sealed class BackoffTimer { private BackoffTimer() { } public static BackoffTimer Instance { get; } = new(); } - - /// - /// TBD - /// + public sealed class FlushAndStop { private FlushAndStop() { } public static FlushAndStop Instance { get; } = new(); } - - /// - /// TBD - /// + public sealed class AckIdleCheckTimer { private AckIdleCheckTimer() { } @@ -1773,98 +1709,53 @@ private sealed class FlushAndStopTimeout private FlushAndStopTimeout() { } public static FlushAndStopTimeout Instance { get; } = new(); } - - /// - /// TBD - /// + public sealed class Handle : INoSerializationVerificationNeeded { - /// - /// TBD - /// - /// TBD public Handle(AkkaProtocolHandle protocolHandle) { ProtocolHandle = protocolHandle; } - - /// - /// TBD - /// - public AkkaProtocolHandle ProtocolHandle { get; private set; } + + public AkkaProtocolHandle ProtocolHandle { get; } } - - /// - /// TBD - /// + public sealed class StopReading { - /// - /// TBD - /// - /// TBD - /// TBD public StopReading(IActorRef writer, IActorRef replyTo) { Writer = writer; ReplyTo = replyTo; } - - /// - /// TBD - /// + public IActorRef Writer { get; private set; } - - /// - /// TBD - /// + public IActorRef ReplyTo { get; private set; } } - - /// - /// TBD - /// + public sealed class StoppedReading { - /// - /// TBD - /// - /// TBD public StoppedReading(IActorRef writer) { Writer = writer; } - - /// - /// TBD - /// + public IActorRef Writer { get; private set; } } - - /// - /// TBD - /// + public sealed class OutboundAck { - /// - /// TBD - /// - /// TBD public OutboundAck(Ack ack) { Ack = ack; } - - /// - /// TBD - /// + public Ack Ack { get; private set; } } private const string AckIdleTimerName = "AckIdleTimer"; #endregion - } /// @@ -1872,19 +1763,6 @@ public OutboundAck(Ack ack) /// internal sealed class EndpointReader : EndpointActor { - /// - /// TBD - /// - /// TBD - /// TBD - /// TBD - /// TBD - /// TBD - /// TBD - /// TBD - /// TBD - /// TBD - /// TBD public EndpointReader( Address localAddress, Address remoteAddress, @@ -1919,10 +1797,7 @@ public EndpointReader( private AckedReceiveBuffer _ackedReceiveBuffer = new(); #region ActorBase overrides - - /// - /// TBD - /// + protected override void PreStart() { if (_receiveBuffers.TryGetValue(new EndpointManager.Link(LocalAddress, RemoteAddress), out var resendState)) @@ -1934,10 +1809,7 @@ protected override void PreStart() } } } - - /// - /// TBD - /// + protected override void PostStop() { SaveState(); @@ -1952,9 +1824,7 @@ private void Reading() if (payload.Length > Transport.MaximumPayloadBytes) { var reason = new OversizedPayloadException( - string.Format("Discarding oversized payload received: max allowed size {0} bytes, actual size {1} bytes.", - Transport.MaximumPayloadBytes, - payload.Length)); + $"Discarding oversized payload received: max allowed size {Transport.MaximumPayloadBytes} bytes, actual size {payload.Length} bytes."); _log.Error(reason, "Transient error while reading from association (association remains live)"); } else @@ -2028,33 +1898,40 @@ private void NotReading() private void SaveState() { - EndpointManager.ResendState Merge(EndpointManager.ResendState current, - EndpointManager.ResendState oldState) - { - if (current.Uid == oldState.Uid) return new EndpointManager.ResendState(_uid, oldState.Buffer.MergeFrom(current.Buffer)); - return current; - } + var k = new EndpointManager.Link(LocalAddress, RemoteAddress); + UpdateSavedState(k, _receiveBuffers.GetValueOrDefault(k)); + return; void UpdateSavedState(EndpointManager.Link key, EndpointManager.ResendState expectedState) { - if (expectedState == null) + while (true) { - if (!_receiveBuffers.TryAdd(key, new EndpointManager.ResendState(_uid, _ackedReceiveBuffer))) + if (expectedState == null) + { + if (!_receiveBuffers.TryAdd(key, new EndpointManager.ResendState(_uid, _ackedReceiveBuffer))) + { + _receiveBuffers.TryGetValue(key, out var prevValue); + expectedState = prevValue; + continue; + } + } + else if (!_receiveBuffers.TryUpdate(key, Merge(new EndpointManager.ResendState(_uid, _ackedReceiveBuffer), expectedState), expectedState)) { _receiveBuffers.TryGetValue(key, out var prevValue); - UpdateSavedState(key, prevValue); + expectedState = prevValue; + continue; } - } - else if (!_receiveBuffers.TryUpdate(key, - Merge(new EndpointManager.ResendState(_uid, _ackedReceiveBuffer), expectedState), expectedState)) - { - _receiveBuffers.TryGetValue(key, out var prevValue); - UpdateSavedState(key, prevValue); + + break; } } - var k = new EndpointManager.Link(LocalAddress, RemoteAddress); - UpdateSavedState(k, !_receiveBuffers.TryGetValue(k, out var previousValue) ? null : previousValue); + EndpointManager.ResendState Merge(EndpointManager.ResendState current, + EndpointManager.ResendState oldState) + { + if (current.Uid == oldState.Uid) return new EndpointManager.ResendState(_uid, oldState.Buffer.MergeFrom(current.Buffer)); + return current; + } } private void HandleDisassociated(DisassociateInfo info) @@ -2098,21 +1975,7 @@ private AckAndMessage TryDecodeMessageAndAck(ByteString pdu) #endregion #region Static members - - /// - /// TBD - /// - /// TBD - /// TBD - /// TBD - /// TBD - /// TBD - /// TBD - /// TBD - /// TBD - /// TBD - /// TBD - /// TBD + public static Props ReaderProps( Address localAddress, Address remoteAddress, From ce604f276f494f77aa9d335bf669c1ff06b4d6a0 Mon Sep 17 00:00:00 2001 From: Aaron Stannard Date: Thu, 13 Mar 2025 23:15:55 -0500 Subject: [PATCH 2/2] removed unneeded control statements --- src/core/Akka.Remote/Endpoint.cs | 5 +---- 1 file changed, 1 insertion(+), 4 deletions(-) diff --git a/src/core/Akka.Remote/Endpoint.cs b/src/core/Akka.Remote/Endpoint.cs index 38b191fe4ce..c1d2461a487 100644 --- a/src/core/Akka.Remote/Endpoint.cs +++ b/src/core/Akka.Remote/Endpoint.cs @@ -1604,10 +1604,7 @@ bool WriteLoop(int count) if (!SendDelegate(_buffer.First!.Value)) return false; _buffer.RemoveFirst(); _writeCount += 1; - count = count - 1; - continue; - - break; + count -= 1; } }