Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
78 changes: 77 additions & 1 deletion src/core/Akka.Streams.Tests/Dsl/StreamRefsSpec.cs
Original file line number Diff line number Diff line change
Expand Up @@ -478,10 +478,86 @@ public void SinkRef_must_not_allow_materializing_multiple_times()
var p1 = this.SourceProbe<string>().To(sinkRef.Sink).Run(Materializer);
p1.EnsureSubscription();
var req = p1.ExpectRequest();

var p2 = this.SourceProbe<string>().To(sinkRef.Sink).Run(Materializer);
p2.EnsureSubscription(); // will be cancelled immediately, since it's 2nd
p2.ExpectCancellation();
}

[Fact]
public async Task SourceRef_Source_property_should_be_idempotent_issue_7895()
{
// Reproduction test for issue #7895: https://github.com/akkadotnet/akka.net/issues/7895
// The .Source property creates a new SourceRefStageImpl on every access,
// which is not idempotent behavior and can cause intermittent subscription timeouts

// Create a SourceRef
var sourceRef = await Source.From(new[] { 1, 2, 3 })
.ToMaterialized(StreamRefs.SourceRef<int>(), Keep.Right)
.Run(Materializer);

// Access .Source property twice (simulates multiple accesses)
// This could happen via debugger inspection, logging, serialization, etc.
var source1 = sourceRef.Source;
var source2 = sourceRef.Source;

// BUG: They're NOT the same object (non-idempotent behavior)
// Each property access creates a new Source with a new SourceRefStageImpl
// When fixed, this assertion should PASS with ReferenceEquals(source1, source2) == true
ReferenceEquals(source1, source2).Should().BeTrue(
"Source property should be idempotent and return the same instance");
}

[Fact]
public async Task SourceRef_multiple_materializations_cause_timeout_issue_7895()
{
// Reproduction test for issue #7895: https://github.com/akkadotnet/akka.net/issues/7895
// This test demonstrates the race condition from multiple .Source property accesses
// Multiple .Source property accesses create racing SourceRefStageImpl instances

// Create a SourceRef with short timeout
var sourceRef = await Source.From(Enumerable.Range(1, 100))
.ToMaterialized(StreamRefs.SourceRef<int>(), Keep.Right)
.WithAttributes(StreamRefAttributes.CreateSubscriptionTimeout(TimeSpan.FromSeconds(3)))
.Run(Materializer);

// Access .Source twice - creates TWO SourceRefStageImpl instances
var source1 = sourceRef.Source;
var source2 = sourceRef.Source;

// Materialize both - they race for the same SinkRef handshake
var task1 = source1.RunWith(Sink.Seq<int>(), Materializer);
var task2 = source2.RunWith(Sink.Seq<int>(), Materializer);

// Wait for both with timeout protection
var allTasks = Task.WhenAll(
task1.ContinueWith(t => t),
task2.ContinueWith(t => t)
);

try
{
await allTasks;
}
catch
{
// Expected: at least one should fail
}

// Check results - at least one should have failed/timed out
var results = new[] { task1, task2 };
var completedCount = results.Count(t => t.Status == TaskStatus.RanToCompletion);
var faultedCount = results.Count(t => t.Status == TaskStatus.Faulted);

// Due to race condition: sometimes both fail, sometimes one succeeds
(completedCount + faultedCount).Should().Be(2, "Both tasks should have completed or faulted");

// At least one should have issues due to duplicate stage instances
if (faultedCount > 0)
{
var failedTask = results.First(t => t.Status == TaskStatus.Faulted);
failedTask.Exception.InnerException.Should().BeOfType<RemoteStreamRefActorTerminatedException>();
}
}
}
}
14 changes: 11 additions & 3 deletions src/core/Akka.Streams/Implementation/StreamRef/SinkRefImpl.cs
Original file line number Diff line number Diff line change
Expand Up @@ -46,11 +46,19 @@ protected SinkRefImpl(IActorRef initialPartnerRef)
[InternalApi]
internal sealed class SinkRefImpl<T> : SinkRefImpl, ISinkRef<T>
{
public SinkRefImpl(IActorRef initialPartnerRef) : base(initialPartnerRef) { }
private readonly Lazy<Sink<T, NotUsed>> _sink;

public SinkRefImpl(IActorRef initialPartnerRef) : base(initialPartnerRef)
{
_sink = new Lazy<Sink<T, NotUsed>>(() =>
Dsl.Sink.FromGraph(new SinkRefStageImpl<T>(InitialPartnerRef))
.MapMaterializedValue(_ => NotUsed.Instance));
}

public override Type EventType => typeof(T);
public override ISurrogate ToSurrogate(ActorSystem system) => SerializationTools.ToSurrogate(this);
public Sink<T, NotUsed> Sink => _sink.Value;

public Sink<T, NotUsed> Sink => Dsl.Sink.FromGraph(new SinkRefStageImpl<T>(InitialPartnerRef)).MapMaterializedValue(_ => NotUsed.Instance);
public override ISurrogate ToSurrogate(ActorSystem system) => SerializationTools.ToSurrogate(this);
}

/// <summary>
Expand Down
15 changes: 11 additions & 4 deletions src/core/Akka.Streams/Implementation/StreamRef/SourceRefImpl.cs
Original file line number Diff line number Diff line change
Expand Up @@ -48,13 +48,20 @@ protected SourceRefImpl(IActorRef initialPartnerRef)
/// <summary>
/// INTERNAL API: Implementation class, not intended to be touched directly by end-users.
/// </summary>
[InternalApi]
[InternalApi]
internal sealed class SourceRefImpl<T> : SourceRefImpl, ISourceRef<T>
{
public SourceRefImpl(IActorRef initialPartnerRef) : base(initialPartnerRef) { }
private readonly Lazy<Source<T, NotUsed>> _source;

public SourceRefImpl(IActorRef initialPartnerRef) : base(initialPartnerRef)
{
_source = new Lazy<Source<T, NotUsed>>(() =>
Dsl.Source.FromGraph(new SourceRefStageImpl<T>(InitialPartnerRef))
.MapMaterializedValue(_ => NotUsed.Instance));
}

public override Type EventType => typeof(T);
public Source<T, NotUsed> Source =>
Dsl.Source.FromGraph(new SourceRefStageImpl<T>(InitialPartnerRef)).MapMaterializedValue(_ => NotUsed.Instance);
public Source<T, NotUsed> Source => _source.Value;

public override ISurrogate ToSurrogate(ActorSystem system) => SerializationTools.ToSurrogate(this);
}
Expand Down
Loading