Skip to content

Commit 3739bea

Browse files
authored
Merge pull request #298 from ReactiveCocoa/concurrent-strategy
Introduce `FlattenStrategy.concurrent`.
2 parents 92a6e2a + d016f7e commit 3739bea

File tree

2 files changed

+162
-159
lines changed

2 files changed

+162
-159
lines changed

Sources/Flatten.swift

Lines changed: 111 additions & 159 deletions
Original file line numberDiff line numberDiff line change
@@ -15,14 +15,27 @@ public enum FlattenStrategy: Equatable {
1515
///
1616
/// The resulting producer will complete only when all inputs have
1717
/// completed.
18-
case merge
18+
public static let merge = FlattenStrategy.concurrent(limit: .max)
1919

2020
/// The producers should be concatenated, so that their values are sent in
2121
/// the order of the producers themselves.
2222
///
2323
/// The resulting producer will complete only when all inputs have
2424
/// completed.
25-
case concat
25+
public static let concat = FlattenStrategy.concurrent(limit: 1)
26+
27+
/// The producers should be merged, but only up to the given limit at any
28+
/// point of time, so that any value received on any of the input producers
29+
/// will be forwarded immediately to the output producer.
30+
///
31+
/// When the number of active producers reaches the limit, subsequent
32+
/// producers are queued.
33+
///
34+
/// The resulting producer will complete only when all inputs have
35+
/// completed.
36+
///
37+
/// - precondition: `limit > 0`.
38+
case concurrent(limit: UInt)
2639

2740
/// Only the events from the latest input producer should be considered for
2841
/// the output. Any producers received before that point will be disposed
@@ -31,8 +44,20 @@ public enum FlattenStrategy: Equatable {
3144
/// The resulting producer will complete only when the producer-of-producers
3245
/// and the latest producer has completed.
3346
case latest
34-
}
3547

48+
public static func ==(left: FlattenStrategy, right: FlattenStrategy) -> Bool {
49+
switch (left, right) {
50+
case (.latest, .latest):
51+
return true
52+
53+
case (.concurrent(let leftLimit), .concurrent(let rightLimit)):
54+
return leftLimit == rightLimit
55+
56+
default:
57+
return false
58+
}
59+
}
60+
}
3661

