Skip to content

Commit 1214c26

Browse files
author
Oren Novotny
authored
Merge pull request #881 from akarnokd/ObserveOnLongRunningFix
Fix wrong usage of ISchedulerLongRunning in ObserveOn
2 parents 8e87096 + a678705 commit 1214c26

File tree

4 files changed

+252
-66
lines changed

4 files changed

+252
-66
lines changed

Rx.NET/Source/src/System.Reactive/Concurrency/Synchronization.ObserveOn.cs

Lines changed: 20 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -28,6 +28,26 @@ public Scheduler(IObservable<TSource> source, IScheduler scheduler)
2828
protected override void Run(ObserveOnObserverNew<TSource> sink) => sink.Run(_source);
2929
}
3030

31+
/// <summary>
32+
/// The new ObserveOn operator run with an ISchedulerLongRunning in a mostly lock-free manner.
33+
/// </summary>
34+
internal sealed class SchedulerLongRunning : Producer<TSource, ObserveOnObserverLongRunning<TSource>>
35+
{
36+
private readonly IObservable<TSource> _source;
37+
private readonly ISchedulerLongRunning _scheduler;
38+
39+
public SchedulerLongRunning(IObservable<TSource> source, ISchedulerLongRunning scheduler)
40+
{
41+
_source = source;
42+
_scheduler = scheduler;
43+
}
44+
45+
protected override ObserveOnObserverLongRunning<TSource> CreateSink(IObserver<TSource> observer) => new ObserveOnObserverLongRunning<TSource>(_scheduler, observer);
46+
47+
[Diagnostics.CodeAnalysis.SuppressMessage("Microsoft.Design", "CA1062:Validate arguments of public methods", MessageId = "2", Justification = "Visibility restricted to friend assemblies. Those should be correct by inspection.")]
48+
protected override void Run(ObserveOnObserverLongRunning<TSource> sink) => sink.Run(_source);
49+
}
50+
3151
internal sealed class Context : Producer<TSource, Context._>
3252
{
3353
private readonly IObservable<TSource> _source;

Rx.NET/Source/src/System.Reactive/Concurrency/Synchronization.cs

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -181,6 +181,11 @@ public static IObservable<TSource> ObserveOn<TSource>(IObservable<TSource> sourc
181181
throw new ArgumentNullException(nameof(scheduler));
182182
}
183183

184+
var longRunning = scheduler.AsLongRunning();
185+
if (longRunning != null)
186+
{
187+
return new ObserveOn<TSource>.SchedulerLongRunning(source, longRunning);
188+
}
184189
return new ObserveOn<TSource>.Scheduler(source, scheduler);
185190
}
186191

Rx.NET/Source/src/System.Reactive/Internal/ScheduledObserver.cs

Lines changed: 179 additions & 66 deletions
Original file line numberDiff line numberDiff line change
@@ -381,11 +381,6 @@ internal sealed class ObserveOnObserverNew<T> : IdentitySink<T>
381381
{
382382
private readonly IScheduler _scheduler;
383383

384-
/// <summary>
385-
/// If not null, the <see cref="_scheduler"/> supports
386-
/// long running tasks.
387-
/// </summary>
388-
private readonly ISchedulerLongRunning _longRunning;
389384
private readonly ConcurrentQueue<T> _queue;
390385

391386
/// <summary>
@@ -418,7 +413,6 @@ internal sealed class ObserveOnObserverNew<T> : IdentitySink<T>
418413
public ObserveOnObserverNew(IScheduler scheduler, IObserver<T> downstream) : base(downstream)
419414
{
420415
_scheduler = scheduler;
421-
_longRunning = scheduler.AsLongRunning();
422416
_queue = new ConcurrentQueue<T>();
423417
}
424418

@@ -480,15 +474,7 @@ private void Schedule()
480474

481475
if (Disposable.TrySetMultiple(ref _task, newTask))
482476
{
483-
var longRunning = _longRunning;
484-
if (longRunning != null)
485-
{
486-
newTask.Disposable = longRunning.ScheduleLongRunning(this, DrainLongRunningAction);
487-
}
488-
else
489-
{
490-
newTask.Disposable = _scheduler.Schedule(this, DrainShortRunningFunc);
491-
}
477+
newTask.Disposable = _scheduler.Schedule(this, DrainShortRunningFunc);
492478
}
493479

494480
// If there was a cancellation, clear the queue
@@ -502,14 +488,6 @@ private void Schedule()
502488
}
503489
}
504490

