diff --git a/src/core/Akka.API.Tests/verify/CoreAPISpec.ApproveCore.DotNet.verified.txt b/src/core/Akka.API.Tests/verify/CoreAPISpec.ApproveCore.DotNet.verified.txt index 779a8153cac..36bf7bcfab8 100644 --- a/src/core/Akka.API.Tests/verify/CoreAPISpec.ApproveCore.DotNet.verified.txt +++ b/src/core/Akka.API.Tests/verify/CoreAPISpec.ApproveCore.DotNet.verified.txt @@ -2269,6 +2269,7 @@ namespace Akka.Configuration public virtual System.Collections.Generic.IList GetBooleanList(string path) { } public virtual System.Collections.Generic.IList GetByteList(string path) { } public virtual System.Nullable GetByteSize(string path) { } + [return: System.Diagnostics.CodeAnalysis.NotNullIfNotNullAttribute("def")] public virtual System.Nullable GetByteSize(string path, System.Nullable def = null) { } public virtual Akka.Configuration.Config GetConfig(string path) { } public virtual decimal GetDecimal(string path, [System.Runtime.CompilerServices.DecimalConstantAttribute(0, 0, 0u, 0u, 0u)] decimal @default) { } @@ -4001,6 +4002,8 @@ namespace Akka.IO public System.Net.EndPoint LocalAddress { get; } public System.Collections.Generic.IEnumerable Options { get; } public bool PullMode { get; } + [System.Runtime.CompilerServices.NullableAttribute(2)] + public Akka.IO.TcpSettings TcpSettings { get; set; } public override string ToString() { } } public sealed class Bound : Akka.IO.Tcp.Event @@ -4043,6 +4046,7 @@ namespace Akka.IO public sealed class CompoundWrite : Akka.IO.Tcp.WriteCommand, System.Collections.Generic.IEnumerable, System.Collections.IEnumerable { public CompoundWrite(Akka.IO.Tcp.SimpleWriteCommand head, Akka.IO.Tcp.WriteCommand tailCommand) { } + public override long Bytes { get; } public Akka.IO.Tcp.SimpleWriteCommand Head { get; } public Akka.IO.Tcp.WriteCommand TailCommand { get; } public System.Collections.Generic.IEnumerator GetEnumerator() { } @@ -4065,6 +4069,8 @@ namespace Akka.IO public System.Collections.Generic.IEnumerable Options { get; } public bool PullMode { get; } public System.Net.EndPoint RemoteAddress { get; } + [System.Runtime.CompilerServices.NullableAttribute(2)] + public Akka.IO.TcpSettings TcpSettings { get; set; } public System.Nullable Timeout { get; } public override string ToString() { } } @@ -4184,6 +4190,7 @@ namespace Akka.IO { public static readonly Akka.IO.Tcp.Write Empty; public override Akka.IO.Tcp.Event Ack { get; } + public override long Bytes { get; } public Akka.IO.ByteString Data { get; } public static Akka.IO.Tcp.Write Create(Akka.IO.ByteString data) { } public static Akka.IO.Tcp.Write Create(Akka.IO.ByteString data, Akka.IO.Tcp.Event ack) { } @@ -4192,6 +4199,7 @@ namespace Akka.IO public abstract class WriteCommand : Akka.IO.Tcp.Command { protected WriteCommand() { } + public abstract long Bytes { get; } public static Akka.IO.Tcp.WriteCommand Create(System.Collections.Generic.IEnumerable writes) { } public static Akka.IO.Tcp.WriteCommand Create(params WriteCommand[] writes) { } public Akka.IO.Tcp.CompoundWrite Prepend(Akka.IO.Tcp.SimpleWriteCommand other) { } @@ -4205,7 +4213,6 @@ namespace Akka.IO public sealed class TcpExt : Akka.IO.IOExtension { public TcpExt(Akka.Actor.ExtendedActorSystem system) { } - public Akka.IO.Buffers.IBufferPool BufferPool { get; } public override Akka.Actor.IActorRef Manager { get; } public Akka.IO.TcpSettings Settings { get; } } @@ -4231,21 +4238,31 @@ namespace Akka.IO public static Akka.IO.Tcp.Command Unbind() { } public static Akka.IO.Tcp.Command Write(Akka.IO.ByteString data, Akka.IO.Tcp.Event ack = null) { } } - public class TcpSettings + public sealed class TcpSettings : System.IEquatable { + [System.ObsoleteAttribute("Many of these options are no longer used. Use the TcpSettings.Create method inste" + + "ad.")] public TcpSettings(string bufferPoolConfigPath, int initialSocketAsyncEventArgs, bool traceLogging, int batchAcceptLimit, System.Nullable registerTimeout, int receivedMessageSizeLimit, string managementDispatcher, string fileIoDispatcher, int transferToLimit, int finishConnectRetries, bool outgoingSocketForceIpv4, int writeCommandsQueueMaxSize) { } - public int BatchAcceptLimit { get; } + public int BatchAcceptLimit { get; set; } + [System.ObsoleteAttribute("This property is unused")] public string BufferPoolConfigPath { get; } + [System.ObsoleteAttribute("This property is unused")] public string FileIODispatcher { get; } - public int FinishConnectRetries { get; } + public int FinishConnectRetries { get; set; } + [System.ObsoleteAttribute("This property is unused")] public int InitialSocketAsyncEventArgs { get; } public string ManagementDispatcher { get; } - public bool OutgoingSocketForceIpv4 { get; } - public int ReceivedMessageSizeLimit { get; } - public System.Nullable RegisterTimeout { get; } - public bool TraceLogging { get; } + public int MaxFrameSizeBytes { get; set; } + public bool OutgoingSocketForceIpv4 { get; set; } + public int ReceiveBufferSize { get; set; } + [System.ObsoleteAttribute("This property is now MaxFrameSizeBytes")] + public long ReceivedMessageSizeLimit { get; } + public System.Nullable RegisterTimeout { get; set; } + public int SendBufferSize { get; set; } + public bool TraceLogging { get; set; } + [System.ObsoleteAttribute("This property is unused")] public int TransferToLimit { get; set; } - public int WriteCommandsQueueMaxSize { get; } + public int WriteCommandsQueueMaxSize { get; set; } public static Akka.IO.TcpSettings Create(Akka.Actor.ActorSystem system) { } public static Akka.IO.TcpSettings Create(Akka.Configuration.Config config) { } } diff --git a/src/core/Akka.API.Tests/verify/CoreAPISpec.ApproveCore.Net.verified.txt b/src/core/Akka.API.Tests/verify/CoreAPISpec.ApproveCore.Net.verified.txt index 52408af9289..7c2892fc6ce 100644 --- a/src/core/Akka.API.Tests/verify/CoreAPISpec.ApproveCore.Net.verified.txt +++ b/src/core/Akka.API.Tests/verify/CoreAPISpec.ApproveCore.Net.verified.txt @@ -3991,6 +3991,8 @@ namespace Akka.IO public System.Net.EndPoint LocalAddress { get; } public System.Collections.Generic.IEnumerable Options { get; } public bool PullMode { get; } + [System.Runtime.CompilerServices.NullableAttribute(2)] + public Akka.IO.TcpSettings TcpSettings { get; set; } public override string ToString() { } } public sealed class Bound : Akka.IO.Tcp.Event @@ -4033,6 +4035,7 @@ namespace Akka.IO public sealed class CompoundWrite : Akka.IO.Tcp.WriteCommand, System.Collections.Generic.IEnumerable, System.Collections.IEnumerable { public CompoundWrite(Akka.IO.Tcp.SimpleWriteCommand head, Akka.IO.Tcp.WriteCommand tailCommand) { } + public override long Bytes { get; } public Akka.IO.Tcp.SimpleWriteCommand Head { get; } public Akka.IO.Tcp.WriteCommand TailCommand { get; } public System.Collections.Generic.IEnumerator GetEnumerator() { } @@ -4055,6 +4058,8 @@ namespace Akka.IO public System.Collections.Generic.IEnumerable Options { get; } public bool PullMode { get; } public System.Net.EndPoint RemoteAddress { get; } + [System.Runtime.CompilerServices.NullableAttribute(2)] + public Akka.IO.TcpSettings TcpSettings { get; set; } public System.Nullable Timeout { get; } public override string ToString() { } } @@ -4174,6 +4179,7 @@ namespace Akka.IO { public static readonly Akka.IO.Tcp.Write Empty; public override Akka.IO.Tcp.Event Ack { get; } + public override long Bytes { get; } public Akka.IO.ByteString Data { get; } public static Akka.IO.Tcp.Write Create(Akka.IO.ByteString data) { } public static Akka.IO.Tcp.Write Create(Akka.IO.ByteString data, Akka.IO.Tcp.Event ack) { } @@ -4182,6 +4188,7 @@ namespace Akka.IO public abstract class WriteCommand : Akka.IO.Tcp.Command { protected WriteCommand() { } + public abstract long Bytes { get; } public static Akka.IO.Tcp.WriteCommand Create(System.Collections.Generic.IEnumerable writes) { } public static Akka.IO.Tcp.WriteCommand Create(params WriteCommand[] writes) { } public Akka.IO.Tcp.CompoundWrite Prepend(Akka.IO.Tcp.SimpleWriteCommand other) { } @@ -4195,7 +4202,6 @@ namespace Akka.IO public sealed class TcpExt : Akka.IO.IOExtension { public TcpExt(Akka.Actor.ExtendedActorSystem system) { } - public Akka.IO.Buffers.IBufferPool BufferPool { get; } public override Akka.Actor.IActorRef Manager { get; } public Akka.IO.TcpSettings Settings { get; } } @@ -4221,21 +4227,31 @@ namespace Akka.IO public static Akka.IO.Tcp.Command Unbind() { } public static Akka.IO.Tcp.Command Write(Akka.IO.ByteString data, Akka.IO.Tcp.Event ack = null) { } } - public class TcpSettings + public sealed class TcpSettings : System.IEquatable { + [System.ObsoleteAttribute("Many of these options are no longer used. Use the TcpSettings.Create method inste" + + "ad.")] public TcpSettings(string bufferPoolConfigPath, int initialSocketAsyncEventArgs, bool traceLogging, int batchAcceptLimit, System.Nullable registerTimeout, int receivedMessageSizeLimit, string managementDispatcher, string fileIoDispatcher, int transferToLimit, int finishConnectRetries, bool outgoingSocketForceIpv4, int writeCommandsQueueMaxSize) { } - public int BatchAcceptLimit { get; } + public int BatchAcceptLimit { get; set; } + [System.ObsoleteAttribute("This property is unused")] public string BufferPoolConfigPath { get; } + [System.ObsoleteAttribute("This property is unused")] public string FileIODispatcher { get; } - public int FinishConnectRetries { get; } + public int FinishConnectRetries { get; set; } + [System.ObsoleteAttribute("This property is unused")] public int InitialSocketAsyncEventArgs { get; } public string ManagementDispatcher { get; } - public bool OutgoingSocketForceIpv4 { get; } - public int ReceivedMessageSizeLimit { get; } - public System.Nullable RegisterTimeout { get; } - public bool TraceLogging { get; } + public int MaxFrameSizeBytes { get; set; } + public bool OutgoingSocketForceIpv4 { get; set; } + public int ReceiveBufferSize { get; set; } + [System.ObsoleteAttribute("This property is now MaxFrameSizeBytes")] + public long ReceivedMessageSizeLimit { get; } + public System.Nullable RegisterTimeout { get; set; } + public int SendBufferSize { get; set; } + public bool TraceLogging { get; set; } + [System.ObsoleteAttribute("This property is unused")] public int TransferToLimit { get; set; } - public int WriteCommandsQueueMaxSize { get; } + public int WriteCommandsQueueMaxSize { get; set; } public static Akka.IO.TcpSettings Create(Akka.Actor.ActorSystem system) { } public static Akka.IO.TcpSettings Create(Akka.Configuration.Config config) { } } diff --git a/src/core/Akka.Streams.Tests/IO/TcpHelper.cs b/src/core/Akka.Streams.Tests/IO/TcpHelper.cs index 3467ecbd144..714d86be9e9 100644 --- a/src/core/Akka.Streams.Tests/IO/TcpHelper.cs +++ b/src/core/Akka.Streams.Tests/IO/TcpHelper.cs @@ -330,9 +330,9 @@ await _connectionProbe.FishForMessageAsync( public async Task ExpectTerminatedAsync(CancellationToken cancellationToken = default) { - _connectionProbe.Watch(_connectionActor); + await _connectionProbe.WatchAsync(_connectionActor); await _connectionProbe.ExpectTerminatedAsync(_connectionActor, cancellationToken: cancellationToken); - _connectionProbe.Unwatch(_connectionActor); + await _connectionProbe.UnwatchAsync(_connectionActor); } } diff --git a/src/core/Akka.Streams.Tests/IO/TcpSpec.cs b/src/core/Akka.Streams.Tests/IO/TcpSpec.cs index b70e75f04f8..2c3b66b8626 100644 --- a/src/core/Akka.Streams.Tests/IO/TcpSpec.cs +++ b/src/core/Akka.Streams.Tests/IO/TcpSpec.cs @@ -1,4 +1,4 @@ -//----------------------------------------------------------------------- +//----------------------------------------------------------------------- // // Copyright (C) 2009-2022 Lightbend Inc. // Copyright (C) 2013-2025 .NET Foundation @@ -29,9 +29,11 @@ namespace Akka.Streams.Tests.IO { public class TcpSpec : TcpHelper { - public TcpSpec(ITestOutputHelper helper) : base(@" -akka.loglevel = DEBUG -akka.stream.materializer.subscription-timeout.timeout = 2s", helper) + public TcpSpec(ITestOutputHelper helper) : base(""" + akka.io.tcp.trace-logging = on + akka.loglevel = DEBUG + akka.stream.materializer.subscription-timeout.timeout = 2s + """, helper) { } @@ -40,7 +42,7 @@ public async Task Outgoing_TCP_stream_must_work_in_the_happy_case() { await this.AssertAllStagesStoppedAsync(async () => { - var testData = ByteString.FromBytes(new byte[] {1, 2, 3, 4, 5}); + var testData = ByteString.FromBytes([1, 2, 3, 4, 5]); var server = await new Server(this).InitializeAsync(); var tcpReadProbe = new TcpReadProbe(this); @@ -65,7 +67,7 @@ await ValidateServerClientCommunicationAsync(testData, serverConnection, tcpRead public async Task Outgoing_TCP_stream_must_be_able_to_write_a_sequence_of_ByteStrings() { var server = await new Server(this).InitializeAsync(); - var testInput = Enumerable.Range(0, 256).Select(i => ByteString.FromBytes(new byte[] {Convert.ToByte(i)})); + var testInput = Enumerable.Range(0, 256).Select(i => ByteString.FromBytes([Convert.ToByte(i)])); var expectedOutput = ByteString.FromBytes(Enumerable.Range(0, 256).Select(Convert.ToByte).ToArray()); Source.From(testInput) @@ -86,7 +88,7 @@ public async Task Outgoing_TCP_stream_must_be_able_to_read_a_sequence_of_ByteStr var testOutput = new byte[255]; for (byte i = 0; i < 255; i++) { - testInput[i] = ByteString.FromBytes(new [] {i}); + testInput[i] = ByteString.FromBytes([i]); testOutput[i] = i; } @@ -103,6 +105,7 @@ public async Task Outgoing_TCP_stream_must_be_able_to_read_a_sequence_of_ByteStr serverConnection.Write(input); serverConnection.ConfirmedClose(); + // Reduced timeout - otherwise we're just waiting longer for the failure var result = await resultFuture.ShouldCompleteWithin(3.Seconds()); result.ShouldBe(expectedOutput); } @@ -132,7 +135,7 @@ public async Task Outgoing_TCP_stream_must_work_when_client_closes_write_then_re { await this.AssertAllStagesStoppedAsync(async () => { - var testData = ByteString.FromBytes(new byte[] { 1, 2, 3, 4, 5 }); + var testData = ByteString.FromBytes([1, 2, 3, 4, 5]); var server = await new Server(this).InitializeAsync(); var tcpWriteProbe = new TcpWriteProbe(this); @@ -466,7 +469,7 @@ public async Task Outgoing_TCP_stream_must_Echo_should_work_even_if_server_is_in .Run(Materializer); var result = await Source.From(Enumerable.Repeat(0, 1000) - .Select(i => ByteString.FromBytes(new byte[] { Convert.ToByte(i) }))) + .Select(i => ByteString.FromBytes([Convert.ToByte(i)]))) .Via(Sys.TcpStream().OutgoingConnection(serverAddress, halfClose: true)) .RunAggregate(0, (i, s) => i + s.Count, Materializer).ShouldCompleteWithin(10.Seconds()); @@ -503,7 +506,7 @@ await WithinAsync(TimeSpan.FromSeconds(15), async () => await AwaitAssertAsync(async () => { // Getting rid of existing connection actors by using a blunt instrument - system2.ActorSelection(system2.Tcp().Path / "$a" / "*").Tell(Kill.Instance); + system2.ActorSelection(system2.Tcp().Path / "tcp-client-connection-*").Tell(Kill.Instance); await result.ShouldCompleteWithin(3.Seconds()); }, interval:TimeSpan.FromSeconds(4)); @@ -525,6 +528,7 @@ await AwaitAssertAsync(async () => public async Task Outgoing_TCP_stream_must_not_thrown_on_unbind_after_system_has_been_shut_down() { var sys2 = ActorSystem.Create("shutdown-test-system", Sys.Settings.Config); + InitializeLogger(sys2); try { @@ -573,7 +577,7 @@ public async Task Tcp_listen_stream_must_be_able_to_implement_echo() var binding = await bindTask.ShouldCompleteWithin(3.Seconds()); var testInput = Enumerable.Range(0, 255) - .Select(i => ByteString.FromBytes(new byte[] { Convert.ToByte(i) })) + .Select(i => ByteString.FromBytes([Convert.ToByte(i)])) .ToList(); var expectedOutput = testInput.Aggregate(ByteString.Empty, (agg, b) => agg.Concat(b)); @@ -603,7 +607,7 @@ public async Task Tcp_listen_stream_must_work_with_a_chain_of_echoes() var echoConnection = Sys.TcpStream().OutgoingConnection(serverAddress); var testInput = Enumerable.Range(0, 255) - .Select(i => ByteString.FromBytes(new byte[] { Convert.ToByte(i) })) + .Select(i => ByteString.FromBytes([Convert.ToByte(i)])) .ToList(); var expectedOutput = testInput.Aggregate(ByteString.Empty, (agg, b) => agg.Concat(b)); diff --git a/src/core/Akka.Streams/Dsl/Tcp.cs b/src/core/Akka.Streams/Dsl/Tcp.cs index c60e239a226..7b4df572688 100644 --- a/src/core/Akka.Streams/Dsl/Tcp.cs +++ b/src/core/Akka.Streams/Dsl/Tcp.cs @@ -180,13 +180,14 @@ public TcpExt(ExtendedActorSystem system) /// TBD /// TBD /// TBD + // TODO: this really needs to be an async method public Source> Bind(string host, int port, int backlog = 100, IImmutableList options = null, bool halfClose = false, TimeSpan? idleTimeout = null) { // DnsEndpoint isn't allowed var ipAddresses = System.Net.Dns.GetHostAddressesAsync(host).Result; if (ipAddresses.Length == 0) - throw new ArgumentException($"Couldn't resolve IpAdress for host {host}", nameof(host)); + throw new ArgumentException($"Couldn't resolve IpAddress for host {host}", nameof(host)); return Source.FromGraph(new ConnectionSourceStage(_system.Tcp(), new IPEndPoint(ipAddresses[0], port), backlog, options, halfClose, idleTimeout, BindShutdownTimeout)); diff --git a/src/core/Akka.Streams/Implementation/IO/TcpStages.cs b/src/core/Akka.Streams/Implementation/IO/TcpStages.cs index 2b41f1f99a3..dd2eef48325 100644 --- a/src/core/Akka.Streams/Implementation/IO/TcpStages.cs +++ b/src/core/Akka.Streams/Implementation/IO/TcpStages.cs @@ -6,7 +6,9 @@ //----------------------------------------------------------------------- using System; +using System.Collections.Generic; using System.Collections.Immutable; +using System.Linq; using System.Net; using System.Threading.Tasks; using Akka.Actor; @@ -41,6 +43,7 @@ private sealed class ConnectionSourceStageLogic : TimerGraphStageLogic, IOutHand private readonly TaskCompletionSource _bindingPromise; private readonly TaskCompletionSource _unbindPromise = new(); private bool _unbindStarted = false; + private readonly Queue _pendingConnections = new(); public ConnectionSourceStageLogic(Shape shape, ConnectionSourceStage stage, TaskCompletionSource bindingPromise) : base(shape) @@ -53,8 +56,16 @@ public ConnectionSourceStageLogic(Shape shape, ConnectionSourceStage stage, Task public void OnPull() { - // Ignore if still binding - _listener?.Tell(new Tcp.ResumeAccepting(1), StageActor.Ref); + TryPush(); + } + + private void TryPush() + { + if (!IsAvailable(_stage._out)) return; // we have demand and can push + if (_pendingConnections.Count <= 0) return; + + var toPush = _pendingConnections.Dequeue(); + Push(_stage._out, toPush); } public void OnDownstreamFinish(Exception cause) @@ -160,7 +171,8 @@ private void Receive((IActorRef, object) args) break; case Tcp.Connected connected: - Push(_stage._out, ConnectionFor(connected, sender)); + _pendingConnections.Enqueue(ConnectionFor(connected, sender)); + TryPush(); break; case Tcp.Unbind _: @@ -424,7 +436,10 @@ public TcpStreamLogic(FlowShape shape, ITcpRole role, En _bytesOut = shape.Outlet; _readHandler = new LambdaOutHandler( - onPull: () => _connection.Tell(Tcp.ResumeReading.Instance, StageActor.Ref), + onPull: () => + { + _connection.Tell(Tcp.ResumeReading.Instance, StageActor.Ref); + }, onDownstreamFinish: cause => { if (cause is SubscriptionWithCancelException.NonFailureCancellation) @@ -549,22 +564,39 @@ private void Connected((IActorRef, object) args) { var msg = args.Item2; - if (msg is Terminated) FailStage(new StreamTcpException("The connection actor has terminated. Stopping now.")); - else if (msg is Tcp.CommandFailed failed) FailStage(new StreamTcpException($"Tcp command {failed.Cmd} failed")); - else if (msg is Tcp.ErrorClosed closed) FailStage(new StreamTcpException($"The connection closed with error: {closed.Cause}")); - else if (msg is Tcp.Aborted) FailStage(new StreamTcpException("The connection has been aborted")); - else if (msg is Tcp.Closed) CompleteStage(); - else if (msg is Tcp.ConfirmedClosed) CompleteStage(); - else if (msg is Tcp.PeerClosed) Complete(_bytesOut); - else if (msg is Tcp.Received received) + switch (msg) { // Keep on reading even when closed. There is no "close-read-side" in TCP - if (IsClosed(_bytesOut)) _connection.Tell(Tcp.ResumeReading.Instance, StageActor.Ref); - else Push(_bytesOut, received.Data); - } - else if (msg is WriteAck) - { - if (!IsClosed(_bytesIn)) Pull(_bytesIn); + case Tcp.Received received when IsClosed(_bytesOut): + _connection.Tell(Tcp.ResumeReading.Instance, StageActor.Ref); + break; + case Tcp.Received received: + Push(_bytesOut, received.Data); + break; + case WriteAck: + { + if (!IsClosed(_bytesIn)) Pull(_bytesIn); + break; + } + case Terminated: + FailStage(new StreamTcpException("The connection actor has terminated. Stopping now.")); + break; + case Tcp.CommandFailed failed: + FailStage(new StreamTcpException($"Tcp command {failed.Cmd} failed")); + break; + case Tcp.ErrorClosed closed: + FailStage(new StreamTcpException($"The connection closed with error: {closed.Cause}")); + break; + case Tcp.Aborted: + FailStage(new StreamTcpException("The connection has been aborted")); + break; + case Tcp.Closed: + case Tcp.ConfirmedClosed: + CompleteStage(); + break; + case Tcp.PeerClosed: + Complete(_bytesOut); + break; } } } diff --git a/src/core/Akka.Tests/IO/TcpIntegrationSpec.cs b/src/core/Akka.Tests/IO/TcpIntegrationSpec.cs index 79e49fa4bba..c36e447aa86 100644 --- a/src/core/Akka.Tests/IO/TcpIntegrationSpec.cs +++ b/src/core/Akka.Tests/IO/TcpIntegrationSpec.cs @@ -1,4 +1,4 @@ -//----------------------------------------------------------------------- +//----------------------------------------------------------------------- // // Copyright (C) 2009-2022 Lightbend Inc. // Copyright (C) 2013-2025 .NET Foundation @@ -51,7 +51,7 @@ public TcpIntegrationSpec(ITestOutputHelper output) private async Task VerifyActorTermination(IActorRef actor) { - Watch(actor); + await WatchAsync(actor); await ExpectTerminatedAsync(actor); } @@ -61,7 +61,7 @@ public async Task The_TCP_transport_implementation_should_properly_bind_a_test_s await new TestSetup(this).RunAsync(async _ => await Task.CompletedTask); } - [Fact(Skip="FIXME .net core / linux")] + [Fact] public async Task The_TCP_transport_implementation_should_allow_connecting_to_and_disconnecting_from_the_test_server() { await new TestSetup(this).RunAsync(async x => @@ -76,7 +76,7 @@ public async Task The_TCP_transport_implementation_should_allow_connecting_to_an }); } - [Fact(Skip="FIXME .net core / linux")] + [Fact] public async Task The_TCP_transport_implementation_should_properly_handle_connection_abort_from_client_side() { await new TestSetup(this).RunAsync(async x => @@ -90,7 +90,7 @@ public async Task The_TCP_transport_implementation_should_properly_handle_connec }); } - [Fact(Skip="FIXME .net core / linux")] + [Fact] public async Task The_TCP_transport_implementation_should_properly_handle_connection_abort_from_client_side_after_chit_chat() { await new TestSetup(this).RunAsync(async x => @@ -114,7 +114,7 @@ public async Task The_TCP_transport_implementation_should_properly_handle_connec var actors = await x.EstablishNewClientConnectionAsync(); actors.ClientHandler.Send(actors.ClientConnection, PoisonPill.Instance); await VerifyActorTermination(actors.ClientConnection); - + await actors.ServerHandler.ExpectMsgAsync(); await VerifyActorTermination(actors.ServerConnection); }); @@ -171,11 +171,6 @@ public async Task The_TCP_transport_implementation_should_properly_handle_connec [Theory] public async Task The_TCP_transport_implementation_should_properly_support_connecting_to_DNS_endpoints(AddressFamily family) { - // Aaronontheweb, 9/2/2017 - POSIX-based OSES are still having trouble with IPV6 DNS resolution - if(!RuntimeInformation - .IsOSPlatform(OSPlatform.Windows) && family == AddressFamily.InterNetworkV6) - return; - var serverHandler = CreateTestProbe(); var bindCommander = CreateTestProbe(); bindCommander.Send(Sys.Tcp(), new Tcp.Bind(serverHandler.Ref, new IPEndPoint(family == AddressFamily.InterNetwork ? IPAddress.Loopback @@ -196,10 +191,10 @@ public async Task The_TCP_transport_implementation_should_properly_support_conne var testData = ByteString.FromString(str); clientEp.Tell(Tcp.Write.Create(testData, Ack.Instance), clientHandler); await clientHandler.ExpectMsgAsync(); - var received = await serverHandler.ReceiveWhileAsync(o => - { - return o as Tcp.Received; - }, RemainingOrDefault, TimeSpan.FromSeconds(0.5)).ToListAsync(); + var received = await serverHandler + .ReceiveWhileAsync(o => o as Tcp.Received, + RemainingOrDefault, + TimeSpan.FromSeconds(0.5)).ToListAsync(); received.Sum(s => s.Data.Count).Should().Be(testData.Count); } @@ -362,12 +357,12 @@ public async Task When_multiple_concurrent_writing_clients_All_acks_should_be_re // Setup multiple clients var actors = await x.EstablishNewClientConnectionAsync(); - // Each client sends his index to server + // Each client sends their index to server var indexRange = Enumerable.Range(0, clientsCount).ToList(); var clients = indexRange.Select(i => (Index: i, Probe: CreateTestProbe($"test-client-{i}"))).ToArray(); Parallel.ForEach(clients, client => { - var msg = ByteString.FromBytes(new byte[] {(byte) 0}); + var msg = ByteString.FromBytes([0]); client.Probe.Send(actors.ClientConnection, Tcp.Write.Create(msg, AckWithValue.Create(client.Index))); }); @@ -389,7 +384,7 @@ public async Task When_multiple_writing_clients_Should_receive_messages_in_order // Setup multiple clients var actors = await x.EstablishNewClientConnectionAsync(); - // Each client sends his index to server + // Each client sends their index to server var clients = Enumerable.Range(0, clientsCount).Select(i => (Index: i, Probe: CreateTestProbe($"test-client-{i}"))).ToArray(); var contentBuilder = new StringBuilder(); clients.ForEach(client => @@ -406,7 +401,7 @@ public async Task When_multiple_writing_clients_Should_receive_messages_in_order }); } - [Fact] + [Fact] public async Task Should_fail_writing_when_buffer_is_filled() { await new TestSetup(this).RunAsync(async x => @@ -421,22 +416,22 @@ public async Task Should_fail_writing_when_buffer_is_filled() await AwaitAssertAsync(async () => { // try sending overflow - actors.ClientHandler.Send(actors.ClientConnection, Tcp.Write.Create(overflowData)); // this is sent immidiately - actors.ClientHandler.Send(actors.ClientConnection, Tcp.Write.Create(overflowData)); // this will try to buffer - await actors.ClientHandler.ExpectMsgAsync(TimeSpan.FromSeconds(20)); + actors.ClientHandler.Send(actors.ClientConnection, Tcp.Write.Create(goodData)); // this is sent immediately + actors.ClientHandler.Send(actors.ClientConnection, Tcp.Write.Create(overflowData)); // this will fail + await actors.ClientHandler.ExpectMsgAsync(); - // First overflow data will be received anyway + // First message will go through, second one will not (await actors.ServerHandler.ReceiveWhileAsync(TimeSpan.FromSeconds(1), m => m as Tcp.Received).ToListAsync()) .Sum(m => m.Data.Count) - .Should().Be(InternalConnectionActorMaxQueueSize + 1); + .Should().Be(InternalConnectionActorMaxQueueSize); // Check that almost-overflow size does not cause any problems - actors.ClientHandler.Send(actors.ClientConnection, Tcp.ResumeWriting.Instance); // Recover after send failure + //actors.ClientHandler.Send(actors.ClientConnection, Tcp.ResumeWriting.Instance); // Recover after send failure actors.ClientHandler.Send(actors.ClientConnection, Tcp.Write.Create(goodData)); (await actors.ServerHandler.ReceiveWhileAsync(TimeSpan.FromSeconds(1), m => m as Tcp.Received).ToListAsync()) .Sum(m => m.Data.Count) .Should().Be(InternalConnectionActorMaxQueueSize); - }, TimeSpan.FromSeconds(30 * 3), TimeSpan.FromSeconds(5)); // 3 attempts by ~25 seconds + 5 sec pause + }, TimeSpan.FromSeconds(10), TimeSpan.FromSeconds(3)); // 3 attempts by ~25 seconds + 5 sec pause }); } @@ -469,17 +464,18 @@ public async Task The_TCP_transport_implementation_should_properly_complete_one_ [Fact] public async Task The_TCP_transport_implementation_should_support_waiting_for_writes_with_backpressure() { + var transmittedBytes = InternalConnectionActorMaxQueueSize; await new TestSetup(this).RunAsync(async x => { - x.BindOptions = new[] {new Inet.SO.SendBufferSize(1024)}; - x.ConnectOptions = new[] {new Inet.SO.SendBufferSize(1024)}; + x.BindOptions = [new Inet.SO.SendBufferSize(1024)]; + x.ConnectOptions = [new Inet.SO.SendBufferSize(1024)]; var actors = await x.EstablishNewClientConnectionAsync(); - actors.ServerHandler.Send(actors.ServerConnection, Tcp.Write.Create(ByteString.FromBytes(new byte[100000]), Ack.Instance)); + actors.ServerHandler.Send(actors.ServerConnection, Tcp.Write.Create(ByteString.FromBytes(new byte[transmittedBytes]), Ack.Instance)); await actors.ServerHandler.ExpectMsgAsync(Ack.Instance); - await x.ExpectReceivedDataAsync(actors.ClientHandler, 100000); + await x.ExpectReceivedDataAsync(actors.ClientHandler, transmittedBytes); }); } @@ -545,8 +541,8 @@ await AwaitConditionNoThrowAsync(() => private async Task ChitChat(TestSetup.ConnectionDetail actors, int rounds = 100) { - var testData = ByteString.FromBytes(new byte[] {(byte) 0}); - for (int i = 0; i < rounds; i++) + var testData = ByteString.FromBytes([0]); + for (var i = 0; i < rounds; i++) { actors.ClientHandler.Send(actors.ClientConnection, Tcp.Write.Create(testData)); await actors.ServerHandler.ExpectMsgAsync(x => x.Data.Count == 1 && x.Data[0] == 0, hint: $"server didn't received at {i} round"); @@ -561,11 +557,13 @@ class TestSetup private readonly bool _shouldBindServer; private readonly TestProbe _bindHandler; private IPEndPoint _endpoint; + private readonly TcpSettings _settings; - public TestSetup(AkkaSpec spec, bool shouldBindServer = true) + public TestSetup(AkkaSpec spec, bool shouldBindServer = true, TcpSettings? settings = null) { - BindOptions = Enumerable.Empty(); - ConnectOptions = Enumerable.Empty(); + _settings = settings ?? TcpSettings.Create(spec.Sys); + BindOptions = []; + ConnectOptions = []; _spec = spec; _shouldBindServer = shouldBindServer; _bindHandler = _spec.CreateTestProbe("bind-handler-probe"); @@ -574,14 +572,15 @@ public TestSetup(AkkaSpec spec, bool shouldBindServer = true) public async Task BindServer() { var bindCommander = _spec.CreateTestProbe(); - bindCommander.Send(_spec.Sys.Tcp(), new Tcp.Bind(_bindHandler.Ref, new IPEndPoint(IPAddress.Loopback, 0), options: BindOptions)); + bindCommander.Send(_spec.Sys.Tcp(), + new Tcp.Bind(_bindHandler.Ref, new IPEndPoint(IPAddress.Loopback, 0), options: BindOptions){ TcpSettings = _settings}); await bindCommander.ExpectMsgAsync(bound => _endpoint = (IPEndPoint) bound.LocalAddress); } - public async Task EstablishNewClientConnectionAsync(bool registerClientHandler = true) + public async Task EstablishNewClientConnectionAsync(bool registerClientHandler = true, TcpSettings? settings = null) { var connectCommander = _spec.CreateTestProbe("connect-commander-probe"); - connectCommander.Send(_spec.Sys.Tcp(), new Tcp.Connect(_endpoint, options: ConnectOptions)); + connectCommander.Send(_spec.Sys.Tcp(), new Tcp.Connect(_endpoint, options: ConnectOptions){ TcpSettings = settings ?? _settings}); await connectCommander.ExpectMsgAsync(); var clientHandler = _spec.CreateTestProbe("client-handler-probe"); @@ -611,10 +610,16 @@ public class ConnectionDetail public async Task ExpectReceivedDataAsync(TestProbe handler, int remaining) { - if (remaining > 0) + while (true) { - var recv = await handler.ExpectMsgAsync(); - await ExpectReceivedDataAsync(handler, remaining - recv.Data.Count); + if (remaining > 0) + { + var recv = await handler.ExpectMsgAsync(); + remaining = remaining - recv.Data.Count; + continue; + } + + break; } } diff --git a/src/core/Akka.Tests/IO/TcpSettingsSpec.cs b/src/core/Akka.Tests/IO/TcpSettingsSpec.cs index b9b55059787..e256707dd18 100644 --- a/src/core/Akka.Tests/IO/TcpSettingsSpec.cs +++ b/src/core/Akka.Tests/IO/TcpSettingsSpec.cs @@ -25,18 +25,16 @@ public void TcpSettings_should_parse_all_akka_io_tcp_config_values_correctly() var settings = TcpSettings.Create(tcpConfig); // Assert: all values match akka.conf reference - settings.BufferPoolConfigPath.Should().Be("akka.io.tcp.disabled-buffer-pool"); - settings.InitialSocketAsyncEventArgs.Should().Be(32); settings.TraceLogging.Should().BeFalse(); settings.BatchAcceptLimit.Should().Be(Environment.ProcessorCount * 2); settings.RegisterTimeout.Should().Be(TimeSpan.FromSeconds(5)); - settings.ReceivedMessageSizeLimit.Should().Be(int.MaxValue); settings.ManagementDispatcher.Should().Be("akka.actor.internal-dispatcher"); - settings.FileIODispatcher.Should().Be("akka.actor.default-blocking-io-dispatcher"); - settings.TransferToLimit.Should().Be(524288); // 512 KiB settings.FinishConnectRetries.Should().Be(5); settings.OutgoingSocketForceIpv4.Should().BeFalse(); settings.WriteCommandsQueueMaxSize.Should().Be(-1); + settings.SendBufferSize.Should().Be(8192); + settings.ReceiveBufferSize.Should().Be(8192); + settings.MaxFrameSizeBytes.Should().Be(4096); } } } \ No newline at end of file diff --git a/src/core/Akka/Configuration/Config.cs b/src/core/Akka/Configuration/Config.cs index 9141a781012..78d77d1e5b0 100644 --- a/src/core/Akka/Configuration/Config.cs +++ b/src/core/Akka/Configuration/Config.cs @@ -7,6 +7,7 @@ using System; using System.Collections.Generic; +using System.Diagnostics.CodeAnalysis; using System.Linq; using Akka.Configuration.Hocon; using Akka.Util.Internal; @@ -149,6 +150,7 @@ public virtual bool GetBoolean(string path, bool @default = false) /// Default return value if none provided. /// This exception is thrown if the current node is undefined. /// The long value defined in the specified path. + [return: NotNullIfNotNull(nameof(def))] public virtual long? GetByteSize(string path, long? def = null) { HoconValue value = GetNode(path); diff --git a/src/core/Akka/Configuration/akka.conf b/src/core/Akka/Configuration/akka.conf index 602d7c8d4ef..df5704d8f94 100644 --- a/src/core/Akka/Configuration/akka.conf +++ b/src/core/Akka/Configuration/akka.conf @@ -773,57 +773,33 @@ akka { # its commander before aborting the connection. register-timeout = 5s - # The maximum number of bytes delivered by a `Received` message. Before - # more data is read from the network the connection actor will try to - # do other work. - # The purpose of this setting is to impose a smaller limit than the - # configured receive buffer size. When using value 'unlimited' it will - # try to read all from the receive buffer. - max-received-message-size = unlimited + # The maximum size of a message that can be flushed onto the wire at once. + maximum-frame-size = 4k + + # The size of the system send buffer. Should be at least 2x the maximum + # frame size. The default value is 8kB, which is the default value for + # the underlying Socket.SendBufferSize property on Windows. + send-buffer-size = 8k + + # The size of the system receive buffer. Should be at least 2x the maximum + # frame size. The default value is 8kB, which is the default value for + # the underlying Socket.ReceiveBufferSize property on Windows. + receive-buffer-size = 8k # Enable fine grained logging of what goes on inside the implementation. # Be aware that this may log more than once per message sent to the actors # of the tcp implementation. trace-logging = off - # Fully qualified config path which holds the dispatcher configuration - # to be used for running the select() calls in the selectors - selector-dispatcher = "akka.io.pinned-dispatcher" - - # Fully qualified config path which holds the dispatcher configuration - # for the read/write worker actors - worker-dispatcher = "akka.actor.internal-dispatcher" - # Fully qualified config path which holds the dispatcher configuration # for the selector management actors management-dispatcher = "akka.actor.internal-dispatcher" - # Fully qualified config path which holds the dispatcher configuration - # on which file IO tasks are scheduled - file-io-dispatcher = "akka.actor.default-blocking-io-dispatcher" - - # The maximum number of bytes (or "unlimited") to transfer in one batch - # when using `WriteFile` command which uses `FileChannel.transferTo` to - # pipe files to a TCP socket. On some OS like Linux `FileChannel.transferTo` - # may block for a long time when network IO is faster than file IO. - # Decreasing the value may improve fairness while increasing may improve - # throughput. - file-io-transferTo-limit = 524288 # 512 KiB - # The number of times to retry the `finishConnect` call after being notified about # OP_CONNECT. Retries are needed if the OP_CONNECT notification doesn't imply that # `finishConnect` will succeed, which is the case on Android. finish-connect-retries = 5 - # On Windows connection aborts are not reliably detected unless an OP_READ is - # registered on the selector _after_ the connection has been reset. This - # workaround enables an OP_CONNECT which forces the abort to be visible on Windows. - # Enabling this setting on other platforms than Windows will cause various failures - # and undefined behavior. - # Possible values of this key are on, off and auto where auto will enable the - # workaround if Windows is detected automatically. - windows-connection-abort-workaround-enabled = off - # Enforce outgoing socket connection to use IPv4 address family. Required in # scenario when IPv6 is not available, for example in Azure Web App sandbox. # When set to true it is required to set akka.io.dns.inet-address.use-ipv6 to false diff --git a/src/core/Akka/IO/Tcp.cs b/src/core/Akka/IO/Tcp.cs index 4e12196408d..6b5a1cc3b46 100644 --- a/src/core/Akka/IO/Tcp.cs +++ b/src/core/Akka/IO/Tcp.cs @@ -50,25 +50,7 @@ public override TcpExt CreateExtension(ExtendedActorSystem system) internal abstract class SocketCompleted : INoSerializationVerificationNeeded, IDeadLetterSuppression { } - - internal sealed class SocketSent : SocketCompleted - { - public static readonly SocketSent Instance = new(); - private SocketSent() { } - } - - internal sealed class SocketReceived : SocketCompleted - { - public static readonly SocketReceived Instance = new(); - private SocketReceived() { } - } - - internal sealed class SocketAccepted : SocketCompleted - { - public static readonly SocketAccepted Instance = new(); - private SocketAccepted() { } - } - + internal sealed class SocketConnected : SocketCompleted { public static readonly SocketConnected Instance = new(); @@ -182,6 +164,17 @@ public Connect(EndPoint remoteAddress, public TimeSpan? Timeout { get; } public bool PullMode { get; } + + /// + /// Optional - allows you to specify TCP settings for the connection. + /// + /// Otherwise, the system defaults will be used. + /// + /// + /// var tcpSettings = TcpSettings.Create(ActorSystem); + /// var tcpSettingsWithDifferentBufferSizes = tcpSettings with { SendBufferSize = 8192, ReceiveBufferSize = 8192 }; + /// + public TcpSettings? TcpSettings { get; set; } public override string ToString() => $"Connect(remote: {RemoteAddress}, local: {LocalAddress}, timeout: {Timeout}, pullMode: {PullMode})"; @@ -227,6 +220,17 @@ public Bind(IActorRef handler, public IEnumerable Options { get; } public bool PullMode { get; } + + /// + /// Optional - allows you to specify TCP settings for the connection. + /// + /// Otherwise, the system defaults will be used. + /// + /// + /// var tcpSettings = TcpSettings.Create(ActorSystem); + /// var tcpSettingsWithDifferentBufferSizes = tcpSettings with { SendBufferSize = 8192, ReceiveBufferSize = 8192 }; + /// + public TcpSettings? TcpSettings { get; set; } public override string ToString() => $"Bind(addr: {LocalAddress}, handler: {Handler}, backlog: {Backlog}, pullMode: {PullMode})"; @@ -265,7 +269,7 @@ public override string ToString() => } /// - /// In order to close down a listening socket, send this message to that socket’s + /// To close down a listening socket, send this message to that socket’s /// actor (that is the actor which previously had sent the message). The /// listener socket actor will reply with a message. /// @@ -378,6 +382,11 @@ public CompoundWrite Prepend(SimpleWriteCommand other) { return new CompoundWrite(other, this); } + + /// + /// The number of bytes that will be written to the socket. + /// + public abstract long Bytes { get; } /// /// Prepend a group of writes before this one. @@ -489,6 +498,8 @@ public static Write Create(ByteString data, Event ack) { return new Write(data, ack); } + + public override long Bytes => Data.Count; } /// @@ -540,6 +551,8 @@ private IEnumerable Enumerable() public override string ToString() => $"CompoundWrite({Head}, {TailCommand})"; + + public override long Bytes => Head.Bytes + TailCommand.Bytes; } /// @@ -900,14 +913,7 @@ public TcpExt(ExtendedActorSystem system) : this(system, TcpSettings.Create(syst internal TcpExt(ExtendedActorSystem system, TcpSettings settings) { - var bufferPoolConfig = system.Settings.Config.GetConfig(settings.BufferPoolConfigPath); - - if (bufferPoolConfig.IsNullOrEmpty()) - throw new ConfigurationException($"Cannot retrieve TCP buffer pool configuration: {settings.BufferPoolConfigPath} configuration node not found"); - Settings = settings; - FileIoDispatcher = system.Dispatchers.Lookup(Settings.FileIODispatcher); - BufferPool = CreateBufferPool(system, bufferPoolConfig); Manager = system.SystemActorOf( props: Props.Create(() => new TcpManager(this)).WithDispatcher(Settings.ManagementDispatcher).WithDeploy(Deploy.Local), name: "IO-TCP"); @@ -917,43 +923,11 @@ internal TcpExt(ExtendedActorSystem system, TcpSettings settings) /// Gets reference to a TCP manager actor. /// public override IActorRef Manager { get; } - - /// - /// A buffer pool used by current plugin. - /// - public IBufferPool BufferPool { get; } - + /// /// The settings used by this extension. /// public TcpSettings Settings { get; } - - /// - /// TBD - /// - internal MessageDispatcher FileIoDispatcher { get; } - - private IBufferPool CreateBufferPool(ExtendedActorSystem system, Config config) - { - if (config.IsNullOrEmpty()) - throw ConfigurationException.NullOrEmptyConfig(); - - var type = Type.GetType(config.GetString("class", null), true); - - if (!typeof(IBufferPool).IsAssignableFrom(type)) - throw new ArgumentException($"Buffer pool of type {type} doesn't implement {nameof(IBufferPool)} interface"); - - try - { - // try to construct via `BufferPool(ExtendedActorSystem, Config)` ctor - return (IBufferPool)Activator.CreateInstance(type, system, config); - } - catch - { - // try to construct via `BufferPool(ExtendedActorSystem)` ctor - return (IBufferPool)Activator.CreateInstance(type, system); - } - } } /// diff --git a/src/core/Akka/IO/TcpConnection.cs b/src/core/Akka/IO/TcpConnection.cs index 6171989a699..280c5f274d5 100644 --- a/src/core/Akka/IO/TcpConnection.cs +++ b/src/core/Akka/IO/TcpConnection.cs @@ -6,26 +6,50 @@ //----------------------------------------------------------------------- using System; -using System.Collections.Concurrent; +using System.Buffers; using System.Collections.Generic; using System.Collections.Immutable; using System.IO; using System.Linq; using System.Net.Sockets; -using System.Runtime.CompilerServices; using Akka.Actor; using Akka.Dispatch; using Akka.Event; -using Akka.IO.Buffers; using Akka.Pattern; -using Akka.Util; -using Akka.Util.Internal; + +#nullable enable namespace Akka.IO { using static Akka.IO.Tcp; using ByteBuffer = ArraySegment; + // A **green‑field** rewrite of the connection actor, distilled to + // • 4 stable phases (Connecting ▸ AwaitRegistration ▸ Open ▸ HalfOpen) + // • 8 booleans that fully describe the transient aspects of the socket. + // • single immutable record `ConnState` passed by value. + // • all close logic in one method (TryStop). + // + // ┌───────────────────────── ASCII *phase* diagram ─────────────────────────┐ + // │ │ + // │ (socket.ConnectAsync) │ + // │ +-----------+ Connected +---------------+ │ + // │ |Connecting |──────────────►|AwaitReg |──Register────────────+│ + // │ +-----------+ +-------┬-------+ │ + // │ │ │ + // │ writes/reads ▼ │ + // │ +-----------+ Close +------+ │ + // │ | Open |────────►|Closed| │ + // │ +----┬------+ +------+ │ + // │ │ ConfirmedClose │ + // │ ▼ │ + // │ +-----------+ FIN↑ +------+ │ + // │ | HalfOpen |────────►|Closed| │ + // │ +-----------+ +------+ │ + // │ │ + // └─────────────────────────────────────────────────────────────────────────┘ + + /// /// INTERNAL API: Base class for TcpIncomingConnection and TcpOutgoingConnection. /// @@ -46,375 +70,174 @@ namespace Akka.IO /// Similar approach can be found on other networking libraries (i.e. System.IO.Pipelines and EventStore). /// Both buffers and are pooled to reduce GC pressure. /// - internal abstract class TcpConnection : ActorBase, IRequiresMessageQueue + internal abstract class TcpConnection : ReceiveActor, IRequiresMessageQueue { - [Flags] - enum ConnectionStatus + /// + /// Immutable flags – reference to the live Queue + byte counter **and any deferred half‑close**. + /// Moving every transient flag in here lets us reason over shutdown with a single value. + /// + private readonly record struct ConnState( + bool IsReceiving, + bool IsSending, + bool PeerClosed, + bool OutputShutdown, + bool ReadingSuspended, + bool WritingSuspended, + bool KeepOpenOnPeerClosed, + Queue<(Write Cmd, IActorRef Snd)> Queue, + int QueuedBytes) { - /// - /// Marks that connection has invoked Socket.ReceiveAsync and that - /// are currently trying to receive data. - /// - Receiving = 1, - - /// - /// Marks that connection has invoked Socket.SendAsync and that - /// are currently sending data. It's important as - /// will throw exception if another socket operations will - /// be called over it as it's performing send request. For that reason we cannot release send args - /// back to pool if it's sending (another connection actor could acquire that buffer and try to - /// use it while it's sending the data). - /// - Sending = 1 << 1, - - /// - /// Marks that current connection has suspended reading. - /// - ReadingSuspended = 1 << 2, - - /// - /// Marks that current connection has suspended writing. - /// - WritingSuspended = 1 << 3, - - /// - /// Marks that current connection has been requested for shutdown. It may not occur immediatelly, - /// i.e. because underlying is actually sending the data. - /// - ShutdownRequested = 1 << 4 - } - - private ConnectionStatus _status; - protected readonly TcpExt Tcp; - protected readonly Socket Socket; - protected SocketAsyncEventArgs ReceiveArgs; - protected SocketAsyncEventArgs SendArgs; - - protected readonly ILoggingAdapter Log = Context.GetLogger(); - private readonly bool _pullMode; - private readonly PendingSimpleWritesQueue _writeCommandsQueue; - private readonly bool _traceLogging; + public bool HasPending => IsSending || Queue.Count != 0; + public bool CanSend => !OutputShutdown && !WritingSuspended; + public bool CanReceive => !PeerClosed && !ReadingSuspended; - private bool _isOutputShutdown; - - private readonly ConcurrentQueue<(IActorRef Commander, object Ack)> _pendingAcks = new(); - private bool _peerClosed; - private IActorRef _interestedInResume; - private CloseInformation _closedMessage; // for ConnectionClosed message in postStop - - private IActorRef _watchedActor = Context.System.DeadLetters; - - private readonly IOException droppingWriteBecauseWritingIsSuspendedException = new("Dropping write because writing is suspended"); + public static ConnState Initial(Queue<(Write Cmd, IActorRef Snd)> q) => + new(false, false, false, false, true, true, false, q, 0); + } - private readonly IOException droppingWriteBecauseQueueIsFullException = new("Dropping write because queue is full"); + #region Ack‑aware SAEA - protected TcpConnection(TcpExt tcp, Socket socket, bool pullMode, Option writeCommandsBufferMaxSize) + private sealed class AckSocketAsyncEventArgs : SocketAsyncEventArgs, INoSerializationVerificationNeeded, + IDeadLetterSuppression { - if (socket == null) throw new ArgumentNullException(nameof(socket)); + public readonly List<(IActorRef Commander, object Ack)> PendingAcks = new(8); + public void ClearAcks() => PendingAcks.Clear(); + } - _pullMode = pullMode; - _writeCommandsQueue = new PendingSimpleWritesQueue(Log, writeCommandsBufferMaxSize); - _traceLogging = tcp.Settings.TraceLogging; + private sealed class ReadSocketAsyncEventArgs : SocketAsyncEventArgs, INoSerializationVerificationNeeded, + IDeadLetterSuppression; - Tcp = tcp; - Socket = socket; + private class CommanderDied : IDeadLetterSuppression + { + public static readonly CommanderDied Instance = new(); - if (pullMode) SetStatus(ConnectionStatus.ReadingSuspended); + private CommanderDied() + { + } } - - private bool IsWritePending + private class HandlerDied : IDeadLetterSuppression { - [MethodImpl(MethodImplOptions.AggressiveInlining)] - get { return !HasStatus(ConnectionStatus.Sending) && !_writeCommandsQueue.IsEmpty; } + public static readonly HandlerDied Instance = new(); + + private HandlerDied() + { + } } - private Option GetAllowedPendingWrite() => IsWritePending ? GetNextWrite() : Option.None; + #endregion - protected void SignDeathPact(IActorRef actor) - { - UnsignDeathPact(); - _watchedActor = actor; - Context.Watch(actor); - } + protected readonly TcpSettings Settings; + protected readonly Socket Socket; + protected ILoggingAdapter Log { get; } = Context.GetLogger(); - protected void UnsignDeathPact() - { - if (!ReferenceEquals(_watchedActor, Context.System.DeadLetters)) Context.Unwatch(_watchedActor); - } + private readonly ArrayPool _bufferPool = ArrayPool.Shared; - // STATES + private readonly Queue<(Write Cmd, IActorRef Sender)> _pendingWrites; + private readonly byte[] _receiveBuffer; + private readonly ReadSocketAsyncEventArgs _receiveArgs; + private readonly AckSocketAsyncEventArgs _sendArgs; + + private readonly int _maxQueuedBytes; - /// - /// Connection established, waiting for registration from user handler. - /// - private Receive WaitingForRegistration(IActorRef commander) - { - return message => - { - switch (message) - { - case Register register: - // up to this point we've been watching the commander, - // but since registration is now complete we only need to watch the handler from here on - if (!Equals(register.Handler, commander)) - SignDeathPact(register.Handler); // will unsign death pact with commander automatically + private ConnState _state; - if (_traceLogging) Log.Debug("[{0}] registered as connection handler", register.Handler); + private readonly bool _traceLogging; - var registerInfo = new ConnectionInfo(register.Handler, register.KeepOpenOnPeerClosed, register.UseResumeWriting); + // used by Akka.Streams + private readonly bool _pullMode; - Context.SetReceiveTimeout(null); - Context.Become(Connected(registerInfo)); + private IActorRef? _commander; + private IActorRef? _handler; + private CloseInformation? _closeInformation; - // if we are in push mode or already have resumed reading in pullMode while waiting for Register then read - if (!_pullMode || !HasStatus(ConnectionStatus.ReadingSuspended)) ResumeReading(); + private static readonly IOException DroppingWriteBecauseClosingException = + new("Dropping write because the connection is closing"); - // If there is something buffered before we got Register message - put it all to the socket - var bufferedWrite = GetNextWrite(); - if (bufferedWrite.HasValue) - { - SetStatus(ConnectionStatus.Sending); - DoWrite(registerInfo, bufferedWrite.Value); - } + private static readonly IOException DroppingWriteBecauseWritingIsSuspendedException = + new("Dropping write because writing is suspended"); - return true; - case ResumeReading _: ClearStatus(ConnectionStatus.ReadingSuspended); return true; - case SuspendReading _: SetStatus(ConnectionStatus.ReadingSuspended); return true; - case CloseCommand cmd: - var info = new ConnectionInfo(commander, keepOpenOnPeerClosed: false, useResumeWriting: false); - HandleClose(info, Sender, cmd.Event); - return true; - case ReceiveTimeout _: - // after sending `Register` user should watch this actor to make sure - // it didn't die because of the timeout - Log.Debug("Configured registration timeout of [{0}] expired, stopping", Tcp.Settings.RegisterTimeout); - Context.Stop(Self); - return true; - case WriteCommand write: - // When getting Write before regestered handler, have to buffer writes until registration - var buffered = _writeCommandsQueue.EnqueueSimpleWrites(write, Sender, out var commandSize); - if (!buffered) - { - var writerInfo = new ConnectionInfo(Sender, false, false); - DropWrite(writerInfo, write); - } - else - { - Log.Warning("Received Write command before Register command. " + - "It will be buffered until Register will be received (buffered write size is {0} bytes)", commandSize); - } + private static readonly IOException DroppingWriteBecauseQueueIsFullException = + new("Dropping write because queue is full"); - return true; - default: return false; - } - }; - } + private int? _partialWriteOffset = null; - /// - /// Normal connected state. - /// - private Receive Connected(ConnectionInfo info) + protected TcpConnection(TcpSettings settings, Socket socket, bool pullMode) { - var handleWrite = HandleWriteMessages(info); - return message => - { - if (handleWrite(message)) return true; - switch (message) - { - case SuspendReading _: SuspendReading(); return true; - case ResumeReading _: ResumeReading(); return true; - case SocketReceived _: DoRead(info, null); return true; - case CloseCommand cmd: HandleClose(info, Sender, cmd.Event); return true; - default: return false; - } - }; - } + Settings = settings; + _maxQueuedBytes = settings.WriteCommandsQueueMaxSize; // –1 ⇒ unlimited; + _pendingWrites = new Queue<(Write Cmd, IActorRef Sender)>(16); + _pullMode = pullMode; - /// - /// The peer sent EOF first, but we may still want to send - /// - private Receive PeerSentEOF(ConnectionInfo info) - { - var handleWrite = HandleWriteMessages(info); - return message => - { - if (handleWrite(message)) return true; - var cmd = message as CloseCommand; - if (cmd != null) - { - HandleClose(info, Sender, cmd.Event); - return true; - } - if (message is ResumeReading) return true; - return false; - }; - } + _traceLogging = Settings.TraceLogging; + _state = ConnState.Initial(_pendingWrites); + Socket = socket ?? throw new ArgumentNullException(nameof(socket)); + _receiveBuffer = _bufferPool.Rent(settings.MaxFrameSizeBytes); + _receiveArgs = new ReadSocketAsyncEventArgs(); + _sendArgs = new AckSocketAsyncEventArgs(); + InitSocketEventArgs(); - /// - /// Connection is closing but a write has to be finished first - /// - private Receive ClosingWithPendingWrite(ConnectionInfo info, IActorRef closeCommander, ConnectionClosed closedEvent) - { - return message => + if (_pullMode) { - switch (message) - { - case SuspendReading _: SuspendReading(); return true; - case ResumeReading _: ResumeReading(); return true; - case SocketReceived _: DoRead(info, closeCommander); return true; - case SocketSent _: - AcknowledgeSent(); - if (IsWritePending) - DoWrite(info, GetAllowedPendingWrite()); - else - HandleClose(info, closeCommander, closedEvent); - return true; - case UpdatePendingWriteAndThen updatePendingWrite: - var nextWrite = updatePendingWrite.RemainingWrite; - updatePendingWrite.Work(); - - if (nextWrite.HasValue) - DoWrite(info, nextWrite); - else - HandleClose(info, closeCommander, closedEvent); - return true; - case WriteFileFailed fail: HandleError(info.Handler, fail.Cause); return true; - case Abort _: HandleClose(info, Sender, Aborted.Instance); return true; - default: return false; - } - }; + // have to wait for the first pull request to start reading + _state = _state with { ReadingSuspended = true }; + } } - /** connection is closed on our side and we're waiting from confirmation from the other side */ - private Receive Closing(ConnectionInfo info, IActorRef closeCommander) + private void InitSocketEventArgs() { - return message => - { - switch (message) - { - case SuspendReading _: SuspendReading(); return true; - case ResumeReading _: ResumeReading(); return true; - case SocketReceived _: DoRead(info, closeCommander); return true; - case Abort _: HandleClose(info, Sender, Aborted.Instance); return true; - default: return false; - } - }; + _receiveArgs.SetBuffer(_receiveBuffer, 0, _receiveBuffer.Length); + _receiveArgs.UserToken = Self; + _receiveArgs.Completed += OnCompleted; + + + _sendArgs.UserToken = Self; + _sendArgs.Completed += OnCompleted; } - private Receive HandleWriteMessages(ConnectionInfo info) + private static void OnCompleted(object? sender, SocketAsyncEventArgs e) { - return message => - { - switch (message) - { - case SocketSent _: - // Send ack to sender - AcknowledgeSent(); + if (e.UserToken is not IActorRef self) return; + self.Tell(e); + } - // If there is something to send - send it - var pendingWrite = GetAllowedPendingWrite(); - if (pendingWrite.HasValue) - { - SetStatus(ConnectionStatus.Sending); - DoWrite(info, pendingWrite); - } + /* ================================================================= */ + /* Base‑class public API */ + /* ================================================================= */ - // If message is fully sent, notify sender who sent ResumeWriting command - if (!IsWritePending && _interestedInResume != null) - { - _interestedInResume.Tell(WritingResumed.Instance); - _interestedInResume = null; - } + protected override void PostStop() + { + if (Socket.Connected) AbortSocket(); + else CloseSocket(); - return true; - case WriteCommand write: - if (HasStatus(ConnectionStatus.WritingSuspended)) - { - if (_traceLogging) Log.Debug("Dropping write because writing is suspended"); - Sender.Tell(write.FailureMessage.WithCause(droppingWriteBecauseWritingIsSuspendedException)); - } + _receiveArgs.Dispose(); + _sendArgs.Dispose(); + _bufferPool.Return(_receiveBuffer); - if (HasStatus(ConnectionStatus.Sending)) - { - // If we are sending something right now, just enqueue incoming write - if (!_writeCommandsQueue.EnqueueSimpleWrites(write, Sender)) - { - DropWrite(info, write); - return true; - } - } - else - { - Option nextWrite; - if (_writeCommandsQueue.IsEmpty) - { - // If writes queue is empty, do not enqueue first write - we will send it immidiately - if (!_writeCommandsQueue.EnqueueSimpleWritesExceptFirst(write, Sender, out var simpleWriteCommand)) - { - DropWrite(info, write); - return true; - } - - nextWrite = GetNextWrite(headCommands: new []{ (simpleWriteCommand, Sender) }); - } - else - { - _writeCommandsQueue.EnqueueSimpleWrites(write, Sender); - nextWrite = GetNextWrite(); - } + // fail everything still queued + while (_pendingWrites.Count > 0) + { + var (cmd, snd) = _pendingWrites.Dequeue(); + snd.Tell(cmd.FailureMessage.WithCause(DroppingWriteBecauseClosingException)); + } - // If there is something to send and we are allowed to, lets put the next command on the wire - if (nextWrite.HasValue) - { - SetStatus(ConnectionStatus.Sending); - DoWrite(info, nextWrite.Value); - } - } + if (_closeInformation != null) + { + if (Settings.TraceLogging) + Log.Debug("[TcpConnection] sending close event [{0}] to {1}", _closeInformation.ClosedEvent, + string.Join(",", _closeInformation.NotificationsTo)); - return true; - case ResumeWriting _: - /* - * If more than one actor sends Writes then the first to send this - * message might resume too early for the second, leading to a Write of - * the second to go through although it has not been resumed yet; there - * is nothing we can do about this apart from all actors needing to - * register themselves and us keeping track of them, which sounds bad. - * - * Thus it is documented that useResumeWriting is incompatible with - * multiple writers. But we fail as gracefully as we can. - */ - ClearStatus(ConnectionStatus.WritingSuspended); - if (IsWritePending) - { - if (_interestedInResume == null) _interestedInResume = Sender; - else Sender.Tell(new CommandFailed(ResumeWriting.Instance)); - } - else Sender.Tell(WritingResumed.Instance); - return true; - case UpdatePendingWriteAndThen updatePendingWrite: - var updatedWrite = updatePendingWrite.RemainingWrite; - updatePendingWrite.Work(); - if (updatedWrite.HasValue) - DoWrite(info, updatedWrite.Value); - return true; - case WriteFileFailed fail: - HandleError(info.Handler, fail.Cause); - return true; - default: return false; - } - }; + foreach (var sub in _closeInformation.NotificationsTo) + sub.Tell(_closeInformation.ClosedEvent); + } } - - private void DropWrite(ConnectionInfo info, WriteCommand write) + + protected override void PostRestart(Exception reason) { - if (_traceLogging) Log.Debug("Dropping write because queue is full"); - Sender.Tell(write.FailureMessage.WithCause(droppingWriteBecauseQueueIsFullException)); - if (info.UseResumeWriting) SetStatus(ConnectionStatus.WritingSuspended); + // have to assert that we are not restarting + throw new IllegalStateException("Restarting not supported for connection actors."); } - // AUXILIARIES and IMPLEMENTATION - /// /// Used in subclasses to start the common machinery above once a channel is connected /// @@ -435,692 +258,524 @@ protected void CompleteConnect(IActorRef commander, IEnumerable(reg => + { + _handler = reg.Handler; + if (_traceLogging) Log.Debug("[{0}] registered as connection handler", reg.Handler); + Context.WatchWith(_handler, HandlerDied.Instance); + Context.Unwatch(_commander); + _state = _state with { KeepOpenOnPeerClosed = reg.KeepOpenOnPeerClosed, ReadingSuspended = _pullMode, WritingSuspended = false }; + // set a default close event - if someone hard-kills us we log an aborted + _closeInformation = CloseInformation.Single(_handler, Aborted.Instance); + Context.SetReceiveTimeout(null); + Become(OpenBehaviour); + IssueReceive(); + TrySend(); + }); + Receive(w => + { + var queueSizeBefore = _pendingWrites.Count; + Enqueue(w); + if(_pendingWrites.Count > queueSizeBefore) { - var read = InnerRead(info, Tcp.Settings.ReceivedMessageSizeLimit, ReceiveArgs); - ClearStatus(ConnectionStatus.Receiving); - switch (read.Type) - { - case ReadResultType.AllRead: - if (!_pullMode) - ReceiveAsync(); - break; - case ReadResultType.EndOfStream: - if (_isOutputShutdown) - { - if (_traceLogging) Log.Debug("Read returned end-of-stream, our side already closed"); - DoCloseConnection(info, closeCommander, ConfirmedClosed.Instance); - } - else - { - if (_traceLogging) Log.Debug("Read returned end-of-stream, our side not yet closed"); - HandleClose(info, closeCommander, PeerClosed.Instance); - } - break; - case ReadResultType.ReadError: - HandleError(info.Handler, new SocketException((int)read.Error)); - break; - } - } - catch (SocketException cause) - { - HandleError(info.Handler, cause); + // need to log a warning here about writing before registration + Log.Warning("Received Write command before Register command. " + + "It will be buffered until Register will be received (buffered write size is {0} bytes)", w.Bytes); } - } - } - - [MethodImpl(MethodImplOptions.AggressiveInlining)] - private ReadResult InnerRead(ConnectionInfo info, int remainingLimit, SocketAsyncEventArgs ea) - { - if (remainingLimit > 0) + }); + Receive(c => HandleClose(Sender, c.Event)); + Receive(_ => { _state = _state with { ReadingSuspended = true }; }); + Receive(_ => { - //var maxBufferSpace = Math.Min(_tcp.Settings.DirectBufferSize, remainingLimit); - var readBytes = ea.BytesTransferred; - - if (_traceLogging) Log.Debug("Read [{0}] bytes.", readBytes); - if (ea.SocketError == SocketError.Success && readBytes > 0) - info.Handler.Tell(new Received(ByteString.CopyFrom(ea.Buffer, ea.Offset, ea.BytesTransferred))); - - //if (ea.SocketError == SocketError.ConnectionReset) return ReadResult.EndOfStream; - if (ea.SocketError != SocketError.Success) return new ReadResult(ReadResultType.ReadError, ea.SocketError); - if (readBytes > 0) return ReadResult.AllRead; - if (readBytes == 0) return ReadResult.EndOfStream; - - throw new IllegalStateException($"Unexpected value returned from read: {readBytes}"); - } - return ReadResult.AllRead; + _state = _state with { ReadingSuspended = false }; + }); + Receive(_ => Context.Stop(Self)); + Receive(_ => + { + // after sending `Register` user should watch this actor to make sure + // it didn't die because of the timeout + Log.Debug("Configured registration timeout of [{0}] expired, stopping", Settings.RegisterTimeout); + Context.Stop(Self); + }); } - [MethodImpl(MethodImplOptions.AggressiveInlining)] - private void DoWrite(ConnectionInfo info, Option write) + private void OpenBehaviour() { - if (!write.HasValue) - return; - - // Enqueue all acks assigned to this write to be sent once write is finished - foreach (var pendingAck in write.Value.PendingAcks.Where(ackInfo => !ackInfo.Ack.Equals(NoAck.Instance))) + Receive(s => HandleReceiveCompleted(s, null)); + Receive(HandleSendCompleted); + Receive(Enqueue); + Receive(c => HandleClose(Sender, c.Event)); + SuspendResumeHandlers(); + Receive(_ => { - _pendingAcks.Enqueue(pendingAck); - } - - write.Value.DoWrite(info); + Log.Debug("Handler [{0}] died, stopping connection actor", _handler); + Context.Stop(Self); + }); + //Receive(_=> { _st = _st with { WritingSuspended=true }; }); } - private void HandleClose(ConnectionInfo info, IActorRef closeCommander, ConnectionClosed closedEvent) + private void SuspendResumeHandlers() { - SetStatus(ConnectionStatus.ShutdownRequested); - - if (closedEvent is Aborted) - { - if (_traceLogging) Log.Debug("Got Abort command. RESETing connection."); - DoCloseConnection(info, closeCommander, closedEvent); - } - else if (closedEvent is PeerClosed && info.KeepOpenOnPeerClosed) + Receive(_ => { - // report that peer closed the connection - info.Handler.Tell(PeerClosed.Instance); - // used to check if peer already closed its side later - _peerClosed = true; - Context.Become(PeerSentEOF(info)); - } - else if (IsWritePending) // finish writing first - { - UnsignDeathPact(); - if (_traceLogging) Log.Debug("Got Close command but write is still pending."); - Context.Become(ClosingWithPendingWrite(info, closeCommander, closedEvent)); - } - else if (closedEvent is ConfirmedClosed) // shutdown output and wait for confirmation - { - if (_traceLogging) Log.Debug("Got ConfirmedClose command, sending FIN."); - - // If peer closed first, the socket is now fully closed. - // Also, if shutdownOutput threw an exception we expect this to be an indication - // that the peer closed first or concurrently with this code running. - if (_peerClosed || !SafeShutdownOutput()) - DoCloseConnection(info, closeCommander, closedEvent); - else Context.Become(Closing(info, closeCommander)); - } - // close gracefully now - else + _state = _state with { ReadingSuspended = false }; + IssueReceive(); + }); + Receive(_ => { _state = _state with { ReadingSuspended = true }; }); + Receive(_ => { - if (_traceLogging) Log.Debug("Got Close command, closing connection."); - try - { - Socket.Shutdown(SocketShutdown.Both); - } - catch (SocketException e) - { - Log.Error("Socket shutdown failed with [{0}]", e); - } - DoCloseConnection(info, closeCommander, closedEvent); - } + _state = _state with { WritingSuspended = false }; + TrySend(); + }); } - private void DoCloseConnection(ConnectionInfo info, IActorRef closeCommander, ConnectionClosed closedEvent) + private void PeerSentEOF() { - if (closedEvent is Aborted) Abort(); - else + Receive(HandleSendCompleted); + Receive(Enqueue); + Receive(c => HandleClose(Sender, c.Event)); + SuspendResumeHandlers(); + Receive(_ => { - CloseSocket(); - } - - var notifications = new HashSet(); - if (info.Handler != null) notifications.Add(info.Handler); - if (closeCommander != null) notifications.Add(closeCommander); - StopWith(new CloseInformation(notifications, closedEvent)); + Log.Debug("Handler [{0}] died, stopping connection actor", _handler); + Context.Stop(Self); + }); } - private void HandleError(IActorRef handler, SocketException exception) + private void ClosingWithPendingWrite(IActorRef closeSender, ConnectionClosed e) { - Log.Debug("Closing connection due to IO error {0}", exception); - StopWith(new CloseInformation(new HashSet(new[] { handler }), new ErrorClosed(exception.Message))); + Receive(s => HandleReceiveCompleted(s, closeSender)); + Receive(s => + { + HandleSendCompleted(s); + if (!_state.HasPending) + { + // we are finished sending + HandleClose(closeSender, e); + } + }); + Receive(Enqueue); + Receive(c => HandleClose(Sender, c.Event)); + SuspendResumeHandlers(); } - private bool SafeShutdownOutput() + /// + /// Connection is closed on our side, and we're waiting from confirmation from the other side. + /// + private void Closing(IActorRef closeSender) { - try + Receive(s => HandleReceiveCompleted(s, closeSender)); + Receive(HandleSendCompleted); + Receive(w => { - Socket.Shutdown(SocketShutdown.Send); - _isOutputShutdown = true; - return true; - } - catch (SocketException) + // fail all writes + Sender.Tell(w.FailureMessage.WithCause(DroppingWriteBecauseClosingException)); + }); + Receive(c => HandleClose(Sender, c.Event)); + SuspendResumeHandlers(); + Receive(h => { - return false; - } + Log.Debug("Handler [{0}] died, stopping connection actor", _handler); + Context.Stop(Self); + }); } - protected void AcquireSocketAsyncEventArgs() - { - if (ReceiveArgs != null) throw new InvalidOperationException("Cannot acquire receive SocketAsyncEventArgs. It's already has been initialized"); - if (SendArgs != null) throw new InvalidOperationException("Cannot acquire send SocketAsyncEventArgs. It's already has been initialized"); - - ReceiveArgs = CreateSocketEventArgs(Self); - var buffer = Tcp.BufferPool.Rent(); - ReceiveArgs.SetBuffer(buffer.Array, buffer.Offset, buffer.Count); + /* ----------------------------------------------------------------- */ + /* Socket‑event handlers */ + /* ----------------------------------------------------------------- */ - SendArgs = CreateSocketEventArgs(Self); - } + private long _totalSentBytes; + private long _totalReceivedBytes; - private void ReleaseSocketAsyncEventArgs() + private void HandleReceiveCompleted(SocketAsyncEventArgs ea, IActorRef? closeCommander) { - if (ReceiveArgs != null) + _state = _state with { IsReceiving = false }; + if (ea is { SocketError: SocketError.Success, BytesTransferred: > 0 }) { - var buffer = new ByteBuffer(ReceiveArgs.Buffer, ReceiveArgs.Offset, ReceiveArgs.Count); - ReleaseSocketEventArgs(ReceiveArgs); - // TODO: When using DirectBufferPool as a pool impelementation, there is potential risk, - // that socket was working while released. In that case releasing buffer may not be safe. - Tcp.BufferPool.Release(buffer); - ReceiveArgs = null; - } + if (Settings.TraceLogging) + { + _totalReceivedBytes += ea.BytesTransferred; + Log.Debug("[TcpConnection] received {0} bytes [{1} total]", ea.BytesTransferred, + _totalReceivedBytes); + } - if (SendArgs != null) - { - ReleaseSocketEventArgs(SendArgs); - SendArgs = null; - } - } + _handler!.Tell(new Received(ByteString.CopyFrom(_receiveBuffer, 0, ea.BytesTransferred))); - protected SocketAsyncEventArgs CreateSocketEventArgs(IActorRef onCompleteNotificationsReceiver) - { - SocketCompleted ResolveMessage(SocketAsyncEventArgs e) - { - switch (e.LastOperation) + if (_pullMode) { - case SocketAsyncOperation.Receive: - case SocketAsyncOperation.ReceiveFrom: - case SocketAsyncOperation.ReceiveMessageFrom: - return SocketReceived.Instance; - case SocketAsyncOperation.Send: - case SocketAsyncOperation.SendTo: - case SocketAsyncOperation.SendPackets: - return SocketSent.Instance; - case SocketAsyncOperation.Accept: - return SocketAccepted.Instance; - case SocketAsyncOperation.Connect: - return SocketConnected.Instance; - default: - throw new NotSupportedException($"Socket operation {e.LastOperation} is not supported"); + // in pull mode we need to wait for the next pull request + _state = _state with { ReadingSuspended = true }; + } + else + { + IssueReceive(); } - } - var args = new SocketAsyncEventArgs(); - args.UserToken = onCompleteNotificationsReceiver; - args.Completed += (_, e) => + return; + } + + // check for an error code + if (ea.SocketError != SocketError.Success) { - var actorRef = e.UserToken as IActorRef; - var completeMsg = ResolveMessage(e); - actorRef?.Tell(completeMsg); - }; - - return args; - } - - protected void ReleaseSocketEventArgs(SocketAsyncEventArgs e) - { - e.UserToken = null; - e.AcceptSocket = null; - - try + if(_traceLogging) + Log.Debug("[TcpConnection] read failed with error [{0}]", ea.SocketError); + HandleError(new SocketException((int)ea.SocketError)); + return; + } + + // check for EOF + if (ea.BytesTransferred == 0) { - e.SetBuffer(null, 0, 0); - if (e.BufferList != null) - e.BufferList = null; + if (_state.OutputShutdown) + { + if(_traceLogging) + Log.Debug("[TcpConnection] EOF received; our side is already closed. Closing connection."); + DoCloseConnection(closeCommander ?? _handler!, ConfirmedClosed.Instance); + } + else + { + if (_traceLogging) + Log.Debug("[TcpConnection] EOF received"); + _state = _state with { PeerClosed = true }; + HandleClose(closeCommander ?? _handler!, PeerClosed.Instance); + } } - // it can be that for some reason socket is in use and haven't closed yet - catch (InvalidOperationException) { } - - e.Dispose(); - } - [MethodImpl(MethodImplOptions.AggressiveInlining)] - private void CloseSocket() + private void HandleSendCompleted(AckSocketAsyncEventArgs ea) { - Socket.Dispose(); - _isOutputShutdown = true; - ReleaseSocketAsyncEventArgs(); - } + _state = _state with { IsSending = false }; - private void Abort() - { - try + if (_traceLogging) + Log.Debug($"[TcpConnection] HandleSendCompleted: BytesTransferred={ea.BytesTransferred}, PendingAcks={ea.PendingAcks.Count}, PartialWriteOffset={_partialWriteOffset}"); + + if (ea.SocketError != SocketError.Success) { - Socket.LingerState = new LingerOption(true, 0); // causes the following close() to send TCP RST + if(_traceLogging) + Log.Debug("[TcpConnection] write failed with error [{0}]", ea.SocketError); + HandleError(new SocketException((int)ea.SocketError)); + return; } - catch (Exception e) + + if (Settings.TraceLogging) { - if (_traceLogging) Log.Debug("setSoLinger(true, 0) failed with [{0}]", e); + _totalSentBytes += ea.BytesTransferred; + Log.Debug("[TcpConnection] completed write of {0}/{1} bytes (queued={2}/{3}) [{4} total sent]", + ea.BytesTransferred, ea.BufferList.Sum(c => c.Count), _state.QueuedBytes, _maxQueuedBytes, + _totalSentBytes); } - CloseSocket(); - } + foreach (var (c, ack) in ea.PendingAcks) + c.Tell(ack); - protected void StopWith(CloseInformation closeInfo) - { - _closedMessage = closeInfo; - UnsignDeathPact(); - Context.Stop(Self); + ea.ClearAcks(); + ea.BufferList = null; // release refs + TrySend(); } - private void ReceiveAsync() - { - if (!HasStatus(ConnectionStatus.Receiving)) - { - if (!Socket.ReceiveAsync(ReceiveArgs)) - Self.Tell(SocketReceived.Instance); + /* ----------------------------------------------------------------- */ + /* Read / Write helpers */ + /* ----------------------------------------------------------------- */ - SetStatus(ConnectionStatus.Receiving); - } - } - - [MethodImpl(MethodImplOptions.AggressiveInlining)] - private bool HasStatus(ConnectionStatus connectionStatus) + private void IssueReceive() { - // don't use Enum.HasFlag - it's using reflection underneat - var s = (int)connectionStatus; - return ((int)_status & s) == s; + if (!_state.CanReceive || _state.IsReceiving) return; + _receiveArgs.SetBuffer(_receiveBuffer, 0, _receiveBuffer.Length); + _state = _state with { IsReceiving = true }; + if (!Socket.ReceiveAsync(_receiveArgs)) Self.Tell(_receiveArgs, Self); } - [MethodImpl(MethodImplOptions.AggressiveInlining)] - private void SetStatus(ConnectionStatus connectionStatus) => _status |= connectionStatus; - - [MethodImpl(MethodImplOptions.AggressiveInlining)] - private void ClearStatus(ConnectionStatus connectionStatus) => _status &= ~connectionStatus; - - protected override void PostStop() + private void Enqueue(WriteCommand cmd) { - if (Socket.Connected) Abort(); - else CloseSocket(); - - // We do never store pending writes between messages anymore, so nothing is acquired and nothing to release - - // always try to release SocketAsyncEventArgs to avoid memory leaks - ReleaseSocketAsyncEventArgs(); - - if (_closedMessage != null) + var b = (int)cmd.Bytes; + if (_maxQueuedBytes >= 0 && _state.QueuedBytes + b > _maxQueuedBytes) { - var interestedInClose = _writeCommandsQueue.TryGetNext(out var pending) - ? _closedMessage.NotificationsTo.Union(_writeCommandsQueue.DequeueAll().Select(cmd => cmd.Sender)) - : _closedMessage.NotificationsTo; - - foreach (var listener in interestedInClose) - { - listener.Tell(_closedMessage.ClosedEvent); - } + Sender.Tell(cmd.FailureMessage.WithCause(DroppingWriteBecauseQueueIsFullException)); + return; } - } + + EnqueueInner(); - protected override void PostRestart(Exception reason) - { - throw new IllegalStateException("Restarting not supported for connection actors."); - } + _state = _state with { QueuedBytes = _state.QueuedBytes + b }; + TrySend(); + return; - private Option GetNextWrite(IEnumerable<(SimpleWriteCommand Command, IActorRef Sender)> headCommands = null) - { - headCommands = headCommands ?? ImmutableList<(SimpleWriteCommand Command, IActorRef Sender)>.Empty; - var writeCommands = new List<(Write Command, IActorRef Sender)>(_writeCommandsQueue.ItemsCount); - foreach (var commandInfo in headCommands.Concat(_writeCommandsQueue.DequeueAll())) + void EnqueueInner() { - switch (commandInfo.Command) + switch (cmd) { - case Write w when !w.Data.IsEmpty: - // Have real write - go on and put it to the wire - writeCommands.Add((w, commandInfo.Sender)); + case Write realWrite: + _pendingWrites.Enqueue((realWrite, Sender)); break; - case Write w: - // Write command is empty, so just sending Ask if required - if (w.WantsAck) commandInfo.Sender.Tell(w.Ack); + case CompoundWrite compounds: //TODO: poorly designed API we should remove + foreach (var c in compounds) + { + if(c is Write w) + { + _pendingWrites.Enqueue((w, Sender)); + } + else + { + Sender.Tell(c.FailureMessage.WithCause(new InvalidOperationException($"Cannot enqueue {c} - only valid classes are Write and CompoundWrite"))); + } + } break; default: - //TODO: there's no TransmitFile API - .NET Core doesn't support it at all - throw new InvalidOperationException("Non reachable code"); + Sender.Tell(cmd.FailureMessage.WithCause(new InvalidOperationException($"Cannot enqueue {cmd} - only valid classes are Write and CompoundWrite"))); + break; } } + } + + private void TrySend() + { + if (_traceLogging) + Log.Debug($"[TcpConnection] TrySend called. IsSending={_state.IsSending}, PendingWrites={_pendingWrites.Count}, CanSend={_state.CanSend}, PartialWriteOffset={_partialWriteOffset}"); + if (!_state.CanSend) return; + if (_state.IsSending || _pendingWrites.Count == 0) return; + + var segs = new List>(8); + var batchBytes = 0; - if (writeCommands.Count > 0) + while (_pendingWrites.Count > 0 && batchBytes < Settings.MaxFrameSizeBytes) { - return CreatePendingBufferWrite(writeCommands); - } + var (w, snd) = _pendingWrites.Peek(); - // No more writes out there - return Option.None; - } + var data = w.Data; + var offset = _partialWriteOffset ?? 0; + var remaining = data.Count - offset; - [MethodImpl(MethodImplOptions.AggressiveInlining)] - private PendingWrite CreatePendingBufferWrite(List<(Write Command, IActorRef Sender)> writes) - { - var acks = writes.Select(w => (w.Sender, (object)w.Command.Ack)).ToImmutableList(); - var dataList = writes.Select(w => w.Command.Data); - return new PendingBufferWrite(this, SendArgs, Self, acks, dataList, Tcp.BufferPool); - } + // Handle empty writes immediately + if (remaining == 0) + { + _pendingWrites.Dequeue(); + _state = _state with { QueuedBytes = _state.QueuedBytes - w.Data.Count }; + _partialWriteOffset = null; + if (w.WantsAck) snd.Tell(w.Ack); // message was already sent - ACK right away + if (_traceLogging) + Log.Debug($"[TcpConnection] TrySend: encountered empty write, dequeued. Remaining queue: {_pendingWrites.Count}"); + continue; + } - //TODO: Port File IO - currently .NET Core doesn't support TransmitFile API + var toSend = Math.Min(remaining, Settings.MaxFrameSizeBytes - batchBytes); - private enum ReadResultType - { - EndOfStream, - AllRead, - ReadError, - } + if (_traceLogging) + Log.Debug($"[TcpConnection] TrySend batching: offset={offset}, remaining={remaining}, toSend={toSend}, batchBytes={batchBytes}"); - private struct ReadResult - { - public static readonly ReadResult EndOfStream = new(ReadResultType.EndOfStream, SocketError.Success); - public static readonly ReadResult AllRead = new(ReadResultType.AllRead, SocketError.Success); + // non-copying operation - just creates a new ArraySegment without copying any bytes + var chunk = data.Slice(offset, toSend); + segs.AddRange(chunk.Buffers); + batchBytes += toSend; - public readonly ReadResultType Type; - public readonly SocketError Error; + if (toSend == remaining) + { + // Full write completed + _pendingWrites.Dequeue(); + _state = _state with { QueuedBytes = _state.QueuedBytes - w.Data.Count }; + _partialWriteOffset = null; + if (w.WantsAck) _sendArgs.PendingAcks.Add((snd, w.Ack)); + if (_traceLogging) + Log.Debug($"[TcpConnection] TrySend: completed full write, dequeued. Remaining queue: {_pendingWrites.Count}"); + } + else + { + // Partial write, update offset and break + _partialWriteOffset = offset + toSend; + if (_traceLogging) + Log.Debug($"[TcpConnection] TrySend: partial write, will resume at offset {_partialWriteOffset}"); + break; + } + } - public ReadResult(ReadResultType type, SocketError error) + if (segs.Count == 0) { - Type = type; - Error = error; + if (_traceLogging) + Log.Debug("[TcpConnection] TrySend: no segments to send (only empty writes encountered)"); + return; } - } - - /// - /// Used to transport information to the postStop method to notify - /// interested party about a connection close. - /// - protected sealed class CloseInformation - { - /// - /// TBD - /// - public ISet NotificationsTo { get; } - public Tcp.Event ClosedEvent { get; } - public CloseInformation(ISet notificationsTo, Tcp.Event closedEvent) - { - NotificationsTo = notificationsTo; - ClosedEvent = closedEvent; + _sendArgs.BufferList = segs; + _state = _state with { IsSending = true }; + if (_traceLogging) + Log.Debug($"[TcpConnection] TrySend: sending {segs.Count} segments, total bytes={batchBytes}"); + if (!Socket.SendAsync(_sendArgs)) Self.Tell(_sendArgs, Self); + } + + /* ====================================================================*/ + /* Shutdown decision */ + /* ====================================================================*/ + private void HandleClose(IActorRef closeSender, ConnectionClosed closeEvent) + { + switch (closeEvent) + { + case Aborted: + if(_traceLogging) + Log.Debug("Got Abort command. RESETing connection."); + DoCloseConnection(closeSender, closeEvent); + break; + // this shouldn't happen really - ErrorClosed is mostly just a message we send to handler. + // but in case we get it, we should close the connection immediately. + case ErrorClosed: + DoCloseConnection(closeSender, closeEvent); + break; + case PeerClosed when _state.KeepOpenOnPeerClosed: + _handler.Tell(PeerClosed.Instance); + _state = _state with { PeerClosed = true }; + Become(PeerSentEOF); + break; + case not null when _state.HasPending: + Context.Unwatch(_handler); // stop watching the handler + if(_traceLogging) + Log.Debug("Got Close command but write is still pending."); + Become(() => ClosingWithPendingWrite(closeSender, closeEvent)); + break; + case ConfirmedClosed: //shutdown output and wait for confirmation + if(_traceLogging) + Log.Debug("Got ConfirmedClose command, sending FIN."); + /* + * If peer closed first, the socket is now fully closed. + * Also, if ShutdownOutput threw an exception we expect this to be an indication + * that the peer closed first or concurrently with this code running. + */ + if(_state.PeerClosed || !ShutdownOutput()) + { + DoCloseConnection(closeSender, closeEvent); + } + else + { + if(_traceLogging) + Log.Debug("Got ConfirmedClose command, but write is still pending."); + Become(() => Closing(closeSender)); + } + break; + default: // no pending writes, not required to stay open when peer is closed + if(_traceLogging) + Log.Debug("Got Close command, closing connection."); + try + { + Socket.Shutdown(SocketShutdown.Both); + } + catch (SocketException e) + { + Log.Error(e, "Graceful socket shutdown failed"); + } + DoCloseConnection(closeSender, closeEvent!); + break; } } - /// - /// Groups required connection-related data that are only available once the connection has been fully established. - /// - private sealed class ConnectionInfo + private void HandleError(SocketException e) { - public readonly IActorRef Handler; - public readonly bool KeepOpenOnPeerClosed; - public readonly bool UseResumeWriting; - - public ConnectionInfo(IActorRef handler, bool keepOpenOnPeerClosed, bool useResumeWriting) + Log.Debug(e, "Closing connection due to I/O error: {0}", e.SocketErrorCode); + var errorClosed = new ErrorClosed(e.Message); + if(_closeInformation != null) { - Handler = handler; - KeepOpenOnPeerClosed = keepOpenOnPeerClosed; - UseResumeWriting = useResumeWriting; + _closeInformation = _closeInformation with { ClosedEvent = errorClosed }; } - } - - // INTERNAL MESSAGES - private sealed class UpdatePendingWriteAndThen : INoSerializationVerificationNeeded - { - public Option RemainingWrite { get; } - public Action Work { get; } - - public UpdatePendingWriteAndThen(Option remainingWrite, Action work) + else { - RemainingWrite = remainingWrite; - Work = work; + _closeInformation = CloseInformation.Single(_handler ?? _commander!, errorClosed); } + Context.Stop(Self); } - private sealed class WriteFileFailed + private bool ShutdownOutput() { - public SocketException Cause { get; } - - public WriteFileFailed(SocketException cause) + try { - Cause = cause; + Socket.Shutdown(SocketShutdown.Send); + _state = _state with { OutputShutdown = true }; + return true; } - } - - private abstract class PendingWrite - { - public IImmutableList<(IActorRef Commander, object Ack)> PendingAcks { get; } - - protected PendingWrite(IImmutableList<(IActorRef Commander, object Ack)> pendingAcks) + catch (SocketException) { - PendingAcks = pendingAcks; + return false; } - - public abstract void DoWrite(ConnectionInfo info); } - - private sealed class PendingBufferWrite : PendingWrite + + private void DoCloseConnection(IActorRef closeSender, ConnectionClosed closedEvent) { - private readonly TcpConnection _connection; - private readonly IActorRef _self; - private readonly IEnumerable _dataToSend; - private readonly IBufferPool _bufferPool; - private readonly SocketAsyncEventArgs _sendArgs; - - public PendingBufferWrite( - TcpConnection connection, - SocketAsyncEventArgs sendArgs, - IActorRef self, - IImmutableList<(IActorRef Commander, object Ack)> acks, - IEnumerable dataToSend, - IBufferPool bufferPool) : base(acks) + switch (closedEvent) { - _connection = connection; - _sendArgs = sendArgs; - _self = self; - _dataToSend = dataToSend; - _bufferPool = bufferPool; + case Aborted: + AbortSocket(); + break; + default: + CloseSocket(); + break; } - public override void DoWrite(ConnectionInfo info) - { - try - { - _sendArgs.SetBuffer(_dataToSend); - if (!_connection.Socket.SendAsync(_sendArgs)) - _self.Tell(SocketSent.Instance); - } - catch (SocketException e) - { - _connection.HandleError(info.Handler, e); - } - } + StopWith(new CloseInformation(ImmutableHashSet.Empty.Add(closeSender), closedEvent)); } - public class PendingSimpleWritesQueue + private void CloseSocket() { - private readonly ILoggingAdapter _log; - private readonly Option _maxQueueSizeInBytes; - private readonly Queue<(SimpleWriteCommand Command, IActorRef Commander, int Size)> _queue; - private int _totalSizeInBytes = 0; - - public PendingSimpleWritesQueue(ILoggingAdapter log, Option maxQueueSizeInBytes) + try { - _log = log; - _maxQueueSizeInBytes = maxQueueSizeInBytes; - _queue = new Queue<(SimpleWriteCommand Command, IActorRef Commander, int Size)>(); + Socket.Close(); } - - /// - /// Gets total number of items in queue - /// - public int ItemsCount => _queue.Count; - - /// - /// Adds all subcommands stored in provided command. - /// Performs buffer size checks - /// - /// - /// Thrown when data to buffer is larger then allowed - /// - public bool EnqueueSimpleWrites(WriteCommand command, IActorRef sender) + catch { - return EnqueueSimpleWrites(command, sender, out _); + /* ignore */ } - /// - /// Adds all subcommands stored in provided command. - /// Performs buffer size checks - /// - /// - /// Thrown when data to buffer is larger then allowed - /// - public bool EnqueueSimpleWrites(WriteCommand command, IActorRef sender, out int bufferedSize) + try { - bufferedSize = 0; - - foreach (var writeInfo in ExtractFromCommand(command)) - { - var sizeAfterAppending = _totalSizeInBytes + writeInfo.DataSize; - if (_maxQueueSizeInBytes.HasValue && _maxQueueSizeInBytes.Value < sizeAfterAppending) - { - _log.Warning("Could not receive write command of size {0} bytes, " + - "because buffer limit is {1} bytes and " + - "it is already {2} bytes", writeInfo.DataSize, _maxQueueSizeInBytes, _totalSizeInBytes); - return false; - } - - _totalSizeInBytes = sizeAfterAppending; - _queue.Enqueue((writeInfo.Command, sender, writeInfo.DataSize)); - bufferedSize += writeInfo.DataSize; - } - - return true; + Socket.Dispose(); } - - /// - /// Adds all subcommands stored in provided command. - /// Performs buffer size checks for all, except first one, that is not buffered - /// - /// - /// Not buffered (and not checked) first - /// - /// - /// Thrown when data to buffer is larger then allowed - /// - public bool EnqueueSimpleWritesExceptFirst(WriteCommand command, IActorRef sender, out SimpleWriteCommand first) + catch { - first = null; - foreach (var writeInfo in ExtractFromCommand(command)) - { - if (first == null) - { - first = writeInfo.Command; - continue; - } - - var sizeAfterAppending = _totalSizeInBytes + writeInfo.DataSize; - if (_maxQueueSizeInBytes.HasValue && _maxQueueSizeInBytes.Value < sizeAfterAppending) - { - _log.Warning("Could not receive write command of size {0} bytes, " + - "because buffer limit is {1} bytes and " + - "it is already {2} bytes", writeInfo.DataSize, _maxQueueSizeInBytes, _totalSizeInBytes); - return false; - } - - _totalSizeInBytes = sizeAfterAppending; - _queue.Enqueue((writeInfo.Command, sender, writeInfo.DataSize)); - } - - return true; + /* ignore */ } - /// - /// Gets next command from the queue, if any - /// - public (SimpleWriteCommand, IActorRef Sender) Dequeue() - { - if (_queue.Count == 0) - throw new InvalidOperationException("Write commands queue is empty"); - - var (command, sender, size) = _queue.Dequeue(); - _totalSizeInBytes -= size; - return (command, sender); - } + _state = _state with { OutputShutdown = true, ReadingSuspended = true }; + } - /// - /// Dequeue all elements one by one - /// - /// - public IEnumerable<(SimpleWriteCommand Command, IActorRef Sender)> DequeueAll() + private void AbortSocket() + { + try { - while (TryGetNext(out var command)) - yield return command; + Socket.LingerState = new LingerOption(true, 0); // causes the following close() to send TCP RST } - - /// - /// Gets next command from the queue, if any - /// - public bool TryGetNext(out (SimpleWriteCommand Command, IActorRef Sender) command) + catch (Exception e) { - command = default; - if (_queue.Count == 0) - return false; - - command = Dequeue(); - return true; + if (_traceLogging) Log.Debug("setSoLinger(true, 0) failed with [{0}]", e); } - /// - /// Checks if commands queue is empty - /// - public bool IsEmpty => _totalSizeInBytes == 0; + CloseSocket(); + } - private IEnumerable<(SimpleWriteCommand Command, int DataSize)> ExtractFromCommand(WriteCommand command) + protected sealed record CloseInformation(ImmutableHashSet NotificationsTo, Tcp.Event ClosedEvent) + { + public static CloseInformation Single(IActorRef closeSender, Tcp.Event closedEvent) { - switch (command) - { - case Write write: - yield return (write, write.Data.Count); - break; - case CompoundWrite compoundWrite: - var extractedFromHead = ExtractFromCommand(compoundWrite.Head); - var extractedFromTail = ExtractFromCommand(compoundWrite.TailCommand); - foreach (var extractedSimple in extractedFromHead.Concat(extractedFromTail)) - { - yield return extractedSimple; - } - break; - default: - throw new ArgumentException($"Trying to calculate size of unknown write type: {command.GetType().FullName}"); - } + return new CloseInformation(ImmutableHashSet.Empty.Add(closeSender), closedEvent); } } } -} +} \ No newline at end of file diff --git a/src/core/Akka/IO/TcpIncomingConnection.cs b/src/core/Akka/IO/TcpIncomingConnection.cs index 7774e4c2430..46efdc4f011 100644 --- a/src/core/Akka/IO/TcpIncomingConnection.cs +++ b/src/core/Akka/IO/TcpIncomingConnection.cs @@ -20,21 +20,13 @@ internal sealed class TcpIncomingConnection : TcpConnection { private readonly IActorRef _bindHandler; private readonly IEnumerable _options; - - /// - /// TBD - /// - /// TBD - /// TBD - /// TBD - /// TBD - /// TBD - public TcpIncomingConnection(TcpExt tcp, + + public TcpIncomingConnection(TcpSettings settings, Socket socket, IActorRef bindHandler, IEnumerable options, bool readThrottling) - : base(tcp, socket, readThrottling, Option.None) + : base(settings, socket, readThrottling) { _bindHandler = bindHandler; _options = options; @@ -44,19 +36,7 @@ public TcpIncomingConnection(TcpExt tcp, protected override void PreStart() { - AcquireSocketAsyncEventArgs(); - CompleteConnect(_bindHandler, _options); } - - /// - /// TBD - /// - /// TBD - /// TBD - protected override bool Receive(object message) - { - throw new NotSupportedException(); - } } } diff --git a/src/core/Akka/IO/TcpListener.cs b/src/core/Akka/IO/TcpListener.cs index 198ec16e480..01dd20f3486 100644 --- a/src/core/Akka/IO/TcpListener.cs +++ b/src/core/Akka/IO/TcpListener.cs @@ -88,9 +88,9 @@ private ConnectionTerminated() public static ConnectionTerminated Instance { get; } = new(); } - private sealed record AcceptCompleted(SocketAsyncEventArgs EventArgs) : INoSerializationVerificationNeeded; + private sealed record AcceptCompleted(SocketAsyncEventArgs EventArgs) : INoSerializationVerificationNeeded, IDeadLetterSuppression; - private sealed record RetryAccept(SocketAsyncEventArgs EventArgs) : INoSerializationVerificationNeeded; + private sealed record RetryAccept(SocketAsyncEventArgs EventArgs) : INoSerializationVerificationNeeded, IDeadLetterSuppression; public TcpListener(TcpExt tcp, IActorRef bindCommander, Tcp.Bind bind) @@ -268,7 +268,7 @@ private void HandleAccept(SocketAsyncEventArgs saea) var accepted = saea.AcceptSocket!; saea.AcceptSocket = null; // ready for re‑use var incomingConnection = Context.ActorOf(Props - .Create(_tcp, accepted, _bind.Handler, _bind.Options, _bind.PullMode) + .Create(_bind.TcpSettings ?? _tcp.Settings, accepted, _bind.Handler, _bind.Options, _bind.PullMode) .WithDeploy(Deploy.Local)); // set up the watch for monitoring purposes diff --git a/src/core/Akka/IO/TcpManager.cs b/src/core/Akka/IO/TcpManager.cs index 2c1189008ff..cfd369c34a6 100644 --- a/src/core/Akka/IO/TcpManager.cs +++ b/src/core/Akka/IO/TcpManager.cs @@ -54,6 +54,22 @@ internal sealed class TcpManager : ActorBase { private readonly TcpExt _tcp; + const string TcpListenerNamePrefix = "tcp-listener"; + const string TcpOutgoingConnectionNamePrefix = "tcp-client-connection"; + + private long _tcpListenerCounter; + private long _tcpOutgoingConnectionCounter; + + private string NextTcpListenerName() + { + return $"{TcpListenerNamePrefix}-{_tcpListenerCounter++}"; + } + + private string NextTcpOutgoingConnectionName() + { + return $"{TcpOutgoingConnectionNamePrefix}-{_tcpOutgoingConnectionCounter++}"; + } + public TcpManager(TcpExt tcp) { _tcp = tcp; @@ -66,13 +82,13 @@ protected override bool Receive(object message) case Connect c: { var commander = Sender; - Context.ActorOf(Props.Create(_tcp, commander, c).WithDeploy(Deploy.Local)); + Context.ActorOf(Props.Create(_tcp, commander, c).WithDeploy(Deploy.Local), NextTcpOutgoingConnectionName()); return true; } case Bind b: { var commander = Sender; - Context.ActorOf(Props.Create(_tcp, commander, b).WithDeploy(Deploy.Local)); + Context.ActorOf(Props.Create(_tcp, commander, b).WithDeploy(Deploy.Local), NextTcpListenerName()); return true; } default: diff --git a/src/core/Akka/IO/TcpOutgoingConnection.cs b/src/core/Akka/IO/TcpOutgoingConnection.cs index 075caa4ee82..6f0f7764ca9 100644 --- a/src/core/Akka/IO/TcpOutgoingConnection.cs +++ b/src/core/Akka/IO/TcpOutgoingConnection.cs @@ -6,7 +6,6 @@ //----------------------------------------------------------------------- using System; -using System.Collections.Generic; using System.Linq; using System.Net; using System.Net.Sockets; @@ -14,7 +13,6 @@ using Akka.Actor; using Akka.Annotations; using Akka.Event; -using Akka.Util; namespace Akka.IO { @@ -29,40 +27,49 @@ internal sealed class TcpOutgoingConnection : TcpConnection private SocketAsyncEventArgs _connectArgs; - private readonly ConnectException finishConnectNeverReturnedTrueException = + private readonly ConnectException _finishConnectNeverReturnedTrueException = new("Could not establish connection because finishConnect never returned true"); public TcpOutgoingConnection(TcpExt tcp, IActorRef commander, Tcp.Connect connect) : base( - tcp, - tcp.Settings.OutgoingSocketForceIpv4 - ? new Socket(AddressFamily.InterNetwork, SocketType.Stream, ProtocolType.Tcp) { Blocking = false } - : new Socket(SocketType.Stream, ProtocolType.Tcp) { Blocking = false }, - connect.PullMode, - tcp.Settings.WriteCommandsQueueMaxSize >= 0 ? tcp.Settings.WriteCommandsQueueMaxSize : Option.None) + (connect.TcpSettings ?? tcp.Settings), + (connect.TcpSettings ?? tcp.Settings).OutgoingSocketForceIpv4 + ? new Socket(AddressFamily.InterNetwork, SocketType.Stream, ProtocolType.Tcp) { Blocking = false } + : new Socket(SocketType.Stream, ProtocolType.Tcp) { Blocking = false }, connect.PullMode) { _commander = commander; _connect = connect; - SignDeathPact(commander); - foreach (var option in connect.Options) { option.BeforeConnect(Socket); } - + if (connect.LocalAddress != null) Socket.Bind(connect.LocalAddress); if (connect.Timeout.HasValue) - Context.SetReceiveTimeout(connect.Timeout.Value); //Initiate connection timeout if supplied + Context.SetReceiveTimeout(connect.Timeout.Value); //Initiate connection timeout if supplied } private void ReleaseConnectionSocketArgs() { if (_connectArgs != null) { - ReleaseSocketEventArgs(_connectArgs); + _connectArgs.UserToken = null; + _connectArgs.AcceptSocket = null; + + try + { + _connectArgs.SetBuffer(null, 0, 0); + _connectArgs.BufferList = null; + } + // it can be that for some reason socket is in use and haven't closed yet + catch (InvalidOperationException) + { + } + + _connectArgs.Dispose(); _connectArgs = null; } } @@ -71,7 +78,10 @@ private void Stop(Exception cause) { ReleaseConnectionSocketArgs(); - StopWith(new CloseInformation(new HashSet(new[] {_commander}), _connect.FailureMessage.WithCause(cause))); + var failureEvent = _connect.FailureMessage.WithCause(cause); + var closeInfo = CloseInformation.Single(_commander, failureEvent); + StopWith(closeInfo); + Context.Stop(Self); } [MethodImpl(MethodImplOptions.AggressiveInlining)] @@ -87,7 +97,7 @@ private void ReportConnectFailure(Action thunk) Stop(e); } } - + protected override void PreStart() { ReportConnectFailure(() => @@ -97,9 +107,10 @@ protected override void PreStart() Log.Debug("Resolving {0} before connecting", remoteAddress.Host); var resolved = Dns.ResolveName(remoteAddress.Host, Context.System, Self); if (resolved == null) - Become(Resolving(remoteAddress)); + Become(() => Resolving(remoteAddress)); else if (resolved.Ipv4.Any() && resolved.Ipv6.Any()) // one of both families - Register(new IPEndPoint(resolved.Ipv4.FirstOrDefault(), remoteAddress.Port), new IPEndPoint(resolved.Ipv6.FirstOrDefault(), remoteAddress.Port)); + Register(new IPEndPoint(resolved.Ipv4.First(), remoteAddress.Port), + new IPEndPoint(resolved.Ipv6.First(), remoteAddress.Port)); else // one or the other Register(new IPEndPoint(resolved.Addr, remoteAddress.Port), null); } @@ -107,7 +118,9 @@ protected override void PreStart() { Register(point, null); } - else throw new NotSupportedException($"Couldn't connect to [{_connect.RemoteAddress}]: only IP and DNS-based endpoints are supported"); + else + throw new NotSupportedException( + $"Couldn't connect to [{_connect.RemoteAddress}]: only IP and DNS-based endpoints are supported"); }); } @@ -119,33 +132,46 @@ protected override void PostStop() base.PostStop(); } - protected override bool Receive(object message) + private void Resolving(DnsEndPoint remoteAddress) { - throw new NotSupportedException(); + Receive(resolved => + { + if (resolved.Ipv4.Any() && resolved.Ipv6.Any()) // multiple addresses + { + ReportConnectFailure(() => Register( + new IPEndPoint(resolved.Ipv4.First(), remoteAddress.Port), + new IPEndPoint(resolved.Ipv6.First(), remoteAddress.Port))); + } + else // only one address family. No fallbacks. + { + ReportConnectFailure(() => Register( + new IPEndPoint(resolved.Addr, remoteAddress.Port), + null)); + } + }); } - private Receive Resolving(DnsEndPoint remoteAddress) + private static SocketAsyncEventArgs CreateSocketEventArgs(IActorRef onCompleteNotificationsReceiver) { - return message => + var args = new SocketAsyncEventArgs(); + args.UserToken = onCompleteNotificationsReceiver; + args.Completed += (_, e) => { - if (message is Dns.Resolved resolved) - { - if (resolved.Ipv4.Any() && resolved.Ipv6.Any()) // multiple addresses - { - ReportConnectFailure(() => Register( - new IPEndPoint(resolved.Ipv4.FirstOrDefault(), remoteAddress.Port), - new IPEndPoint(resolved.Ipv6.FirstOrDefault(), remoteAddress.Port))); - } - else // only one address family. No fallbacks. - { - ReportConnectFailure(() => Register( - new IPEndPoint(resolved.Addr, remoteAddress.Port), - null)); - } - return true; - } - return false; + var actorRef = e.UserToken as IActorRef; + var completeMsg = ResolveMessage(e); + actorRef?.Tell(completeMsg); }; + + return args; + + Tcp.SocketCompleted ResolveMessage(SocketAsyncEventArgs e) + { + return e.LastOperation switch + { + SocketAsyncOperation.Connect => IO.Tcp.SocketConnected.Instance, + _ => throw new NotSupportedException($"Socket operation {e.LastOperation} is not supported") + }; + } } private void Register(IPEndPoint address, IPEndPoint fallbackAddress) @@ -160,72 +186,74 @@ private void Register(IPEndPoint address, IPEndPoint fallbackAddress) if (!Socket.ConnectAsync(_connectArgs)) Self.Tell(IO.Tcp.SocketConnected.Instance); - Become(Connecting(Tcp.Settings.FinishConnectRetries, _connectArgs, fallbackAddress)); + Become(() => Connecting(Settings.FinishConnectRetries, _connectArgs, fallbackAddress)); }); } - private Receive Connecting(int remainingFinishConnectRetries, SocketAsyncEventArgs args, IPEndPoint fallbackAddress) + private void Connecting(int remainingFinishConnectRetries, SocketAsyncEventArgs args, + IPEndPoint fallbackAddress) { - return message => + Receive(_ => { - if (message is Tcp.SocketConnected) + if (args.SocketError == SocketError.Success) { - if (args.SocketError == SocketError.Success) - { - if (_connect.Timeout.HasValue) Context.SetReceiveTimeout(null); - Log.Debug("Connection established to [{0}]", _connect.RemoteAddress); + if (_connect.Timeout.HasValue) Context.SetReceiveTimeout(null); + Log.Debug("Connection established to [{0}]", _connect.RemoteAddress); - ReleaseConnectionSocketArgs(); - AcquireSocketAsyncEventArgs(); + ReleaseConnectionSocketArgs(); - CompleteConnect(_commander, _connect.Options); - } - else if (remainingFinishConnectRetries > 0 && fallbackAddress != null) // used only when we've resolved a DNS endpoint. + CompleteConnect(_commander, _connect.Options); + } + else + switch (remainingFinishConnectRetries) { - var self = Self; - var previousAddress = (IPEndPoint)args.RemoteEndPoint; - args.RemoteEndPoint = fallbackAddress; - Context.System.Scheduler.Advanced.ScheduleOnce(TimeSpan.FromMilliseconds(1), () => + // used only when we've resolved a DNS endpoint. + case > 0 when fallbackAddress != null: { - if (!Socket.ConnectAsync(args)) - self.Tell(IO.Tcp.SocketConnected.Instance); - }); - Context.Become(Connecting(remainingFinishConnectRetries - 1, args, previousAddress)); - } - else if (remainingFinishConnectRetries > 0) - { - var self = Self; - Context.System.Scheduler.Advanced.ScheduleOnce(TimeSpan.FromMilliseconds(1), () => + var self = Self; + var previousAddress = (IPEndPoint)args.RemoteEndPoint; + args.RemoteEndPoint = fallbackAddress; + Context.System.Scheduler.Advanced.ScheduleOnce(TimeSpan.FromMilliseconds(1), () => + { + if (!Socket.ConnectAsync(args)) + self.Tell(IO.Tcp.SocketConnected.Instance); + }); + Become(() => Connecting(remainingFinishConnectRetries - 1, args, previousAddress)); + break; + } + case > 0: { - if (!Socket.ConnectAsync(args)) - self.Tell(IO.Tcp.SocketConnected.Instance); - }); - Context.Become(Connecting(remainingFinishConnectRetries - 1, args, null)); + var self = Self; + Context.System.Scheduler.Advanced.ScheduleOnce(TimeSpan.FromMilliseconds(1), () => + { + if (!Socket.ConnectAsync(args)) + self.Tell(IO.Tcp.SocketConnected.Instance); + }); + Become(() => Connecting(remainingFinishConnectRetries - 1, args, null)); + break; + } + default: + Log.Debug( + "Could not establish connection because finishConnect never returned true (consider increasing akka.io.tcp.finish-connect-retries)"); + Stop(_finishConnectNeverReturnedTrueException); + break; } - else - { - Log.Debug("Could not establish connection because finishConnect never returned true (consider increasing akka.io.tcp.finish-connect-retries)"); - Stop(finishConnectNeverReturnedTrueException); - } - return true; - } - if (message is ReceiveTimeout) - { - if (_connect.Timeout.HasValue) Context.SetReceiveTimeout(null); // Clear the timeout - Log.Debug("Connect timeout expired, could not establish connection to [{0}]", _connect.RemoteAddress); - Stop(new ConnectException($"Connect timeout of {_connect.Timeout} expired")); - return true; - } - return false; - }; + }); + Receive(_ => + { + if (_connect.Timeout.HasValue) Context.SetReceiveTimeout(null); // Clear the timeout + Log.Debug("Connect timeout expired, could not establish connection to [{0}]", _connect.RemoteAddress); + Stop(new ConnectException($"Connect timeout of {_connect.Timeout} expired")); + }); } } [InternalApi] public class ConnectException : Exception { - public ConnectException(string message) + public ConnectException(string message) : base(message) - { } + { + } } -} +} \ No newline at end of file diff --git a/src/core/Akka/IO/TcpSettings.cs b/src/core/Akka/IO/TcpSettings.cs index 33c19e0316c..c712d55e3bb 100644 --- a/src/core/Akka/IO/TcpSettings.cs +++ b/src/core/Akka/IO/TcpSettings.cs @@ -12,9 +12,9 @@ namespace Akka.IO { /// - /// TBD + /// Settings for Akka.IO.Tcp's outbound and inbound connection acvtors. /// - public class TcpSettings + public sealed record TcpSettings { /// /// Creates a new instance of class @@ -29,7 +29,7 @@ public static TcpSettings Create(ActorSystem system) ConfigurationException .NullOrEmptyConfig< TcpSettings>( - "akka.io.tcp"); //($"Failed to create {typeof(TcpSettings)}: akka.io.tcp configuration node not found"); + "akka.io.tcp"); return Create(config); } @@ -38,38 +38,72 @@ public static TcpSettings Create(ActorSystem system) /// Creates a new instance of class /// and fills it with values parsed from provided HOCON config. /// - /// TBD + /// The HOCON path that contains the `akka.io.tcp` section. public static TcpSettings Create(Config config) { if (config.IsNullOrEmpty()) throw ConfigurationException.NullOrEmptyConfig(); return new TcpSettings( - bufferPoolConfigPath: config.GetString("buffer-pool", "akka.io.tcp.disabled-buffer-pool"), - initialSocketAsyncEventArgs: config.GetInt("nr-of-socket-async-event-args", 32), traceLogging: config.GetBoolean("trace-logging", false), batchAcceptLimit: config.GetString("batch-accept-limit") == "scale-to-cpus" ? DefaultAcceptLimit : config.GetInt("batch-accept-limit", DefaultAcceptLimit), registerTimeout: config.GetTimeSpan("register-timeout", TimeSpan.FromSeconds(5)), - receivedMessageSizeLimit: config.GetString("max-received-message-size", "unlimited") == "unlimited" - ? int.MaxValue - : config.GetInt("max-received-message-size", 0), + maxFrameSizeBytes: (int)config.GetByteSize("maximum-frame-size", 4096).Value, + receiveBufferSize: (int)config.GetByteSize("receive-buffer-size", 8192).Value, + sendBufferSize: (int)config.GetByteSize("send-buffer-size", 8192).Value, managementDispatcher: config.GetString("management-dispatcher", "akka.actor.default-dispatcher"), - fileIoDispatcher: config.GetString("file-io-dispatcher", "akka.actor.default-dispatcher"), - transferToLimit: config.GetString("file-io-transferTo-limit", null) == "unlimited" - ? int.MaxValue - : config.GetInt("file-io-transferTo-limit", 512 * 1024), finishConnectRetries: config.GetInt("finish-connect-retries", 5), outgoingSocketForceIpv4: config.GetBoolean("outgoing-socket-force-ipv4", false), writeCommandsQueueMaxSize: config.GetInt("write-commands-queue-max-size", -1)); } + + + // private so we can change the constructor in the future + private TcpSettings( + bool traceLogging, + int batchAcceptLimit, + TimeSpan? registerTimeout, + int maxFrameSizeBytes, + int sendBufferSize, + int receiveBufferSize, + string managementDispatcher, + int finishConnectRetries, + bool outgoingSocketForceIpv4, + int writeCommandsQueueMaxSize) + { + TraceLogging = traceLogging; + BatchAcceptLimit = batchAcceptLimit; + RegisterTimeout = registerTimeout; + MaxFrameSizeBytes = maxFrameSizeBytes; + SendBufferSize = sendBufferSize; + ReceiveBufferSize = receiveBufferSize; + + // fail if send/receive buffer sizes are smaller than max frame size + if (SendBufferSize < MaxFrameSizeBytes) + throw new ArgumentException($"SendBufferSize ({SendBufferSize}) must be at least 2x the size of the maximum frame size ({MaxFrameSizeBytes})"); + if (ReceiveBufferSize < MaxFrameSizeBytes) + throw new ArgumentException($"ReceiveBufferSize ({ReceiveBufferSize}) must be at least 2x the size of the maximum frame size ({MaxFrameSizeBytes})"); + + // fail if the max frame size is negative + if (MaxFrameSizeBytes < 0) + throw new ArgumentException($"MaxFrameSizeBytes ({MaxFrameSizeBytes}) must be a positive number"); + + FinishConnectRetries = finishConnectRetries; + OutgoingSocketForceIpv4 = outgoingSocketForceIpv4; + WriteCommandsQueueMaxSize = writeCommandsQueueMaxSize; + ManagementDispatcher = managementDispatcher; + } + + /// /// Default size of the SAEA pool /// internal static readonly int DefaultAcceptLimit = Environment.ProcessorCount * 2; + [Obsolete("Many of these options are no longer used. Use the TcpSettings.Create method instead.")] public TcpSettings(string bufferPoolConfigPath, int initialSocketAsyncEventArgs, bool traceLogging, @@ -88,7 +122,12 @@ public TcpSettings(string bufferPoolConfigPath, TraceLogging = traceLogging; BatchAcceptLimit = batchAcceptLimit; RegisterTimeout = registerTimeout; - ReceivedMessageSizeLimit = receivedMessageSizeLimit; + MaxFrameSizeBytes = receivedMessageSizeLimit; + + // have to manually set these + SendBufferSize = receivedMessageSizeLimit * 2; + ReceiveBufferSize = receivedMessageSizeLimit * 2; + ManagementDispatcher = managementDispatcher; FileIODispatcher = fileIoDispatcher; TransferToLimit = transferToLimit; @@ -102,12 +141,14 @@ public TcpSettings(string bufferPoolConfigPath, /// Buffer pools are used to mitigate GC-pressure made by potential allocation /// and deallocation of byte buffers used for writing/receiving data from sockets. /// + [Obsolete("This property is unused")] public string BufferPoolConfigPath { get; } /// /// The initial number of SocketAsyncEventArgs to be preallocated. This value /// will grow infinitely if needed. /// + [Obsolete("This property is unused")] public int InitialSocketAsyncEventArgs { get; } /// @@ -115,20 +156,36 @@ public TcpSettings(string bufferPoolConfigPath, /// Be aware that this may log more than once per message sent to the /// actors of the tcp implementation. /// - public bool TraceLogging { get; } + public bool TraceLogging { get; init; } /// /// The maximum number of connection that are accepted in one go, higher /// numbers decrease latency, lower numbers increase fairness on the /// worker-dispatcher /// - public int BatchAcceptLimit { get; } + public int BatchAcceptLimit { get; init; } /// /// The duration a connection actor waits for a `Register` message from /// its commander before aborting the connection. /// - public TimeSpan? RegisterTimeout { get; } + public TimeSpan? RegisterTimeout { get; init; } + + /// + /// The maximum frame size we will accept when reading or writing to a socket. + /// + + public int MaxFrameSizeBytes { get; init; } + + /// + /// Should be at least 2x the size of the maximum frame size. + /// + public int ReceiveBufferSize { get; init; } + + /// + /// Should be at least 2x the size of the maximum frame size. + /// + public int SendBufferSize { get; init; } /// /// The maximum number of bytes delivered by a `Received` message. Before @@ -138,7 +195,8 @@ public TcpSettings(string bufferPoolConfigPath, /// configured receive buffer size. When using value 'unlimited' it will /// try to read all from the receive buffer. /// - public int ReceivedMessageSizeLimit { get; } + [Obsolete("This property is now MaxFrameSizeBytes")] + public long ReceivedMessageSizeLimit => MaxFrameSizeBytes; /// /// Fully qualified config path which holds the dispatcher configuration @@ -150,6 +208,7 @@ public TcpSettings(string bufferPoolConfigPath, /// Fully qualified config path which holds the dispatcher configuration /// on which file IO tasks are scheduled /// + [Obsolete("This property is unused")] public string FileIODispatcher { get; } /// @@ -160,6 +219,7 @@ public TcpSettings(string bufferPoolConfigPath, /// Decreasing the value may improve fairness while increasing may improve /// throughput. /// + [Obsolete("This property is unused")] public int TransferToLimit { get; set; } /// @@ -167,21 +227,24 @@ public TcpSettings(string bufferPoolConfigPath, /// OP_CONNECT. Retries are needed if the OP_CONNECT notification doesn't imply that /// `finishConnect` will succeed, which is the case on Android. /// - public int FinishConnectRetries { get; } + public int FinishConnectRetries { get; init; } /// /// Enforce outgoing socket connection to use IPv4 address family. Required in - /// scenario when IPv6 is not available, for example in Azure Web App sandbox. + /// a scenario when IPv6 is not available, for example in Azure Web App sandbox. /// When set to true it is required to set akka.io.dns.inet-address.use-ipv6 to false /// in cases when DnsEndPoint is used to describe the remote address /// - public bool OutgoingSocketForceIpv4 { get; } + public bool OutgoingSocketForceIpv4 { get; init; } /// /// Limits maximum size of internal queue, used in connection actor /// to store pending write commands. /// To allow unlimited size, set to -1. /// - public int WriteCommandsQueueMaxSize { get; } + /// + /// This setting defines the maximum number of messages, not the maximum size in bytes. + /// + public int WriteCommandsQueueMaxSize { get; init; } } } \ No newline at end of file