3762
extension SignalProtocol where Value: SignalProducerProtocol, Error == Value.Error {
3863
/// Flattens the inner producers sent upon `signal` (into a single signal of
@@ -48,11 +73,8 @@ extension SignalProtocol where Value: SignalProducerProtocol, Error == Value.Err
4873
/// - strategy: Strategy used when flattening signals.
4974
public func flatten(_ strategy: FlattenStrategy) -> Signal<Value.Value, Error> {
5075
switch strategy {
51-
case .merge:
52-
return self.merge()
53-
54-
case .concat:
55-
return self.concat()
76+
case .concurrent(let limit):
77+
return self.concurrent(limit: limit)
5678

5779
case .latest:
5880
return self.switchToLatest()
@@ -90,11 +112,8 @@ extension SignalProtocol where Value: SignalProducerProtocol, Error == NoError,
90112
/// - strategy: Strategy used when flattening signals.
91113
public func flatten(_ strategy: FlattenStrategy) -> Signal<Value.Value, Value.Error> {
92114
switch strategy {
93-
case .merge:
94-
return self.merge()
95-
96-
case .concat:
97-
return self.concat()
115+
case .concurrent(let limit):
116+
return self.concurrent(limit: limit)
98117

99118
case .latest:
100119
return self.switchToLatest()
@@ -133,11 +152,8 @@ extension SignalProducerProtocol where Value: SignalProducerProtocol, Error == V
133152
/// - strategy: Strategy used when flattening signals.
134153
public func flatten(_ strategy: FlattenStrategy) -> SignalProducer<Value.Value, Error> {
135154
switch strategy {
136-
case .merge:
137-
return self.merge()
138-
139-
case .concat:
140-
return self.concat()
155+
case .concurrent(let limit):
156+
return self.concurrent(limit: limit)
141157

142158
case .latest:
143159
return self.switchToLatest()
@@ -175,11 +191,8 @@ extension SignalProducerProtocol where Value: SignalProducerProtocol, Error == N
175191
/// - strategy: Strategy used when flattening signals.
176192
public func flatten(_ strategy: FlattenStrategy) -> SignalProducer<Value.Value, Value.Error> {
177193
switch strategy {
178-
case .merge:
179-
return self.merge()
180-
181-
case .concat:
182-
return self.concat()
194+
case .concurrent(let limit):
195+
return self.concurrent(limit: limit)
183196

184197
case .latest:
185198
return self.switchToLatest()
@@ -389,58 +402,61 @@ extension SignalProducerProtocol where Value: PropertyProtocol {
389402
}
390403

391404
extension SignalProtocol where Value: SignalProducerProtocol, Error == Value.Error {
392-
/// Returns a signal which sends all the values from producer signal emitted
393-
/// from `signal`, waiting until each inner producer completes before
394-
/// beginning to send the values from the next inner producer.
395-
///
396-
/// - note: If any of the inner producers fail, the returned signal will
397-
/// forward that failure immediately
398-
///
399-
/// - note: The returned signal completes only when `signal` and all
400-
/// producers emitted from `signal` complete.
401-
fileprivate func concat() -> Signal<Value.Value, Error> {
405+
fileprivate func concurrent(limit: UInt) -> Signal<Value.Value, Error> {
406+
precondition(limit > 0, "The concurrent limit must be greater than zero.")
407+
402408
return Signal<Value.Value, Error> { relayObserver in
403409
let disposable = CompositeDisposable()
404410
let relayDisposable = CompositeDisposable()
405411

406412
disposable += relayDisposable
407-
disposable += self.observeConcat(relayObserver, relayDisposable)
413+
disposable += self.observeConcurrent(relayObserver, limit, relayDisposable)
408414

409415
return disposable
410416
}
411417
}
412-
413-
fileprivate func observeConcat(_ observer: Observer<Value.Value, Error>, _ disposable: CompositeDisposable? = nil) -> Disposable? {
414-
let state = Atomic(ConcatState<Value.Value, Error>())
415-
418+
419+
fileprivate func observeConcurrent(_ observer: Observer<Value.Value, Error>, _ limit: UInt, _ disposable: CompositeDisposable) -> Disposable? {
420+
let state = Atomic(ConcurrentFlattenState<Value.Value, Error>(limit: limit))
421+
416422
func startNextIfNeeded() {
417423
while let producer = state.modify({ $0.dequeue() }) {
424+
let producerState = UnsafeAtomicState<ProducerState>(.starting)
425+
let deinitializer = ScopedDisposable(ActionDisposable(action: producerState.deinitialize))
426+
418427
producer.startWithSignal { signal, inner in
419-
let handle = disposable?.add(inner)
428+
let handle = disposable.add(inner)
420429

421430
signal.observe { event in
422431
switch event {
423432
case .completed, .interrupted:
424-
handle?.remove()
425-
426-
let shouldStart: Bool = state.modify {
427-
$0.active = nil
428-
return !$0.isStarting
433+
handle.remove()
434+
435+
let shouldComplete: Bool = state.modify { state in
436+
state.activeCount -= 1
437+
return state.shouldComplete
429438
}
430439

431-
if shouldStart {
432-
startNextIfNeeded()
440+
withExtendedLifetime(deinitializer) {
441+
if shouldComplete {
442+
observer.sendCompleted()
443+
} else if producerState.is(.started) {
444+
startNextIfNeeded()
445+
}
433446
}
434447

435448
case .value, .failed:
436449
observer.action(event)
437450
}
438451
}
439452
}
440-
state.modify { $0.isStarting = false }
453+
454+
withExtendedLifetime(deinitializer) {
455+
producerState.setStarted()
456+
}
441457
}
442458
}
443-
459+
444460
return observe { event in
445461
switch event {
446462
case let .value(value):
@@ -451,11 +467,15 @@ extension SignalProtocol where Value: SignalProducerProtocol, Error == Value.Err
451467
observer.send(error: error)
452468

453469
case .completed:
454-
state.modify { state in
455-
state.queue.append(SignalProducer.empty.on(completed: observer.sendCompleted))
470+
let shouldComplete: Bool = state.modify { state in
471+
state.isOuterCompleted = true
472+
return state.shouldComplete
456473
}
457-
startNextIfNeeded()
458-
474+
475+
if shouldComplete {
476+
observer.sendCompleted()
477+
}
478+
459479
case .interrupted:
460480
observer.sendInterrupted()
461481
}
@@ -464,20 +484,14 @@ extension SignalProtocol where Value: SignalProducerProtocol, Error == Value.Err
464484
}
465485

466486
extension SignalProducerProtocol where Value: SignalProducerProtocol, Error == Value.Error {
467-
/// Returns a producer which sends all the values from each producer emitted
468-
/// from `producer`, waiting until each inner producer completes before
469-
/// beginning to send the values from the next inner producer.
470-
///
471-
/// - note: If any of the inner producers emit an error, the returned
472-
/// producer will emit that error.
473-
///
474-
/// - note: The returned producer completes only when `producer` and all
475-
/// producers emitted from `producer` complete.
476-
fileprivate func concat() -> SignalProducer<Value.Value, Error> {
477-
return SignalProducer<Value.Value, Error> { observer, disposable in
487+
fileprivate func concurrent(limit: UInt) -> SignalProducer<Value.Value, Error> {
488+
precondition(limit > 0, "The concurrent limit must be greater than zero.")
489+
490+
return SignalProducer<Value.Value, Error> { relayObserver, disposable in
478491
self.startWithSignal { signal, signalDisposable in
479492
disposable += signalDisposable
480-
_ = signal.observeConcat(observer, disposable)
493+
494+
_ = signal.observeConcurrent(relayObserver, limit, disposable)
481495
}
482496
}
483497
}
@@ -531,114 +545,52 @@ extension SignalProducerProtocol {
531545
}
532546
}
533547

534-
private final class ConcatState<Value, Error: Swift.Error> {
535-
typealias SignalProducer = ReactiveSwift.SignalProducer<Value, Error>
548+
private final class ConcurrentFlattenState<Value, Error: Swift.Error> {
549+
typealias Producer = ReactiveSwift.SignalProducer<Value, Error>
550+
551+
/// The limit of active producers.
552+
let limit: UInt
536553

537-
/// The active producer, if any.
538-
var active: SignalProducer? = nil
554+
/// The number of active producers.
555+
var activeCount: UInt = 0
539556

540557
/// The producers waiting to be started.
541-
var queue: [SignalProducer] = []
542-
543-
/// Whether the active producer is currently starting.
544-
/// Used to prevent deep recursion.
545-
var isStarting: Bool = false
558+
var queue: [Producer] = []
559+
560+
/// Whether the outer producer has completed.
561+
var isOuterCompleted = false
562+
563+
/// Whether the flattened signal should complete.
564+
var shouldComplete: Bool {
565+
return isOuterCompleted && activeCount == 0 && queue.isEmpty
566+
}
567+
568+
init(limit: UInt) {
569+
self.limit = limit
570+
}
546571

547572
/// Dequeue the next producer if one should be started.
548573
///
549-
/// - note: The caller *must* set `isStarting` to false after the returned
550-
/// producer has been started.
551-
///
552-
/// - returns: The `SignalProducer` to start or `nil` if no producer should
553-
/// be started.
554-
func dequeue() -> SignalProducer? {
555-
if active != nil {
574+
/// - returns: The `Producer` to start or `nil` if no producer should be
575+
/// started.
576+
func dequeue() -> Producer? {
577+
if activeCount < limit, !queue.isEmpty {
578+
activeCount += 1
579+
return queue.removeFirst()
580+
} else {
556581
return nil
557582
}
558-
559-
active = queue.first
560-
if active != nil {
561-
queue.removeFirst()
562-
isStarting = true
563-
}
564-
return active
565583
}
566584
}
567585

568-
extension SignalProtocol where Value: SignalProducerProtocol, Error == Value.Error {
569-
/// Merges a `signal` of SignalProducers down into a single signal, biased
570-
/// toward the producer added earlier. Returns a Signal that will forward
571-
/// events from the inner producers as they arrive.
572-
fileprivate func merge() -> Signal<Value.Value, Error> {
573-
return Signal<Value.Value, Error> { relayObserver in
574-
let disposable = CompositeDisposable()
575-
let relayDisposable = CompositeDisposable()
576-
577-
disposable += relayDisposable
578-
disposable += self.observeMerge(relayObserver, relayDisposable)
579-
580-
return disposable
581-
}
582-
}
583-
584-
fileprivate func observeMerge(_ observer: Observer<Value.Value, Error>, _ disposable: CompositeDisposable) -> Disposable? {
585-
let inFlight = Atomic(1)
586-
let decrementInFlight = {
587-
let shouldComplete: Bool = inFlight.modify {
588-
$0 -= 1
589-
return $0 == 0
590-
}
591-
592-
if shouldComplete {
593-
observer.sendCompleted()
594-
}
595-
}
596-
597-
return self.observe { event in
598-
switch event {
599-
case let .value(producer):
600-
producer.startWithSignal { innerSignal, innerDisposable in
601-
inFlight.modify { $0 += 1 }
602-
let handle = disposable.add(innerDisposable)
603-
604-
innerSignal.observe { event in
605-
switch event {
606-
case .completed, .interrupted:
607-
handle.remove()
608-
decrementInFlight()
609-
610-
case .value, .failed:
611-
observer.action(event)
612-
}
613-
}
614-
}
615-
616-
case let .failed(error):
617-
observer.send(error: error)
618-
619-
case .completed:
620-
decrementInFlight()
621-
622-
case .interrupted:
623-
observer.sendInterrupted()
624-
}
625-
}
626-
}
586+
private enum ProducerState: Int32 {
587+
case starting
588+
case started
627589
}
628590

629-
extension SignalProducerProtocol where Value: SignalProducerProtocol, Error == Value.Error {
630-
/// Merges a `signal` of SignalProducers down into a single signal, biased
631-
/// toward the producer added earlier. Returns a Signal that will forward
632-
/// events from the inner producers as they arrive.
633-
fileprivate func merge() -> SignalProducer<Value.Value, Error> {
634-
return SignalProducer<Value.Value, Error> { relayObserver, disposable in
635-
self.startWithSignal { signal, signalDisposable in
636-
disposable += signalDisposable
637-
638-
_ = signal.observeMerge(relayObserver, disposable)
639-
}
640-
641-
}
591+
extension AtomicStateProtocol where State == ProducerState {
592+
fileprivate func setStarted() {
593+
precondition(tryTransition(from: .starting, to: .started), "The transition is not supposed to fail.")
642594
}
643595
}
644596

0 commit comments

Comments
 (0)