Skip to content

Commit e8beec4

Browse files
authored
Fix StreamRefSerializer NRE bug (#7333)
* Fix StreamRefSerializer NRE bug * Update API Approval list
1 parent b47b922 commit e8beec4

File tree

4 files changed

+77
-35
lines changed

4 files changed

+77
-35
lines changed

src/core/Akka.API.Tests/verify/CoreAPISpec.ApproveStreams.DotNet.verified.txt

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -4753,6 +4753,7 @@ namespace Akka.Streams.Implementation.Stages
47534753
}
47544754
namespace Akka.Streams.Serialization
47554755
{
4756+
[System.Runtime.CompilerServices.NullableAttribute(0)]
47564757
public sealed class StreamRefSerializer : Akka.Serialization.SerializerWithStringManifest
47574758
{
47584759
public StreamRefSerializer(Akka.Actor.ExtendedActorSystem system) { }

src/core/Akka.API.Tests/verify/CoreAPISpec.ApproveStreams.Net.verified.txt

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -4727,6 +4727,7 @@ namespace Akka.Streams.Implementation.Stages
47274727
}
47284728
namespace Akka.Streams.Serialization
47294729
{
4730+
[System.Runtime.CompilerServices.NullableAttribute(0)]
47304731
public sealed class StreamRefSerializer : Akka.Serialization.SerializerWithStringManifest
47314732
{
47324733
public StreamRefSerializer(Akka.Actor.ExtendedActorSystem system) { }
Lines changed: 42 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,42 @@
1+
// -----------------------------------------------------------------------
2+
// <copyright file="StreamRefSerializer.cs" company="Akka.NET Project">
3+
// Copyright (C) 2009-2024 Lightbend Inc. <http://www.lightbend.com>
4+
// Copyright (C) 2013-2024 .NET Foundation <https://github.com/akkadotnet/akka.net>
5+
// </copyright>
6+
// -----------------------------------------------------------------------
7+
8+
using System;
9+
using Akka.Serialization;
10+
using Akka.Streams.Implementation.StreamRef;
11+
using FluentAssertions;
12+
using Xunit;
13+
using Xunit.Abstractions;
14+
using static FluentAssertions.FluentActions;
15+
16+
namespace Akka.Streams.Tests.Serialization;
17+
18+
public class StreamRefSerializer: Akka.TestKit.Xunit2.TestKit
19+
{
20+
public StreamRefSerializer(ITestOutputHelper output)
21+
: base(ActorMaterializer.DefaultConfig(), nameof(StreamRefSerializer), output)
22+
{
23+
}
24+
25+
[Fact(DisplayName = "StreamRefSerializer should not throw NRE when configuration were set before ActorSystem started")]
26+
public void StreamsConfigBugTest()
27+
{
28+
var message = new SequencedOnNext(10, "test");
29+
var serializer = (SerializerWithStringManifest)Sys.Serialization.FindSerializerFor(message);
30+
var manifest = serializer.Manifest(message);
31+
32+
byte[] bytes = null;
33+
Invoking(() =>
34+
{
35+
bytes = serializer.ToBinary(message); // This throws an NRE in the bug
36+
}).Should().NotThrow<NullReferenceException>();
37+
38+
var deserialized = (SequencedOnNext) serializer.FromBinary(bytes, manifest);
39+
deserialized.SeqNr.Should().Be(message.SeqNr);
40+
deserialized.Payload.Should().Be(message.Payload);
41+
}
42+
}

src/core/Akka.Streams/Serialization/StreamRefSerializer.cs

Lines changed: 33 additions & 35 deletions
Original file line numberDiff line numberDiff line change
@@ -10,7 +10,6 @@
1010
using Akka.Actor;
1111
using Akka.Serialization;
1212
using Akka.Streams.Serialization.Proto.Msg;
13-
using Akka.Util;
1413
using Google.Protobuf;
1514
using Akka.Streams.Implementation.StreamRef;
1615
using CumulativeDemand = Akka.Streams.Implementation.StreamRef.CumulativeDemand;
@@ -19,12 +18,12 @@
1918
using RemoteStreamFailure = Akka.Streams.Implementation.StreamRef.RemoteStreamFailure;
2019
using SequencedOnNext = Akka.Streams.Implementation.StreamRef.SequencedOnNext;
2120

21+
#nullable enable
2222
namespace Akka.Streams.Serialization
2323
{
2424
public sealed class StreamRefSerializer : SerializerWithStringManifest
2525
{
2626
private readonly ExtendedActorSystem _system;
27-
private readonly Akka.Serialization.Serialization _serialization;
2827

2928
private const string SequencedOnNextManifest = "A";
3029
private const string CumulativeDemandManifest = "B";
@@ -37,52 +36,51 @@ public sealed class StreamRefSerializer : SerializerWithStringManifest
3736
public StreamRefSerializer(ExtendedActorSystem system) : base(system)
3837
{
3938
_system = system;
40-
_serialization = system.Serialization;
4139
}
4240

4341
public override string Manifest(object o)
4442
{
45-
switch (o)
43+
return o switch
4644
{
47-
case SequencedOnNext _: return SequencedOnNextManifest;
48-
case CumulativeDemand _: return CumulativeDemandManifest;
49-
case OnSubscribeHandshake _: return OnSubscribeHandshakeManifest;
50-
case RemoteStreamFailure _: return RemoteSinkFailureManifest;
51-
case RemoteStreamCompleted _: return RemoteSinkCompletedManifest;
52-
case SourceRefImpl _: return SourceRefManifest;
53-
case SinkRefImpl _: return SinkRefManifest;
54-
default: throw new ArgumentException($"Unsupported object of type {o.GetType()}", nameof(o));
55-
}
45+
SequencedOnNext => SequencedOnNextManifest,
46+
CumulativeDemand => CumulativeDemandManifest,
47+
OnSubscribeHandshake => OnSubscribeHandshakeManifest,
48+
RemoteStreamFailure => RemoteSinkFailureManifest,
49+
RemoteStreamCompleted => RemoteSinkCompletedManifest,
50+
SourceRefImpl => SourceRefManifest,
51+
SinkRefImpl => SinkRefManifest,
52+
_ => throw new ArgumentException($"Unsupported object of type {o.GetType()}", nameof(o))
53+
};
5654
}
5755

5856
public override byte[] ToBinary(object o)
5957
{
60-
switch (o)
58+
return o switch
6159
{
62-
case SequencedOnNext onNext: return SerializeSequencedOnNext(onNext).ToByteArray();
63-
case CumulativeDemand demand: return SerializeCumulativeDemand(demand).ToByteArray();
64-
case OnSubscribeHandshake handshake: return SerializeOnSubscribeHandshake(handshake).ToByteArray();
65-
case RemoteStreamFailure failure: return SerializeRemoteStreamFailure(failure).ToByteArray();
66-
case RemoteStreamCompleted completed: return SerializeRemoteStreamCompleted(completed).ToByteArray();
67-
case SourceRefImpl sourceRef: return SerializeSourceRef(sourceRef).ToByteArray();
68-
case SinkRefImpl sinkRef: return SerializeSinkRef(sinkRef).ToByteArray();
69-
default: throw new ArgumentException($"Unsupported object of type {o.GetType()}", nameof(o));
70-
}
60+
SequencedOnNext onNext => SerializeSequencedOnNext(onNext).ToByteArray(),
61+
CumulativeDemand demand => SerializeCumulativeDemand(demand).ToByteArray(),
62+
OnSubscribeHandshake handshake => SerializeOnSubscribeHandshake(handshake).ToByteArray(),
63+
RemoteStreamFailure failure => SerializeRemoteStreamFailure(failure).ToByteArray(),
64+
RemoteStreamCompleted completed => SerializeRemoteStreamCompleted(completed).ToByteArray(),
65+
SourceRefImpl sourceRef => SerializeSourceRef(sourceRef).ToByteArray(),
66+
SinkRefImpl sinkRef => SerializeSinkRef(sinkRef).ToByteArray(),
67+
_ => throw new ArgumentException($"Unsupported object of type {o.GetType()}", nameof(o))
68+
};
7169
}
7270

7371
public override object FromBinary(byte[] bytes, string manifest)
7472
{
75-
switch (manifest)
73+
return manifest switch
7674
{
77-
case SequencedOnNextManifest: return DeserializeSequenceOnNext(bytes);
78-
case CumulativeDemandManifest: return DeserializeCumulativeDemand(bytes);
79-
case OnSubscribeHandshakeManifest: return DeserializeOnSubscribeHandshake(bytes);
80-
case RemoteSinkFailureManifest: return DeserializeRemoteSinkFailure(bytes);
81-
case RemoteSinkCompletedManifest: return DeserializeRemoteSinkCompleted(bytes);
82-
case SourceRefManifest: return DeserializeSourceRef(bytes);
83-
case SinkRefManifest: return DeserializeSinkRef(bytes);
84-
default: throw new ArgumentException($"Unsupported manifest '{manifest}'", nameof(manifest));
85-
}
75+
SequencedOnNextManifest => DeserializeSequenceOnNext(bytes),
76+
CumulativeDemandManifest => DeserializeCumulativeDemand(bytes),
77+
OnSubscribeHandshakeManifest => DeserializeOnSubscribeHandshake(bytes),
78+
RemoteSinkFailureManifest => DeserializeRemoteSinkFailure(bytes),
79+
RemoteSinkCompletedManifest => DeserializeRemoteSinkCompleted(bytes),
80+
SourceRefManifest => DeserializeSourceRef(bytes),
81+
SinkRefManifest => DeserializeSinkRef(bytes),
82+
_ => throw new ArgumentException($"Unsupported manifest '{manifest}'", nameof(manifest))
83+
};
8684
}
8785

8886
private SinkRefImpl DeserializeSinkRef(byte[] bytes)
@@ -129,7 +127,7 @@ private SequencedOnNext DeserializeSequenceOnNext(byte[] bytes)
129127
{
130128
var onNext = Proto.Msg.SequencedOnNext.Parser.ParseFrom(bytes);
131129
var p = onNext.Payload;
132-
var payload = _serialization.Deserialize(
130+
var payload = system.Serialization.Deserialize(
133131
p.EnclosedMessage.ToByteArray(),
134132
p.SerializerId,
135133
p.MessageManifest?.ToStringUtf8());
@@ -169,7 +167,7 @@ private ByteString SerializeCumulativeDemand(CumulativeDemand demand) =>
169167
private ByteString SerializeSequencedOnNext(SequencedOnNext onNext)
170168
{
171169
var payload = onNext.Payload;
172-
var serializer = _serialization.FindSerializerFor(payload);
170+
var serializer = system.Serialization.FindSerializerFor(payload);
173171
var manifest = Akka.Serialization.Serialization.ManifestFor(serializer, payload);
174172

175173
var p = new Payload

0 commit comments

Comments
 (0)