Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
66 commits
Select commit Hold shift + click to select a range
d453f94
Working on refactoring `TcpConnection`
Aaronontheweb May 13, 2025
03b27d8
removing more garbage
Aaronontheweb May 13, 2025
fc37584
fixed compilation errors
Aaronontheweb May 13, 2025
0b5d391
fixed all compilation errors
Aaronontheweb May 13, 2025
4901869
fixes
Aaronontheweb May 13, 2025
2e5fd78
fixed write-send loop
Aaronontheweb May 13, 2025
a75b0c8
suppressed dead letters on `SocketReceiveCompleted` and `SocketSendCo…
Aaronontheweb May 13, 2025
b5f0d5d
Merge branch 'dev' into akka-io-tcpconnection-redesign-2
Aaronontheweb May 13, 2025
1e8705a
fixed a number of unit tests
Aaronontheweb May 13, 2025
37b8d0a
need a solution for computing max frame size and max buffer size
Aaronontheweb May 13, 2025
49dced2
Merge branch 'dev' into akka-io-tcpconnection-redesign-2
Aaronontheweb May 14, 2025
187b267
redid `TcpSettings` to include max-frame-size and buffer sizes
Aaronontheweb May 15, 2025
5a7a24e
added API approvals
Aaronontheweb May 15, 2025
cd6c64c
ensure that `TcpSettings` get propagated
Aaronontheweb May 15, 2025
41af0d6
more cleanup and fixes
Aaronontheweb May 15, 2025
08fe864
Merge branch 'dev' into akka-io-tcpconnection-redesign-2
Aaronontheweb May 15, 2025
c02a9a1
Merge branch 'dev' into akka-io-tcpconnection-redesign-2
Aaronontheweb May 15, 2025
477ce05
Merge branch 'dev' into akka-io-tcpconnection-redesign-2
Aaronontheweb May 15, 2025
5940b72
fix compilation errors
Aaronontheweb May 15, 2025
2d44cc4
added max-frame-size enforcement
Aaronontheweb May 15, 2025
334a2e4
fixup
Aaronontheweb May 15, 2025
d74cbd7
added ability to override `TcpSettings` using new configuration types
Aaronontheweb May 15, 2025
7ce01a6
fixing tests
Aaronontheweb May 15, 2025
e205480
remove
Aaronontheweb May 15, 2025
e3611f8
improved error handling around socket reads - removed `_peerClosed` c…
Aaronontheweb May 16, 2025
231c521
fixed all `TcpIntegrationSpecs`
Aaronontheweb May 16, 2025
ea4294f
fixed most Akka.Streams.IO.Tcp specs
Aaronontheweb May 16, 2025
e06f81e
added `ConnectionState` to better organize termination conditions
Aaronontheweb May 16, 2025
284bbb4
fix
Aaronontheweb May 16, 2025
df52514
hardened receive / stop receiving
Aaronontheweb May 16, 2025
e96c539
cleanup
Aaronontheweb May 16, 2025
f8df406
v2 implementation
Aaronontheweb May 16, 2025
d8dd04a
fixed issues with `PeerClosed`
Aaronontheweb May 16, 2025
eb16a65
cleanup outgoing connection error handling
Aaronontheweb May 16, 2025
ab65ef7
stash
Aaronontheweb May 16, 2025
d8e580d
fixed issue with `Close` commands
Aaronontheweb May 16, 2025
87a0175
stash
Aaronontheweb May 16, 2025
79cf97e
give Akka.IO.Tcp actors clearer role-based names in logs
Aaronontheweb May 17, 2025
db0eaf0
reintroduced pull mode
Aaronontheweb May 17, 2025
6589ae2
fixed issue with missing messages during `Tcp.Close` with writes pending
Aaronontheweb May 17, 2025
5ad22b4
fixed logic around flushing all writes
Aaronontheweb May 17, 2025
6f1668a
fixed issues with detecting half-closed connections
Aaronontheweb May 17, 2025
edbca26
fixed `Outgoing_TCP_stream_must_handle_when_connection_actor_terminat…
Aaronontheweb May 17, 2025
b7aa8dd
fixed issues with "pending half closures"
Aaronontheweb May 17, 2025
cd7814b
fixed issue with `_closeEvent` being overwritten
Aaronontheweb May 17, 2025
b601351
cleaned up XML-DOC
Aaronontheweb May 17, 2025
af4973e
simplified `TcpConnection` state and behavior
Aaronontheweb May 17, 2025
3c40fa1
fixed a number of specs
Aaronontheweb May 17, 2025
4c7be29
all Akka.Streams.IO.Tcp specs now pass
Aaronontheweb May 17, 2025
f7e7d98
removed unused `ConnectionState` fields
Aaronontheweb May 17, 2025
b4c5fa3
added default `_closeInformation` so socket closures can never no-op
Aaronontheweb May 17, 2025
5fa23bf
fixed issue with `ExpectReceivedDataAsync`
Aaronontheweb May 17, 2025
f15802a
fixed partial write offset handling
Aaronontheweb May 17, 2025
0c00783
fixed a bug with writing being enabled right away
Aaronontheweb May 17, 2025
8413020
cleaned up `Should_fail_writing_when_buffer_is_filled` spec
Aaronontheweb May 17, 2025
9e9cd2c
fixed issue with reads starting too early in pull mode
Aaronontheweb May 17, 2025
4237808
added logging support for `sys2` in `Outgoing_TCP_stream_must_not_thr…
Aaronontheweb May 17, 2025
3ee94cb
fixed batching code
Aaronontheweb May 17, 2025
547474a
added support for all `WriteCommand` implementations
Aaronontheweb May 17, 2025
fbae5d6
removed recursion
Aaronontheweb May 17, 2025
c4efa93
cleanup
Aaronontheweb May 17, 2025
5974533
api approvals
Aaronontheweb May 17, 2025
ad63320
fixed `Write_before_Register_should_not_be_silently_dropped`
Aaronontheweb May 19, 2025
30e5a28
Update `ConnectionSourceStageLogic` to buffer pending connections
Aaronontheweb May 19, 2025
24eb50b
Merge branch 'dev' into akka-io-tcpconnection-redesign-2
Aaronontheweb May 19, 2025
e3331ee
fixes found during review
Aaronontheweb May 19, 2025
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -2269,6 +2269,7 @@ namespace Akka.Configuration
public virtual System.Collections.Generic.IList<bool> GetBooleanList(string path) { }
public virtual System.Collections.Generic.IList<byte> GetByteList(string path) { }
public virtual System.Nullable<long> GetByteSize(string path) { }
[return: System.Diagnostics.CodeAnalysis.NotNullIfNotNullAttribute("def")]
public virtual System.Nullable<long> GetByteSize(string path, System.Nullable<long> def = null) { }
public virtual Akka.Configuration.Config GetConfig(string path) { }
public virtual decimal GetDecimal(string path, [System.Runtime.CompilerServices.DecimalConstantAttribute(0, 0, 0u, 0u, 0u)] decimal @default) { }
Expand Down Expand Up @@ -4001,6 +4002,8 @@ namespace Akka.IO
public System.Net.EndPoint LocalAddress { get; }
public System.Collections.Generic.IEnumerable<Akka.IO.Inet.SocketOption> Options { get; }
public bool PullMode { get; }
[System.Runtime.CompilerServices.NullableAttribute(2)]
public Akka.IO.TcpSettings TcpSettings { get; set; }
Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Customize TcpSettings on per-Bind basis. Set as a mutable property since I didn't want to manipulate the CTOR

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Is it possible to duplicate the constructor and replace the old SocketOption with TcpSettings instead? We can then mark the old .ctor as obsolete

public override string ToString() { }
}
public sealed class Bound : Akka.IO.Tcp.Event
Expand Down Expand Up @@ -4043,6 +4046,7 @@ namespace Akka.IO
public sealed class CompoundWrite : Akka.IO.Tcp.WriteCommand, System.Collections.Generic.IEnumerable<Akka.IO.Tcp.SimpleWriteCommand>, System.Collections.IEnumerable
{
public CompoundWrite(Akka.IO.Tcp.SimpleWriteCommand head, Akka.IO.Tcp.WriteCommand tailCommand) { }
public override long Bytes { get; }
public Akka.IO.Tcp.SimpleWriteCommand Head { get; }
public Akka.IO.Tcp.WriteCommand TailCommand { get; }
public System.Collections.Generic.IEnumerator<Akka.IO.Tcp.SimpleWriteCommand> GetEnumerator() { }
Expand All @@ -4065,6 +4069,8 @@ namespace Akka.IO
public System.Collections.Generic.IEnumerable<Akka.IO.Inet.SocketOption> Options { get; }
Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

These really need to be obsoleted and deleted

public bool PullMode { get; }
public System.Net.EndPoint RemoteAddress { get; }
[System.Runtime.CompilerServices.NullableAttribute(2)]
public Akka.IO.TcpSettings TcpSettings { get; set; }
Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Customize TcpSettings on per-Connect basis

public System.Nullable<System.TimeSpan> Timeout { get; }
public override string ToString() { }
}
Expand Down Expand Up @@ -4184,6 +4190,7 @@ namespace Akka.IO
{
public static readonly Akka.IO.Tcp.Write Empty;
public override Akka.IO.Tcp.Event Ack { get; }
public override long Bytes { get; }
public Akka.IO.ByteString Data { get; }
public static Akka.IO.Tcp.Write Create(Akka.IO.ByteString data) { }
public static Akka.IO.Tcp.Write Create(Akka.IO.ByteString data, Akka.IO.Tcp.Event ack) { }
Expand All @@ -4192,6 +4199,7 @@ namespace Akka.IO
public abstract class WriteCommand : Akka.IO.Tcp.Command
{
protected WriteCommand() { }
public abstract long Bytes { get; }
Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

New field so we can instantly measure how long our output is - need this for buffer-overflow checking

public static Akka.IO.Tcp.WriteCommand Create(System.Collections.Generic.IEnumerable<Akka.IO.Tcp.WriteCommand> writes) { }
public static Akka.IO.Tcp.WriteCommand Create(params WriteCommand[] writes) { }
public Akka.IO.Tcp.CompoundWrite Prepend(Akka.IO.Tcp.SimpleWriteCommand other) { }
Expand All @@ -4205,7 +4213,6 @@ namespace Akka.IO
public sealed class TcpExt : Akka.IO.IOExtension
{
public TcpExt(Akka.Actor.ExtendedActorSystem system) { }
public Akka.IO.Buffers.IBufferPool BufferPool { get; }
public override Akka.Actor.IActorRef Manager { get; }
public Akka.IO.TcpSettings Settings { get; }
}
Expand All @@ -4231,21 +4238,31 @@ namespace Akka.IO
public static Akka.IO.Tcp.Command Unbind() { }
public static Akka.IO.Tcp.Command Write(Akka.IO.ByteString data, Akka.IO.Tcp.Event ack = null) { }
}
public class TcpSettings
public sealed class TcpSettings : System.IEquatable<Akka.IO.TcpSettings>
Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This is now a record type

{
[System.ObsoleteAttribute("Many of these options are no longer used. Use the TcpSettings.Create method inste" +
"ad.")]
public TcpSettings(string bufferPoolConfigPath, int initialSocketAsyncEventArgs, bool traceLogging, int batchAcceptLimit, System.Nullable<System.TimeSpan> registerTimeout, int receivedMessageSizeLimit, string managementDispatcher, string fileIoDispatcher, int transferToLimit, int finishConnectRetries, bool outgoingSocketForceIpv4, int writeCommandsQueueMaxSize) { }
public int BatchAcceptLimit { get; }
public int BatchAcceptLimit { get; set; }
[System.ObsoleteAttribute("This property is unused")]
public string BufferPoolConfigPath { get; }
[System.ObsoleteAttribute("This property is unused")]
public string FileIODispatcher { get; }
public int FinishConnectRetries { get; }
public int FinishConnectRetries { get; set; }
[System.ObsoleteAttribute("This property is unused")]
public int InitialSocketAsyncEventArgs { get; }
Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We don't use this setting anymore - the BatchAcceptLimit does this and the value will not change at any point during execution.

public string ManagementDispatcher { get; }
public bool OutgoingSocketForceIpv4 { get; }
public int ReceivedMessageSizeLimit { get; }
public System.Nullable<System.TimeSpan> RegisterTimeout { get; }
public bool TraceLogging { get; }
public int MaxFrameSizeBytes { get; set; }
public bool OutgoingSocketForceIpv4 { get; set; }
public int ReceiveBufferSize { get; set; }
[System.ObsoleteAttribute("This property is now MaxFrameSizeBytes")]
public long ReceivedMessageSizeLimit { get; }
public System.Nullable<System.TimeSpan> RegisterTimeout { get; set; }
public int SendBufferSize { get; set; }
public bool TraceLogging { get; set; }
[System.ObsoleteAttribute("This property is unused")]
public int TransferToLimit { get; set; }
public int WriteCommandsQueueMaxSize { get; }
public int WriteCommandsQueueMaxSize { get; set; }
public static Akka.IO.TcpSettings Create(Akka.Actor.ActorSystem system) { }
public static Akka.IO.TcpSettings Create(Akka.Configuration.Config config) { }
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -3991,6 +3991,8 @@ namespace Akka.IO
public System.Net.EndPoint LocalAddress { get; }
public System.Collections.Generic.IEnumerable<Akka.IO.Inet.SocketOption> Options { get; }
public bool PullMode { get; }
[System.Runtime.CompilerServices.NullableAttribute(2)]
public Akka.IO.TcpSettings TcpSettings { get; set; }
public override string ToString() { }
}
public sealed class Bound : Akka.IO.Tcp.Event
Expand Down Expand Up @@ -4033,6 +4035,7 @@ namespace Akka.IO
public sealed class CompoundWrite : Akka.IO.Tcp.WriteCommand, System.Collections.Generic.IEnumerable<Akka.IO.Tcp.SimpleWriteCommand>, System.Collections.IEnumerable
{
public CompoundWrite(Akka.IO.Tcp.SimpleWriteCommand head, Akka.IO.Tcp.WriteCommand tailCommand) { }
public override long Bytes { get; }
public Akka.IO.Tcp.SimpleWriteCommand Head { get; }
public Akka.IO.Tcp.WriteCommand TailCommand { get; }
public System.Collections.Generic.IEnumerator<Akka.IO.Tcp.SimpleWriteCommand> GetEnumerator() { }
Expand All @@ -4055,6 +4058,8 @@ namespace Akka.IO
public System.Collections.Generic.IEnumerable<Akka.IO.Inet.SocketOption> Options { get; }
public bool PullMode { get; }
public System.Net.EndPoint RemoteAddress { get; }
[System.Runtime.CompilerServices.NullableAttribute(2)]
public Akka.IO.TcpSettings TcpSettings { get; set; }
public System.Nullable<System.TimeSpan> Timeout { get; }
public override string ToString() { }
}
Expand Down Expand Up @@ -4174,6 +4179,7 @@ namespace Akka.IO
{
public static readonly Akka.IO.Tcp.Write Empty;
public override Akka.IO.Tcp.Event Ack { get; }
public override long Bytes { get; }
public Akka.IO.ByteString Data { get; }
public static Akka.IO.Tcp.Write Create(Akka.IO.ByteString data) { }
public static Akka.IO.Tcp.Write Create(Akka.IO.ByteString data, Akka.IO.Tcp.Event ack) { }
Expand All @@ -4182,6 +4188,7 @@ namespace Akka.IO
public abstract class WriteCommand : Akka.IO.Tcp.Command
{
protected WriteCommand() { }
public abstract long Bytes { get; }
public static Akka.IO.Tcp.WriteCommand Create(System.Collections.Generic.IEnumerable<Akka.IO.Tcp.WriteCommand> writes) { }
public static Akka.IO.Tcp.WriteCommand Create(params WriteCommand[] writes) { }
public Akka.IO.Tcp.CompoundWrite Prepend(Akka.IO.Tcp.SimpleWriteCommand other) { }
Expand All @@ -4195,7 +4202,6 @@ namespace Akka.IO
public sealed class TcpExt : Akka.IO.IOExtension
{
public TcpExt(Akka.Actor.ExtendedActorSystem system) { }
public Akka.IO.Buffers.IBufferPool BufferPool { get; }
Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Not used

public override Akka.Actor.IActorRef Manager { get; }
public Akka.IO.TcpSettings Settings { get; }
}
Expand All @@ -4221,21 +4227,31 @@ namespace Akka.IO
public static Akka.IO.Tcp.Command Unbind() { }
public static Akka.IO.Tcp.Command Write(Akka.IO.ByteString data, Akka.IO.Tcp.Event ack = null) { }
}
public class TcpSettings
public sealed class TcpSettings : System.IEquatable<Akka.IO.TcpSettings>
{
[System.ObsoleteAttribute("Many of these options are no longer used. Use the TcpSettings.Create method inste" +
"ad.")]
public TcpSettings(string bufferPoolConfigPath, int initialSocketAsyncEventArgs, bool traceLogging, int batchAcceptLimit, System.Nullable<System.TimeSpan> registerTimeout, int receivedMessageSizeLimit, string managementDispatcher, string fileIoDispatcher, int transferToLimit, int finishConnectRetries, bool outgoingSocketForceIpv4, int writeCommandsQueueMaxSize) { }
public int BatchAcceptLimit { get; }
public int BatchAcceptLimit { get; set; }
[System.ObsoleteAttribute("This property is unused")]
public string BufferPoolConfigPath { get; }
[System.ObsoleteAttribute("This property is unused")]
public string FileIODispatcher { get; }
public int FinishConnectRetries { get; }
public int FinishConnectRetries { get; set; }
[System.ObsoleteAttribute("This property is unused")]
public int InitialSocketAsyncEventArgs { get; }
public string ManagementDispatcher { get; }
public bool OutgoingSocketForceIpv4 { get; }
public int ReceivedMessageSizeLimit { get; }
public System.Nullable<System.TimeSpan> RegisterTimeout { get; }
public bool TraceLogging { get; }
public int MaxFrameSizeBytes { get; set; }
public bool OutgoingSocketForceIpv4 { get; set; }
public int ReceiveBufferSize { get; set; }
[System.ObsoleteAttribute("This property is now MaxFrameSizeBytes")]
public long ReceivedMessageSizeLimit { get; }
public System.Nullable<System.TimeSpan> RegisterTimeout { get; set; }
public int SendBufferSize { get; set; }
public bool TraceLogging { get; set; }
[System.ObsoleteAttribute("This property is unused")]
public int TransferToLimit { get; set; }
public int WriteCommandsQueueMaxSize { get; }
public int WriteCommandsQueueMaxSize { get; set; }
public static Akka.IO.TcpSettings Create(Akka.Actor.ActorSystem system) { }
public static Akka.IO.TcpSettings Create(Akka.Configuration.Config config) { }
}
Expand Down
4 changes: 2 additions & 2 deletions src/core/Akka.Streams.Tests/IO/TcpHelper.cs
Original file line number Diff line number Diff line change
Expand Up @@ -330,9 +330,9 @@ await _connectionProbe.FishForMessageAsync(

public async Task ExpectTerminatedAsync(CancellationToken cancellationToken = default)
{
_connectionProbe.Watch(_connectionActor);
await _connectionProbe.WatchAsync(_connectionActor);
await _connectionProbe.ExpectTerminatedAsync(_connectionActor, cancellationToken: cancellationToken);
_connectionProbe.Unwatch(_connectionActor);
await _connectionProbe.UnwatchAsync(_connectionActor);
}
}

Expand Down
28 changes: 16 additions & 12 deletions src/core/Akka.Streams.Tests/IO/TcpSpec.cs
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
//-----------------------------------------------------------------------
//-----------------------------------------------------------------------
// <copyright file="TcpSpec.cs" company="Akka.NET Project">
// Copyright (C) 2009-2022 Lightbend Inc. <http://www.lightbend.com>
// Copyright (C) 2013-2025 .NET Foundation <https://github.com/akkadotnet/akka.net>
Expand Down Expand Up @@ -29,9 +29,11 @@ namespace Akka.Streams.Tests.IO
{
public class TcpSpec : TcpHelper
{
public TcpSpec(ITestOutputHelper helper) : base(@"
akka.loglevel = DEBUG
akka.stream.materializer.subscription-timeout.timeout = 2s", helper)
public TcpSpec(ITestOutputHelper helper) : base("""
akka.io.tcp.trace-logging = on
Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Turned trace logging on here to help me debug some issues with the design

akka.loglevel = DEBUG
akka.stream.materializer.subscription-timeout.timeout = 2s
""", helper)
{
}

Expand All @@ -40,7 +42,7 @@ public async Task Outgoing_TCP_stream_must_work_in_the_happy_case()
{
await this.AssertAllStagesStoppedAsync(async () =>
{
var testData = ByteString.FromBytes(new byte[] {1, 2, 3, 4, 5});
var testData = ByteString.FromBytes([1, 2, 3, 4, 5]);

var server = await new Server(this).InitializeAsync();
var tcpReadProbe = new TcpReadProbe(this);
Expand All @@ -65,7 +67,7 @@ await ValidateServerClientCommunicationAsync(testData, serverConnection, tcpRead
public async Task Outgoing_TCP_stream_must_be_able_to_write_a_sequence_of_ByteStrings()
{
var server = await new Server(this).InitializeAsync();
var testInput = Enumerable.Range(0, 256).Select(i => ByteString.FromBytes(new byte[] {Convert.ToByte(i)}));
var testInput = Enumerable.Range(0, 256).Select(i => ByteString.FromBytes([Convert.ToByte(i)]));
var expectedOutput = ByteString.FromBytes(Enumerable.Range(0, 256).Select(Convert.ToByte).ToArray());

Source.From(testInput)
Expand All @@ -86,7 +88,7 @@ public async Task Outgoing_TCP_stream_must_be_able_to_read_a_sequence_of_ByteStr
var testOutput = new byte[255];
for (byte i = 0; i < 255; i++)
{
testInput[i] = ByteString.FromBytes(new [] {i});
testInput[i] = ByteString.FromBytes([i]);
testOutput[i] = i;
}

Expand All @@ -103,6 +105,7 @@ public async Task Outgoing_TCP_stream_must_be_able_to_read_a_sequence_of_ByteStr
serverConnection.Write(input);

serverConnection.ConfirmedClose();
// Reduced timeout - otherwise we're just waiting longer for the failure
var result = await resultFuture.ShouldCompleteWithin(3.Seconds());
result.ShouldBe(expectedOutput);
}
Expand Down Expand Up @@ -132,7 +135,7 @@ public async Task Outgoing_TCP_stream_must_work_when_client_closes_write_then_re
{
await this.AssertAllStagesStoppedAsync(async () =>
{
var testData = ByteString.FromBytes(new byte[] { 1, 2, 3, 4, 5 });
var testData = ByteString.FromBytes([1, 2, 3, 4, 5]);
var server = await new Server(this).InitializeAsync();

var tcpWriteProbe = new TcpWriteProbe(this);
Expand Down Expand Up @@ -466,7 +469,7 @@ public async Task Outgoing_TCP_stream_must_Echo_should_work_even_if_server_is_in
.Run(Materializer);

var result = await Source.From(Enumerable.Repeat(0, 1000)
.Select(i => ByteString.FromBytes(new byte[] { Convert.ToByte(i) })))
.Select(i => ByteString.FromBytes([Convert.ToByte(i)])))
.Via(Sys.TcpStream().OutgoingConnection(serverAddress, halfClose: true))
.RunAggregate(0, (i, s) => i + s.Count, Materializer).ShouldCompleteWithin(10.Seconds());

Expand Down Expand Up @@ -503,7 +506,7 @@ await WithinAsync(TimeSpan.FromSeconds(15), async () =>
await AwaitAssertAsync(async () =>
{
// Getting rid of existing connection actors by using a blunt instrument
system2.ActorSelection(system2.Tcp().Path / "$a" / "*").Tell(Kill.Instance);
system2.ActorSelection(system2.Tcp().Path / "tcp-client-connection-*").Tell(Kill.Instance);
Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

akka.io.tcp actors now get clear names, so that broke the functionality of this test which relied on guessing the randomly assigned actor path name


await result.ShouldCompleteWithin(3.Seconds());
}, interval:TimeSpan.FromSeconds(4));
Expand All @@ -525,6 +528,7 @@ await AwaitAssertAsync(async () =>
public async Task Outgoing_TCP_stream_must_not_thrown_on_unbind_after_system_has_been_shut_down()
{
var sys2 = ActorSystem.Create("shutdown-test-system", Sys.Settings.Config);
InitializeLogger(sys2);

try
{
Expand Down Expand Up @@ -573,7 +577,7 @@ public async Task Tcp_listen_stream_must_be_able_to_implement_echo()
var binding = await bindTask.ShouldCompleteWithin(3.Seconds());

var testInput = Enumerable.Range(0, 255)
.Select(i => ByteString.FromBytes(new byte[] { Convert.ToByte(i) }))
.Select(i => ByteString.FromBytes([Convert.ToByte(i)]))
.ToList();

var expectedOutput = testInput.Aggregate(ByteString.Empty, (agg, b) => agg.Concat(b));
Expand Down Expand Up @@ -603,7 +607,7 @@ public async Task Tcp_listen_stream_must_work_with_a_chain_of_echoes()
var echoConnection = Sys.TcpStream().OutgoingConnection(serverAddress);

var testInput = Enumerable.Range(0, 255)
.Select(i => ByteString.FromBytes(new byte[] { Convert.ToByte(i) }))
.Select(i => ByteString.FromBytes([Convert.ToByte(i)]))
.ToList();

var expectedOutput = testInput.Aggregate(ByteString.Empty, (agg, b) => agg.Concat(b));
Expand Down
3 changes: 2 additions & 1 deletion src/core/Akka.Streams/Dsl/Tcp.cs
Original file line number Diff line number Diff line change
Expand Up @@ -180,13 +180,14 @@ public TcpExt(ExtendedActorSystem system)
/// <param name="idleTimeout">TBD</param>
/// <exception cref="ArgumentException">TBD</exception>
/// <returns>TBD</returns>
// TODO: this really needs to be an async method
public Source<Tcp.IncomingConnection, Task<Tcp.ServerBinding>> Bind(string host, int port, int backlog = 100,
IImmutableList<Inet.SocketOption> options = null, bool halfClose = false, TimeSpan? idleTimeout = null)
{
// DnsEndpoint isn't allowed
var ipAddresses = System.Net.Dns.GetHostAddressesAsync(host).Result;
if (ipAddresses.Length == 0)
throw new ArgumentException($"Couldn't resolve IpAdress for host {host}", nameof(host));
throw new ArgumentException($"Couldn't resolve IpAddress for host {host}", nameof(host));

return Source.FromGraph(new ConnectionSourceStage(_system.Tcp(), new IPEndPoint(ipAddresses[0], port), backlog,
options, halfClose, idleTimeout, BindShutdownTimeout));
Expand Down
Loading
Loading