Description
Bug Description
The ISourceRef<T>.Source property in SourceRefImpl.cs creates a new SourceRefStageImpl instance on every property access. This causes intermittent subscription timeouts when the property is accessed multiple times, as multiple stage instances compete to connect to the same SinkRef.
Affected Code
File: src/core/Akka.Streams/Implementation/StreamRef/SourceRefImpl.cs
Lines: 56-57
public Source<T, NotUsed> Source =>
    Dsl.Source.FromGraph(new SourceRefStageImpl<T>(InitialPartnerRef))
        .MapMaterializedValue(_ => NotUsed.Instance);
The Problem
Each time .Source is accessed, it creates a new SourceRefStageImpl with the same InitialPartnerRef (the SinkRef actor). When multiple SourceRefStageImpl instances are materialized:
- Each instance tries to establish a handshake with the same SinkRef actor
- Only one instance can successfully complete the handshake
- The other instances time out after 90 seconds with RemoteStreamRefActorTerminatedException
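To illustrate why every access yields a separate stage, here is a minimal sketch that needs no Akka.NET dependency. FakeSourceRef and FakeStage are hypothetical stand-ins for SourceRefImpl<T> and SourceRefStageImpl<T>, and the partner reference is modelled as a plain string. The sketch only shows the allocation behavior of an expression-bodied property, not the handshake itself.

```csharp
using System;

// Hypothetical stand-in for SourceRefImpl<T>; not the real Akka.Streams type.
var sourceRef = new FakeSourceRef("sink-ref-actor");

var first = sourceRef.Source;
var second = sourceRef.Source;

// Each read ran the getter again, so the two stages are distinct objects...
Console.WriteLine(object.ReferenceEquals(first, second));   // False
// ...that nevertheless target the same partner.
Console.WriteLine(first.PartnerRef == second.PartnerRef);   // True

// Hypothetical stand-in for SourceRefStageImpl<T>.
sealed class FakeStage
{
    public FakeStage(string partnerRef) => PartnerRef = partnerRef;
    public string PartnerRef { get; }
}

sealed class FakeSourceRef
{
    private readonly string _initialPartnerRef;

    public FakeSourceRef(string initialPartnerRef) => _initialPartnerRef = initialPartnerRef;

    // Expression-bodied property: the right-hand side is evaluated on every read.
    public FakeStage Source => new FakeStage(_initialPartnerRef);
}
```

The same pattern in SourceRefImpl.cs means two reads of .Source hand back two stages that both reference InitialPartnerRef.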
How Multiple Accesses Occur
Even if user code accesses .Source only once explicitly, multiple accesses can happen through:
- Debugger inspection: IDE property viewers
- Logging frameworks: ToString() implementations
- Serialization: Property enumeration
- Framework code: Implicit property access
- Multiple code paths: Passing ISourceRef through layers that each access .Source
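As a rough illustration of the implicit accesses listed above, the sketch below reads every public property of an object through reflection, which is broadly how property-enumerating serializers and object dumpers behave. CountingRef is a hypothetical type invented for this example, and the counter exists only to make the hidden getter call visible. It shows that the getter runs without user code ever writing `.Source`. It does not claim that any particular debugger or serializer does this to ISourceRef<T>.

```csharp
using System;
using System.Reflection;

var probe = new CountingRef();

// Enumerate public instance properties and read each one.
// Note that the code never mentions probe.Source by name.
foreach (PropertyInfo property in typeof(CountingRef).GetProperties())
{
    _ = property.GetValue(probe);
}

Console.WriteLine(probe.GetterCalls); // 1

// Hypothetical type; the public field is skipped by GetProperties().
sealed class CountingRef
{
    public int GetterCalls;

    public object Source
    {
        get
        {
            GetterCalls++;        // record that the getter ran
            return new object();  // a fresh instance per access, as in the bug
        }
    }
}
```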
Symptoms
- Intermittent failures (not consistent - depends on timing of which instance "wins" the handshake)
- Timeout occurs even when subscriber materializes quickly
- Error: RemoteStreamRefActorTerminatedException: subscription timeout triggered termination
- Increasing timeout doesn't help (handshake never completes for losing instances)
Reproduction
[Fact]
public async Task Multiple_Source_accesses_cause_timeout()
{
    // Create StreamRef
    var sourceRef = await Source.From(new[] { 1, 2, 3 })
        .ToMaterialized(StreamRefs.SourceRef<int>(), Keep.Right)
        .WithAttributes(StreamRefAttributes.SubscriptionTimeout(TimeSpan.FromSeconds(5)))
        .Run(Sys.Materializer());

    // Access .Source property twice (simulates multiple accesses)
    var source1 = sourceRef.Source;
    var source2 = sourceRef.Source;

    // Verify they're NOT the same object
    Assert.False(ReferenceEquals(source1, source2));

    // Materialize both - one will time out
    var task1 = source1.RunWith(Sink.Seq<int>(), Sys.Materializer());
    var task2 = source2.RunWith(Sink.Seq<int>(), Sys.Materializer());

    var results = await Task.WhenAll(
        task1.ContinueWith(t => t.IsCompletedSuccessfully),
        task2.ContinueWith(t => t.IsCompletedSuccessfully)
    );

    // One succeeds, one times out
    Assert.Contains(true, results);
    Assert.Contains(false, results);
}
Proposed Fix
Make the .Source property idempotent by caching the created Source:
private Source<T, NotUsed>? _cachedSource;
private readonly object _sourceLock = new object();

public Source<T, NotUsed> Source
{
    get
    {
        if (_cachedSource != null) return _cachedSource;
        lock (_sourceLock)
        {
            if (_cachedSource != null) return _cachedSource;
            _cachedSource = Dsl.Source.FromGraph(
                    new SourceRefStageImpl<T>(InitialPartnerRef))
                .MapMaterializedValue(_ => NotUsed.Instance);
            return _cachedSource;
        }
    }
}
Workaround for Users
Until this is fixed, users should ensure .Source is accessed exactly once:
// ✅ GOOD: Access .Source once
var sourceRef = await CreateStreamRef();
var source = sourceRef.Source; // Access ONCE, store result
var pipeline = source.Select(...).RunWith(...);

// ❌ BAD: Multiple accesses
var sourceRef = await CreateStreamRef();
var pipeline1 = sourceRef.Source.Select(...); // Access #1
var pipeline2 = sourceRef.Source.Select(...); // Access #2 - creates duplicate!
Impact
- Severity: Medium-High (causes production timeouts)
- Frequency: Intermittent (30% failure rate in customer case)
- Workaround: Available (access .Source once)
- Affected versions: All versions with StreamRefs
Customer Impact
Customer ticket: This is causing intermittent RemoteStreamRefActorTerminatedException timeouts in production with ~30% failure rate on distributed stream processing workloads.