Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@
using Akka.Remote;
using Akka.Remote.Serialization;
using Akka.Remote.Transport;
using Akka.Serialization;
using BenchmarkDotNet.Attributes;
using BenchmarkDotNet.Loggers;
using Google.Protobuf;
Expand All @@ -40,6 +41,7 @@ public class AkkaPduCodecBenchmark

private Address _addr1;
private Address _addr2;
private Information _addr2Info;
private AkkaPduProtobuffCodec _recvCodec;
private AkkaPduProtobuffCodec _sendCodec;

Expand All @@ -62,6 +64,7 @@ public async Task Setup()
_rarp = RARP.For(_sys1).Provider;
_addr1 = _rarp.DefaultAddress;
_addr2 = RARP.For(_sys2).Provider.DefaultAddress;
_addr2Info = new Information(_addr2, _sys2);

_senderActorRef =
_sys2.ActorOf(act => { act.ReceiveAny((_, context) => context.Sender.Tell(context.Sender)); },
Expand Down Expand Up @@ -188,7 +191,7 @@ public void DeserializePayloadOnly()
private ByteString CreatePayloadPdu()
{
return _sendCodec.ConstructPayload(_sendCodec.ConstructMessage(_remoteReceiveRef.LocalAddressToUse, _remoteReceiveRef,
MessageSerializer.Serialize(_sys2, _addr2, _message), _senderActorRef, null, _lastAck));
MessageSerializer.Serialize(_sys2, _addr2Info, _message), _senderActorRef, null, _lastAck));
}
}
}
3 changes: 2 additions & 1 deletion src/core/Akka.Remote.Tests/Serialization/BugFix5062Spec.cs
Original file line number Diff line number Diff line change
Expand Up @@ -53,7 +53,8 @@ public void Failed_serialization_should_give_proper_exception_message()
true);

var node1 = new Address("akka.tcp", "Sys", "localhost", 2551);
var serialized = MessageSerializer.Serialize((ExtendedActorSystem)Sys, node1, message);
var info = new Information(node1, Sys);
var serialized = MessageSerializer.Serialize((ExtendedActorSystem)Sys, info, message);