505-
/// <summary>
506-
/// The static action to be scheduled on a long running scheduler.
507-
/// Avoids creating a delegate that captures <code>this</code>
508-
/// whenever the signals have to be drained.
509-
/// </summary>
510-
private static readonly Action<ObserveOnObserverNew<T>, ICancelable> DrainLongRunningAction =
511-
(self, cancel) => self.DrainLongRunning();
512-
513491
/// <summary>
514492
/// The static action to be scheduled on a simple scheduler.
515493
/// Avoids creating a delegate that captures <code>this</code>
@@ -526,7 +504,7 @@ private void Schedule()
526504
/// <returns>The IDisposable of the recursively scheduled task or an empty disposable.</returns>
527505
private IDisposable DrainShortRunning(IScheduler recursiveScheduler)
528506
{
529-
DrainStep(_queue, false);
507+
DrainStep(_queue);
530508

531509
if (Interlocked.Decrement(ref _wip) != 0)
532510
{
@@ -550,23 +528,20 @@ private IDisposable DrainShortRunning(IScheduler recursiveScheduler)
550528
/// In addition, the DrainStep is invoked from the DrainLongRunning's loop
551529
/// so reading _queue inside this method would still incur the same barrier
552530
/// overhead otherwise.</param>
553-
/// <param name="delayError">Should the errors be delayed until all
554-
/// queued items have been emitted to the downstream?</param>
555-
/// <returns>True if the drain loop should stop.</returns>
556-
private bool DrainStep(ConcurrentQueue<T> q, bool delayError)
531+
private void DrainStep(ConcurrentQueue<T> q)
557532
{
558533
// Check if the operator has been disposed
559534
if (Volatile.Read(ref _disposed))
560535
{
561536
// cleanup residue items in the queue
562537
Clear(q);
563-
return true;
538+
return;
564539
}
565540

566541
// Has the upstream call OnCompleted?
567542
var d = Volatile.Read(ref _done);
568543

569-
if (d && !delayError)
544+
if (d)
570545
{
571546
// done = true happens before setting error
572547
// this is safe to be a plain read
@@ -576,7 +551,7 @@ private bool DrainStep(ConcurrentQueue<T> q, bool delayError)
576551
{
577552
Volatile.Write(ref _disposed, true);
578553
ForwardOnError(ex);
579-
return true;
554+
return;
580555
}
581556
}
582557

@@ -588,65 +563,203 @@ private bool DrainStep(ConcurrentQueue<T> q, bool delayError)
588563
if (d && empty)
589564
{
590565
Volatile.Write(ref _disposed, true);
591-
// done = true happens before setting error
592-
// this is safe to be a plain read
593-
var ex = _error;
594-
// if not null, there was an OnError call
595-
if (ex != null)
596-
{
597-
ForwardOnError(ex);
598-
}
599-
else
600-
{
601-
// otherwise, complete normally
602-
ForwardOnCompleted();
603-
}
604-
return true;
566+
// otherwise, complete normally
567+
ForwardOnCompleted();
568+
return;
605569
}
606570

607571
// the queue is empty and the upstream hasn't completed yet
608572
if (empty)
609573
{
610-
return true;
574+
return;
611575
}
612576
// emit the item
613577
ForwardOnNext(v);
578+
}
579+
}
580+
581+
/// <summary>
582+
/// Signals events on a ISchedulerLongRunning by blocking the emission thread while waiting
583+
/// for them from the upstream.
584+
/// </summary>
585+
/// <typeparam name="TSource">The element type of the sequence.</typeparam>
586+
internal sealed class ObserveOnObserverLongRunning<TSource> : IdentitySink<TSource>
587+
{
588+
/// <summary>
589+
/// This will run a suspending drain task, hogging the backing thread
590+
/// until the sequence terminates or gets disposed.
591+
/// </summary>
592+
private readonly ISchedulerLongRunning _scheduler;
593+
594+
/// <summary>
595+
/// The queue for holding the OnNext items, terminal signals have their own fields.
596+
/// </summary>
597+
private readonly ConcurrentQueue<TSource> _queue;
598+
599+
/// <summary>
600+
/// Protects the suspension and resumption of the long running drain task.
601+
/// </summary>
602+
private readonly object _suspendGuard;
603+
604+
/// <summary>
605+
/// The work-in-progress counter. If it jumps from 0 to 1, the drain task is resumed,
606+
/// if it reaches 0 again, the drain task is suspended.
607+
/// </summary>
608+
private long _wip;
609+
610+
/// <summary>
611+
/// Set to true if the upstream terminated.
612+
/// </summary>
613+
private bool _done;
614+
615+
/// <summary>
616+
/// Set to a non-null Exception if the upstream terminated with OnError.
617+
/// </summary>
618+
private Exception _error;
619+
620+
/// <summary>
621+
/// Indicates the sequence has been disposed and the drain task should quit.
622+
/// </summary>
623+
private bool _disposed;
624+
625+
/// <summary>
626+
/// Makes sure the drain task is scheduled only once, when the first signal
627+
/// from upstream arrives.
628+
/// </summary>
629+
private int _runDrainOnce;
630+
631+
/// <summary>
632+
/// The disposable tracking the drain task.
633+
/// </summary>
634+
private IDisposable _drainTask;
614635

615-
// keep looping
616-
return false;
636+
public ObserveOnObserverLongRunning(ISchedulerLongRunning scheduler, IObserver<TSource> observer) : base(observer)
637+
{
638+
_scheduler = scheduler;
639+
_queue = new ConcurrentQueue<TSource>();
640+
_suspendGuard = new object();
641+
}
642+
643+
public override void OnCompleted()
644+
{
645+
Volatile.Write(ref _done, true);
646+
Schedule();
647+
}
648+
649+
public override void OnError(Exception error)
650+
{
651+
_error = error;
652+
Volatile.Write(ref _done, true);
653+
Schedule();
654+
}
655+
656+
public override void OnNext(TSource value)
657+
{
658+
_queue.Enqueue(value);
659+
Schedule();
660+
}
661+
662+
private void Schedule()
663+
{
664+
// Schedule the suspending drain once
665+
if (Volatile.Read(ref _runDrainOnce) == 0
666+
&& Interlocked.CompareExchange(ref _runDrainOnce, 1, 0) == 0)
667+
{
668+
Disposable.SetSingle(ref _drainTask, _scheduler.ScheduleLongRunning(this, DrainLongRunning));
669+
}
670+
671+
// Indicate more work is to be done by the drain loop
672+
if (Interlocked.Increment(ref _wip) == 1L)
673+
{
674+
// resume the drain loop waiting on the guard
675+
lock (_suspendGuard)
676+
{
677+
Monitor.Pulse(_suspendGuard);
678+
}
679+
}
617680
}
618681

619682
/// <summary>
620-
/// Emits as many signals as possible to the downstream observer
621-
/// as this is executing a long-running scheduler so
622-
/// it can occupy that thread as long as it needs to.
683+
/// Static reference to the Drain method, saves allocation.
623684
/// </summary>
624-
private void DrainLongRunning()
685+
private static readonly Action<ObserveOnObserverLongRunning<TSource>, ICancelable> DrainLongRunning = (self, cancelable) => self.Drain();
686+
687+
protected override void Dispose(bool disposing)
625688
{
626-
var missed = 1;
689+
// Indicate the drain task should quit
690+
Volatile.Write(ref _disposed, true);
691+
// Resume the drain task in case it sleeps
692+
lock (_suspendGuard)
693+
{
694+
Monitor.Pulse(_suspendGuard);
695+
}
696+
// Cancel the drain task handle.
697+
Disposable.TryDispose(ref _drainTask);
698+
base.Dispose(disposing);
699+
}
627700

628-
// read out fields upfront as the DrainStep uses atomics
629-
// that would force the re-read of these constant values
630-
// from memory, regardless of readonly, afaik
701+
private void Drain()
702+
{
631703
var q = _queue;
632-
633704
for (; ; )
634705
{
635-
for (; ; )
706+
// If the sequence was disposed, clear the queue and quit
707+
if (Volatile.Read(ref _disposed))
708+
{
709+
while (q.TryDequeue(out var _)) ;
710+
break;
711+
}
712+
713+
// Has the upstream terminated?
714+
var isDone = Volatile.Read(ref _done);
715+
// Do we have an item in the queue
716+
var hasValue = q.TryDequeue(out var item);
717+
718+
// If the upstream has terminated and no further items are in the queue
719+
if (isDone && !hasValue)
636720
{
637-
// delayError: true - because of
638-
// ObserveOn_LongRunning_HoldUpDuringDispatchAndFail
639-
// expects something that almost looks like full delayError
640-
if (DrainStep(q, true))
721+
// Find out if the upstream terminated with an error and signal accordingly.
722+
var e = _error;
723+
if (e != null)
724+
{
725+
ForwardOnError(e);
726+
}
727+
else
641728
{
642-
break;
729+
ForwardOnCompleted();
643730
}
731+
break;
644732
}
645733

646-
missed = Interlocked.Add(ref _wip, -missed);
647-
if (missed == 0)
734+
// There was an item, signal it.
735+
if (hasValue)
648736
{
649-
break;
737+
ForwardOnNext(item);
738+
// Consume the item and try the next item if the work-in-progress
739+
// indicator is still not zero
740+
if (Interlocked.Decrement(ref _wip) != 0L)
741+
{
742+
continue;
743+
}
744+
}
745+
746+
// If we run out of work and the sequence is not disposed
747+
if (Volatile.Read(ref _wip) == 0L && !Volatile.Read(ref _disposed))
748+
{
749+
var g = _suspendGuard;
750+
// try sleeping, if we can't even enter the lock, the producer
751+
// side is currently trying to resume us
752+
if (Monitor.TryEnter(g))
753+
{
754+
// Make sure again there is still no work and the sequence is not disposed
755+
if (Volatile.Read(ref _wip) == 0L && !Volatile.Read(ref _disposed))
756+
{
757+
// wait for a Pulse(g)
758+
Monitor.Wait(g);
759+
}
760+
// Unlock
761+
Monitor.Exit(g);
762+
}
650763
}
651764
}
652765
}

0 commit comments

Comments
 (0)