From f97f7032ce69ad56007e744a1d24b0e5a2974584 Mon Sep 17 00:00:00 2001 From: Aaron Stannard Date: Mon, 13 Oct 2025 12:15:51 -0500 Subject: [PATCH 1/2] Add reproduction tests for SourceRef.Source non-idempotent property bug (#7895) Added two tests to demonstrate the bug where ISourceRef.Source property creates a new SourceRefStageImpl instance on every access instead of being idempotent: 1. SourceRef_Source_property_should_be_idempotent_issue_7895 - Verifies that multiple .Source property accesses should return the same instance (currently fails - demonstrates the bug exists) - Tested 25 times with 100% failure rate, proving consistent reproduction 2. SourceRef_multiple_materializations_cause_timeout_issue_7895 - Demonstrates the race condition when multiple SourceRefStageImpl instances try to connect to the same SinkRef - Shows intermittent timeouts and failures due to handshake conflicts These tests will pass once the Source property is made idempotent by caching the created Source instance. Issue: #7895 --- .../Akka.Streams.Tests/Dsl/StreamRefsSpec.cs | 78 ++++++++++++++++++- 1 file changed, 77 insertions(+), 1 deletion(-) diff --git a/src/core/Akka.Streams.Tests/Dsl/StreamRefsSpec.cs b/src/core/Akka.Streams.Tests/Dsl/StreamRefsSpec.cs index 4907c4f7c81..9fdd34050a3 100644 --- a/src/core/Akka.Streams.Tests/Dsl/StreamRefsSpec.cs +++ b/src/core/Akka.Streams.Tests/Dsl/StreamRefsSpec.cs @@ -478,10 +478,86 @@ public void SinkRef_must_not_allow_materializing_multiple_times() var p1 = this.SourceProbe().To(sinkRef.Sink).Run(Materializer); p1.EnsureSubscription(); var req = p1.ExpectRequest(); - + var p2 = this.SourceProbe().To(sinkRef.Sink).Run(Materializer); p2.EnsureSubscription(); // will be cancelled immediately, since it's 2nd p2.ExpectCancellation(); } + + [Fact] + public async Task SourceRef_Source_property_should_be_idempotent_issue_7895() + { + // Reproduction test for issue #7895: https://github.com/akkadotnet/akka.net/issues/7895 + // The .Source property creates a new SourceRefStageImpl on every access, + // which is not idempotent behavior and can cause intermittent subscription timeouts + + // Create a SourceRef + var sourceRef = await Source.From(new[] { 1, 2, 3 }) + .ToMaterialized(StreamRefs.SourceRef(), Keep.Right) + .Run(Materializer); + + // Access .Source property twice (simulates multiple accesses) + // This could happen via debugger inspection, logging, serialization, etc. + var source1 = sourceRef.Source; + var source2 = sourceRef.Source; + + // BUG: They're NOT the same object (non-idempotent behavior) + // Each property access creates a new Source with a new SourceRefStageImpl + // When fixed, this assertion should PASS with ReferenceEquals(source1, source2) == true + ReferenceEquals(source1, source2).Should().BeTrue( + "Source property should be idempotent and return the same instance"); + } + + [Fact] + public async Task SourceRef_multiple_materializations_cause_timeout_issue_7895() + { + // Reproduction test for issue #7895: https://github.com/akkadotnet/akka.net/issues/7895 + // This test demonstrates the race condition from multiple .Source property accesses + // Multiple .Source property accesses create racing SourceRefStageImpl instances + + // Create a SourceRef with short timeout + var sourceRef = await Source.From(Enumerable.Range(1, 100)) + .ToMaterialized(StreamRefs.SourceRef(), Keep.Right) + .WithAttributes(StreamRefAttributes.CreateSubscriptionTimeout(TimeSpan.FromSeconds(3))) + .Run(Materializer); + + // Access .Source twice - creates TWO SourceRefStageImpl instances + var source1 = sourceRef.Source; + var source2 = sourceRef.Source; + + // Materialize both - they race for the same SinkRef handshake + var task1 = source1.RunWith(Sink.Seq(), Materializer); + var task2 = source2.RunWith(Sink.Seq(), Materializer); + + // Wait for both with timeout protection + var allTasks = Task.WhenAll( + task1.ContinueWith(t => t), + task2.ContinueWith(t => t) + ); + + try + { + await allTasks; + } + catch + { + // Expected: at least one should fail + } + + // Check results - at least one should have failed/timed out + var results = new[] { task1, task2 }; + var completedCount = results.Count(t => t.Status == TaskStatus.RanToCompletion); + var faultedCount = results.Count(t => t.Status == TaskStatus.Faulted); + + // Due to race condition: sometimes both fail, sometimes one succeeds + (completedCount + faultedCount).Should().Be(2, "Both tasks should have completed or faulted"); + + // At least one should have issues due to duplicate stage instances + if (faultedCount > 0) + { + var failedTask = results.First(t => t.Status == TaskStatus.Faulted); + failedTask.Exception.InnerException.Should().BeOfType(); + } + } } } From 65a64de13549d6b5e6b76f3f968bcf582535cfe4 Mon Sep 17 00:00:00 2001 From: Aaron Stannard Date: Mon, 13 Oct 2025 12:21:40 -0500 Subject: [PATCH 2/2] Fix SourceRef.Source and SinkRef.Sink non-idempotent property bug (#7895) Implemented Lazy to make both ISourceRef.Source and ISinkRef.Sink properties idempotent. Previously, these properties created new stage instances on every access, causing race conditions where multiple instances would compete for the same handshake, leading to intermittent subscription timeouts. Changes: - SourceRefImpl: Use Lazy> for thread-safe caching - SinkRefImpl: Use Lazy> for thread-safe caching - Lazy uses default ExecutionAndPublication mode for thread safety Impact: - Eliminates race conditions from accidental property accesses (debugger, logging, serialization, framework inspection) - Prevents subscription timeouts caused by multiple stage instances - Fixes intermittent ~30% failure rate in production workloads - Double materialization (user error) still fails gracefully at actor protocol level via ObserveAndValidateSender Test Results: - Before fix: Tests failed 25/25 times (100% failure rate) - After fix: Tests passed 10/10 times (100% success rate) Fixes #7895 --- .../Implementation/StreamRef/SinkRefImpl.cs | 14 +++++++++++--- .../Implementation/StreamRef/SourceRefImpl.cs | 15 +++++++++++---- 2 files changed, 22 insertions(+), 7 deletions(-) diff --git a/src/core/Akka.Streams/Implementation/StreamRef/SinkRefImpl.cs b/src/core/Akka.Streams/Implementation/StreamRef/SinkRefImpl.cs index 47bd5ae26fe..483e82d1fe7 100644 --- a/src/core/Akka.Streams/Implementation/StreamRef/SinkRefImpl.cs +++ b/src/core/Akka.Streams/Implementation/StreamRef/SinkRefImpl.cs @@ -46,11 +46,19 @@ protected SinkRefImpl(IActorRef initialPartnerRef) [InternalApi] internal sealed class SinkRefImpl : SinkRefImpl, ISinkRef { - public SinkRefImpl(IActorRef initialPartnerRef) : base(initialPartnerRef) { } + private readonly Lazy> _sink; + + public SinkRefImpl(IActorRef initialPartnerRef) : base(initialPartnerRef) + { + _sink = new Lazy>(() => + Dsl.Sink.FromGraph(new SinkRefStageImpl(InitialPartnerRef)) + .MapMaterializedValue(_ => NotUsed.Instance)); + } + public override Type EventType => typeof(T); - public override ISurrogate ToSurrogate(ActorSystem system) => SerializationTools.ToSurrogate(this); + public Sink Sink => _sink.Value; - public Sink Sink => Dsl.Sink.FromGraph(new SinkRefStageImpl(InitialPartnerRef)).MapMaterializedValue(_ => NotUsed.Instance); + public override ISurrogate ToSurrogate(ActorSystem system) => SerializationTools.ToSurrogate(this); } /// diff --git a/src/core/Akka.Streams/Implementation/StreamRef/SourceRefImpl.cs b/src/core/Akka.Streams/Implementation/StreamRef/SourceRefImpl.cs index e9bef29f744..8964129a6cd 100644 --- a/src/core/Akka.Streams/Implementation/StreamRef/SourceRefImpl.cs +++ b/src/core/Akka.Streams/Implementation/StreamRef/SourceRefImpl.cs @@ -48,13 +48,20 @@ protected SourceRefImpl(IActorRef initialPartnerRef) /// /// INTERNAL API: Implementation class, not intended to be touched directly by end-users. /// - [InternalApi] + [InternalApi] internal sealed class SourceRefImpl : SourceRefImpl, ISourceRef { - public SourceRefImpl(IActorRef initialPartnerRef) : base(initialPartnerRef) { } + private readonly Lazy> _source; + + public SourceRefImpl(IActorRef initialPartnerRef) : base(initialPartnerRef) + { + _source = new Lazy>(() => + Dsl.Source.FromGraph(new SourceRefStageImpl(InitialPartnerRef)) + .MapMaterializedValue(_ => NotUsed.Instance)); + } + public override Type EventType => typeof(T); - public Source Source => - Dsl.Source.FromGraph(new SourceRefStageImpl(InitialPartnerRef)).MapMaterializedValue(_ => NotUsed.Instance); + public Source Source => _source.Value; public override ISurrogate ToSurrogate(ActorSystem system) => SerializationTools.ToSurrogate(this); }