var o = new object();
o.Invoking(_ => MessageSerializer.Deserialize((ExtendedActorSystem)Sys, serialized)).Should()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,9 +18,9 @@ namespace Akka.Remote.Tests.Transport;
/// <summary>
/// Added this spec to prove the existence of https://github.com/akkadotnet/akka.net/issues/7378
/// </summary>
public class MultiTransportAddressingSpec : AkkaSpec
public class MultiTransportAddressingSpec : TestKit.Xunit2.TestKit
{
public MultiTransportAddressingSpec(ITestOutputHelper output) : base(GetConfig(Sys1Port1, Sys1Port2), output)
public MultiTransportAddressingSpec(ITestOutputHelper output) : base(GetConfig(Sys1Port1, Sys1Port2), "MultiTransportSpec", output)
{
}

Expand All @@ -30,7 +30,7 @@ public MultiTransportAddressingSpec(ITestOutputHelper output) : base(GetConfig(S
public const int Sys2Port1 = 9993;
public const int Sys2Port2 = 9994;

private static Config GetConfig(int transportPort1, int transportPort2)
private static Config GetConfig(int transportPort1, int transportPort2, string actorSystemName = "MultiTransportSpec")
{
return $$"""

Expand All @@ -45,15 +45,15 @@ private static Config GetConfig(int transportPort1, int transportPort2)
transport-class = "Akka.Remote.Transport.TestTransport, Akka.Remote"
applied-adapters = []
registry-key = aX33k0jWKg
local-address = "test1://MultiTransportSpec@localhost:{{transportPort1}}"
local-address = "test1://{{actorSystemName}}@localhost:{{transportPort1}}"
maximum-payload-bytes = 32000b
scheme-identifier = test1
}
test2 {
transport-class = "Akka.Remote.Transport.TestTransport, Akka.Remote"
applied-adapters = []
registry-key = aX33k0j11c
local-address = "test2://MultiTransportSpec@localhost:{{transportPort2}}"
local-address = "test2://{{actorSystemName}}@localhost:{{transportPort2}}"
maximum-payload-bytes = 32000b
scheme-identifier = test2
}
Expand All @@ -67,7 +67,8 @@ private static Config GetConfig(int transportPort1, int transportPort2)
[Fact]
public async Task Should_Use_Second_Transport_For_Communication()
{
var secondSystem = ActorSystem.Create("MultiTransportSpec", GetConfig(Sys2Port1, Sys2Port2).WithFallback(Sys.Settings.Config));
const string secondActorSystemName = "MultiTransportSpec2";
var secondSystem = ActorSystem.Create(secondActorSystemName, GetConfig(Sys2Port1, Sys2Port2, secondActorSystemName).WithFallback(Sys.Settings.Config));
InitializeLogger(secondSystem);
var assertProbe = CreateTestProbe(secondSystem);

Expand All @@ -87,11 +88,19 @@ public async Task Should_Use_Second_Transport_For_Communication()
Shutdown(secondSystem);
}

return;

async Task PingAndVerify(string scheme, int port)
{
var selection = Sys.ActorSelection($"akka.{scheme}://MultiTransportSpec@localhost:{port}/user/echo");
selection.Tell("ping", TestActor);

var selection = Sys.ActorSelection($"akka.{scheme}://{secondActorSystemName}@localhost:{port}/user/echo");

// important: https://github.com/akkadotnet/akka.net/issues/7378 only occurs with IActorRefs
var actor = await selection.ResolveOne(TimeSpan.FromSeconds(1));

// assert that the remote actor is using the correct transport
Assert.Contains(scheme, actor.Path.Address.Protocol);
Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Currently fails here - the IActorRef returned to the ResolveOne method belongs to transport 1 rather than transport 2.


actor.Tell("ping");
var reply = await ExpectMsgAsync<string>(TimeSpan.FromSeconds(3));
Assert.Equal("pong", reply);

Expand Down
4 changes: 3 additions & 1 deletion src/core/Akka.Remote/Endpoint.cs
Original file line number Diff line number Diff line change
Expand Up @@ -1043,6 +1043,7 @@ public EndpointWriter(
Inbound = handleOrActive != null;
_ackDeadline = NewAckDeadline();
_handle = handleOrActive;
_transportInformation = new Information(localAddress, Context.System);
_remoteMetrics = RemoteMetricsExtension.Create(Context.System.AsInstanceOf<ExtendedActorSystem>());

if (_handle == null)
Expand All @@ -1056,6 +1057,7 @@ public EndpointWriter(
}

private readonly ILoggingAdapter _log = Context.GetLogger();
private readonly Information _transportInformation;
private readonly int? _refuseUid;
private readonly AkkaPduCodec _codec;
private readonly IActorRef _reliableDeliverySupervisor;
Expand Down Expand Up @@ -1357,7 +1359,7 @@ private SerializedMessage SerializeMessage(object msg)
{
throw new EndpointException("Internal error: No handle was present during serialization of outbound message.");
}
return MessageSerializer.Serialize(_system, _handle.LocalAddress, msg);
return MessageSerializer.Serialize(_system, _transportInformation, msg);
Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This is part of the real fix - make sure we're always setting the current transport information to our transport before we perform outbound serialization. That way, any messages included inside the payloads get serialized onto the correct transport when we have multiple running.

}

private int _writeCount = 0;
Expand Down
20 changes: 11 additions & 9 deletions src/core/Akka.Remote/EndpointManager.cs
Original file line number Diff line number Diff line change
Expand Up @@ -549,14 +549,14 @@ Directive Hopeless(HopelessAssociation e)
{
switch (e)
{
case HopelessAssociation h when h.Uid != null:
_log.Error(e.InnerException ?? e, "Association to [{0}] with UID [{1}] is irrecoverably failed. Quarantining address.", h.RemoteAddress, h.Uid);
case HopelessAssociation { Uid: not null }:
_log.Error(e.InnerException ?? e, "Association to [{0}] with UID [{1}] is irrecoverably failed. Quarantining address.", e.RemoteAddress, e.Uid);
if (_settings.QuarantineDuration.HasValue && _settings.QuarantineDuration != TimeSpan.MaxValue)
{
// have a finite quarantine duration specified in settings.
// If we don't have one specified, don't bother quarantining - it's disabled.
_endpoints.MarkAsQuarantined(h.RemoteAddress, h.Uid.Value, Deadline.Now + _settings.QuarantineDuration);
_eventPublisher.NotifyListeners(new QuarantinedEvent(h.RemoteAddress, h.Uid.Value));
_endpoints.MarkAsQuarantined(e.RemoteAddress, e.Uid.Value, Deadline.Now + _settings.QuarantineDuration);
_eventPublisher.NotifyListeners(new QuarantinedEvent(e.RemoteAddress, e.Uid.Value));
}

return Directive.Stop;
Expand Down Expand Up @@ -818,9 +818,6 @@ bool MatchesQuarantine(AkkaProtocolHandle handle)
Receive<Send>(send =>
{
var recipientAddress = send.Recipient.Path.Address;
IActorRef CreateAndRegisterWritingEndpoint() => _endpoints.RegisterWritableEndpoint(recipientAddress,
CreateEndpoint(recipientAddress, send.Recipient.LocalAddressToUse, _transportMapping[send.Recipient.LocalAddressToUse],
_settings, writing: true, handleOption: null), uid: null);

// pattern match won't throw a NullReferenceException if one is returned by WritableEndpointWithPolicyFor
switch (_endpoints.WritableEndpointWithPolicyFor(recipientAddress))
Expand All @@ -841,6 +838,12 @@ IActorRef CreateAndRegisterWritingEndpoint() => _endpoints.RegisterWritableEndpo
CreateAndRegisterWritingEndpoint().Tell(send);
break;
}

return;

IActorRef CreateAndRegisterWritingEndpoint() => _endpoints.RegisterWritableEndpoint(recipientAddress,
CreateEndpoint(recipientAddress, send.Recipient.LocalAddressToUse, _transportMapping[send.Recipient.LocalAddressToUse],
_settings, writing: true, handleOption: null), uid: null);
});
Receive<InboundAssociation>(ia => HandleInboundAssociation(ia, false));
Receive<EndpointWriter.StoppedReading>(endpoint => AcceptPendingReader(endpoint.Writer));
Expand Down Expand Up @@ -894,8 +897,7 @@ IActorRef CreateAndRegisterWritingEndpoint() => _endpoints.RegisterWritableEndpo
{
if (result.IsFaulted || result.IsCanceled)
{
if (result.Exception != null)
result.Exception.Handle(_ => true);
result.Exception?.Handle(_ => true);
return false;
}
return result.Result.All(x => x);
Expand Down
14 changes: 7 additions & 7 deletions src/core/Akka.Remote/MessageSerializer.cs
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,8 @@ internal static class MessageSerializer
/// <param name="system">The system.</param>
/// <param name="messageProtocol">The message protocol.</param>
/// <returns>System.Object.</returns>
public static object Deserialize(ExtendedActorSystem system, SerializedMessage messageProtocol)
public static object Deserialize(ExtendedActorSystem system,
SerializedMessage messageProtocol)
{
return system.Serialization.Deserialize(
messageProtocol.Message.ToByteArray(),
Expand All @@ -39,19 +40,18 @@ public static object Deserialize(ExtendedActorSystem system, SerializedMessage m
/// Serializes the specified message.
/// </summary>
/// <param name="system">The system.</param>
/// <param name="address">TBD</param>
/// <param name="transportInformation">The address for the current transport</param>
/// <param name="message">The message.</param>
/// <returns>SerializedMessage.</returns>
public static SerializedMessage Serialize(ExtendedActorSystem system, Address address, object message)
public static SerializedMessage Serialize(ExtendedActorSystem system, Information transportInformation,
object message)
{
var serializer = system.Serialization.FindSerializerFor(message);

var oldInfo = Akka.Serialization.Serialization.CurrentTransportInformation;
try
{
if (oldInfo == null)
Akka.Serialization.Serialization.CurrentTransportInformation =
system.Provider.SerializationInformation;
Akka.Serialization.Serialization.CurrentTransportInformation = transportInformation;
Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The other part of the real fix - make sure we explicitly set the CurrentTransportInformation to whatever was passed in by the EndpointReader before we get going


var serializedMsg = new SerializedMessage
{
Expand Down Expand Up @@ -81,4 +81,4 @@ public static SerializedMessage Serialize(ExtendedActorSystem system, Address ad
}
}
}
}
}
Loading