Skip to content

Commit 23f0b40

Browse files
committed
Introduce FlattenStrategy.concurrent.
1 parent 271d865 commit 23f0b40

File tree

1 file changed

+92
-162
lines changed

1 file changed

+92
-162
lines changed

Sources/Flatten.swift

Lines changed: 92 additions & 162 deletions
Original file line numberDiff line numberDiff line change
@@ -15,14 +15,25 @@ public enum FlattenStrategy: Equatable {
1515
///
1616
/// The resulting producer will complete only when all inputs have
1717
/// completed.
18-
case merge
18+
public static let merge = FlattenStrategy.concurrent(limit: .max)
1919

2020
/// The producers should be concatenated, so that their values are sent in
2121
/// the order of the producers themselves.
2222
///
2323
/// The resulting producer will complete only when all inputs have
2424
/// completed.
25-
case concat
25+
public static let concat = FlattenStrategy.concurrent(limit: 1)
26+
27+
/// The producers should be merged, but only up to the given limit at any
28+
/// point of time, so that any value received on any of the input producers
29+
/// will be forwarded immediately to the output producer.
30+
///
31+
/// When the number of active producers reaches the limit, subsequent
32+
/// producers are queued.
33+
///
34+
/// The resulting producer will complete only when all inputs have
35+
/// completed.
36+
case concurrent(limit: UInt)
2637

2738
/// Only the events from the latest input producer should be considered for
2839
/// the output. Any producers received before that point will be disposed
@@ -31,8 +42,20 @@ public enum FlattenStrategy: Equatable {
3142
/// The resulting producer will complete only when the producer-of-producers
3243
/// and the latest producer has completed.
3344
case latest
34-
}
3545

46+
public static func ==(left: FlattenStrategy, right: FlattenStrategy) -> Bool {
47+
switch (left, right) {
48+
case (.latest, .latest):
49+
return true
50+
51+
case (.concurrent(let leftLimit), .concurrent(let rightLimit)):
52+
return leftLimit == rightLimit
53+
54+
default:
55+
return false
56+
}
57+
}
58+
}
3659

3760
extension SignalProtocol where Value: SignalProducerProtocol, Error == Value.Error {
3861
/// Flattens the inner producers sent upon `signal` (into a single signal of
@@ -45,11 +68,8 @@ extension SignalProtocol where Value: SignalProducerProtocol, Error == Value.Err
4568
/// `Completed events on inner producers.
4669
public func flatten(_ strategy: FlattenStrategy) -> Signal<Value.Value, Error> {
4770
switch strategy {
48-
case .merge:
49-
return self.merge()
50-
51-
case .concat:
52-
return self.concat()
71+
case .concurrent(let limit):
72+
return self.concurrent(limit: limit)
5373

5474
case .latest:
5575
return self.switchToLatest()
@@ -87,11 +107,8 @@ extension SignalProtocol where Value: SignalProducerProtocol, Error == NoError,
87107
/// - strategy: Strategy used when flattening signals.
88108
public func flatten(_ strategy: FlattenStrategy) -> Signal<Value.Value, Value.Error> {
89109
switch strategy {
90-
case .merge:
91-
return self.merge()
92-
93-
case .concat:
94-
return self.concat()
110+
case .concurrent(let limit):
111+
return self.concurrent(limit: limit)
95112

96113
case .latest:
97114
return self.switchToLatest()
@@ -124,11 +141,8 @@ extension SignalProducerProtocol where Value: SignalProducerProtocol, Error == V
124141
/// `completed` events on inner producers.
125142
public func flatten(_ strategy: FlattenStrategy) -> SignalProducer<Value.Value, Error> {
126143
switch strategy {
127-
case .merge:
128-
return self.merge()
129-
130-
case .concat:
131-
return self.concat()
144+
case .concurrent(let limit):
145+
return self.concurrent(limit: limit)
132146

133147
case .latest:
134148
return self.switchToLatest()
@@ -160,11 +174,8 @@ extension SignalProducerProtocol where Value: SignalProducerProtocol, Error == N
160174
/// `completed` events on inner producers.
161175
public func flatten(_ strategy: FlattenStrategy) -> SignalProducer<Value.Value, Value.Error> {
162176
switch strategy {
163-
case .merge:
164-
return self.merge()
165-
166-
case .concat:
167-
return self.concat()
177+
case .concurrent(let limit):
178+
return self.concurrent(limit: limit)
168179

169180
case .latest:
170181
return self.switchToLatest()
@@ -341,46 +352,41 @@ extension SignalProducerProtocol where Value: PropertyProtocol {
341352
}
342353

343354
extension SignalProtocol where Value: SignalProducerProtocol, Error == Value.Error {
344-
/// Returns a signal which sends all the values from producer signal emitted
345-
/// from `signal`, waiting until each inner producer completes before
346-
/// beginning to send the values from the next inner producer.
347-
///
348-
/// - note: If any of the inner producers fail, the returned signal will
349-
/// forward that failure immediately
350-
///
351-
/// - note: The returned signal completes only when `signal` and all
352-
/// producers emitted from `signal` complete.
353-
fileprivate func concat() -> Signal<Value.Value, Error> {
355+
fileprivate func concurrent(limit: UInt) -> Signal<Value.Value, Error> {
354356
return Signal<Value.Value, Error> { relayObserver in
355357
let disposable = CompositeDisposable()
356358
let relayDisposable = CompositeDisposable()
357359

358360
disposable += relayDisposable
359-
disposable += self.observeConcat(relayObserver, relayDisposable)
361+
disposable += self.observeConcurrent(relayObserver, limit, relayDisposable)
360362

361363
return disposable
362364
}
363365
}
364-
365-
fileprivate func observeConcat(_ observer: Observer<Value.Value, Error>, _ disposable: CompositeDisposable? = nil) -> Disposable? {
366-
let state = Atomic(ConcatState<Value.Value, Error>())
367-
366+
367+
fileprivate func observeConcurrent(_ observer: Observer<Value.Value, Error>, _ limit: UInt, _ disposable: CompositeDisposable) -> Disposable? {
368+
let state = Atomic(ConcurrentFlattenState<Value.Value, Error>(limit: limit))
369+
368370
func startNextIfNeeded() {
369371
while let producer = state.modify({ $0.dequeue() }) {
372+
var isStarting = true
373+
370374
producer.startWithSignal { signal, inner in
371-
let handle = disposable?.add(inner)
375+
let handle = disposable.add(inner)
372376

373377
signal.observe { event in
374378
switch event {
375379
case .completed, .interrupted:
376-
handle?.remove()
377-
378-
let shouldStart: Bool = state.modify {
379-
$0.active = nil
380-
return !$0.isStarting
380+
handle.remove()
381+
382+
let shouldComplete: Bool = state.modify { state in
383+
state.activeCount -= 1
384+
return state.shouldComplete
381385
}
382386

383-
if shouldStart {
387+
if shouldComplete {
388+
observer.sendCompleted()
389+
} else if !isStarting {
384390
startNextIfNeeded()
385391
}
386392

@@ -389,10 +395,11 @@ extension SignalProtocol where Value: SignalProducerProtocol, Error == Value.Err
389395
}
390396
}
391397
}
392-
state.modify { $0.isStarting = false }
398+
399+
isStarting = false
393400
}
394401
}
395-
402+
396403
return observe { event in
397404
switch event {
398405
case let .value(value):
@@ -403,11 +410,15 @@ extension SignalProtocol where Value: SignalProducerProtocol, Error == Value.Err
403410
observer.send(error: error)
404411

405412
case .completed:
406-
state.modify { state in
407-
state.queue.append(SignalProducer.empty.on(completed: observer.sendCompleted))
413+
let shouldComplete: Bool = state.modify { state in
414+
state.isOuterCompleted = true
415+
return state.shouldComplete
408416
}
409-
startNextIfNeeded()
410-
417+
418+
if shouldComplete {
419+
observer.sendCompleted()
420+
}
421+
411422
case .interrupted:
412423
observer.sendInterrupted()
413424
}
@@ -416,20 +427,12 @@ extension SignalProtocol where Value: SignalProducerProtocol, Error == Value.Err
416427
}
417428

418429
extension SignalProducerProtocol where Value: SignalProducerProtocol, Error == Value.Error {
419-
/// Returns a producer which sends all the values from each producer emitted
420-
/// from `producer`, waiting until each inner producer completes before
421-
/// beginning to send the values from the next inner producer.
422-
///
423-
/// - note: If any of the inner producers emit an error, the returned
424-
/// producer will emit that error.
425-
///
426-
/// - note: The returned producer completes only when `producer` and all
427-
/// producers emitted from `producer` complete.
428-
fileprivate func concat() -> SignalProducer<Value.Value, Error> {
429-
return SignalProducer<Value.Value, Error> { observer, disposable in
430+
fileprivate func concurrent(limit: UInt) -> SignalProducer<Value.Value, Error> {
431+
return SignalProducer<Value.Value, Error> { relayObserver, disposable in
430432
self.startWithSignal { signal, signalDisposable in
431433
disposable += signalDisposable
432-
_ = signal.observeConcat(observer, disposable)
434+
435+
_ = signal.observeConcurrent(relayObserver, limit, disposable)
433436
}
434437
}
435438
}
@@ -459,113 +462,40 @@ extension SignalProducerProtocol {
459462
}
460463
}
461464

462-
private final class ConcatState<Value, Error: Swift.Error> {
463-
typealias SignalProducer = ReactiveSwift.SignalProducer<Value, Error>
465+
private final class ConcurrentFlattenState<Value, Error: Swift.Error> {
466+
typealias Producer = ReactiveSwift.SignalProducer<Value, Error>
467+
468+
/// The limit of active producers.
469+
let limit: UInt
464470

465-
/// The active producer, if any.
466-
var active: SignalProducer? = nil
471+
/// The number of active producers.
472+
var activeCount: UInt = 0
467473

468474
/// The producers waiting to be started.
469-
var queue: [SignalProducer] = []
470-
471-
/// Whether the active producer is currently starting.
472-
/// Used to prevent deep recursion.
473-
var isStarting: Bool = false
474-
475-
/// Dequeue the next producer if one should be started.
476-
///
477-
/// - note: The caller *must* set `isStarting` to false after the returned
478-
/// producer has been started.
479-
///
480-
/// - returns: The `SignalProducer` to start or `nil` if no producer should
481-
/// be started.
482-
func dequeue() -> SignalProducer? {
483-
if active != nil {
484-
return nil
485-
}
486-
487-
active = queue.first
488-
if active != nil {
489-
queue.removeFirst()
490-
isStarting = true
491-
}
492-
return active
493-
}
494-
}
475+
var queue: [Producer] = []
495476

496-
extension SignalProtocol where Value: SignalProducerProtocol, Error == Value.Error {
497-
/// Merges a `signal` of SignalProducers down into a single signal, biased
498-
/// toward the producer added earlier. Returns a Signal that will forward
499-
/// events from the inner producers as they arrive.
500-
fileprivate func merge() -> Signal<Value.Value, Error> {
501-
return Signal<Value.Value, Error> { relayObserver in
502-
let disposable = CompositeDisposable()
503-
let relayDisposable = CompositeDisposable()
477+
/// Whether the outer producer has completed.
478+
var isOuterCompleted = false
504479

505-
disposable += relayDisposable
506-
disposable += self.observeMerge(relayObserver, relayDisposable)
507-
508-
return disposable
509-
}
480+
/// Whether the flattened signal should complete.
481+
var shouldComplete: Bool {
482+
return isOuterCompleted && activeCount == 0 && queue.isEmpty
510483
}
511484

512-
fileprivate func observeMerge(_ observer: Observer<Value.Value, Error>, _ disposable: CompositeDisposable) -> Disposable? {
513-
let inFlight = Atomic(1)
514-
let decrementInFlight = {
515-
let shouldComplete: Bool = inFlight.modify {
516-
$0 -= 1
517-
return $0 == 0
518-
}
519-
520-
if shouldComplete {
521-
observer.sendCompleted()
522-
}
523-
}
524-
525-
return self.observe { event in
526-
switch event {
527-
case let .value(producer):
528-
producer.startWithSignal { innerSignal, innerDisposable in
529-
inFlight.modify { $0 += 1 }
530-
let handle = disposable.add(innerDisposable)
531-
532-
innerSignal.observe { event in
533-
switch event {
534-
case .completed, .interrupted:
535-
handle.remove()
536-
decrementInFlight()
537-
538-
case .value, .failed:
539-
observer.action(event)
540-
}
541-
}
542-
}
543-
544-
case let .failed(error):
545-
observer.send(error: error)
546-
547-
case .completed:
548-
decrementInFlight()
549-
550-
case .interrupted:
551-
observer.sendInterrupted()
552-
}
553-
}
485+
init(limit: UInt) {
486+
self.limit = limit
554487
}
555-
}
556-
557-
extension SignalProducerProtocol where Value: SignalProducerProtocol, Error == Value.Error {
558-
/// Merges a `signal` of SignalProducers down into a single signal, biased
559-
/// toward the producer added earlier. Returns a Signal that will forward
560-
/// events from the inner producers as they arrive.
561-
fileprivate func merge() -> SignalProducer<Value.Value, Error> {
562-
return SignalProducer<Value.Value, Error> { relayObserver, disposable in
563-
self.startWithSignal { signal, signalDisposable in
564-
disposable += signalDisposable
565-
566-
_ = signal.observeMerge(relayObserver, disposable)
567-
}
568-
488+
489+
/// Dequeue the next producer if one should be started.
490+
///
491+
/// - returns: The `Producer` to start or `nil` if no producer should be
492+
/// started.
493+
func dequeue() -> Producer? {
494+
if activeCount < limit, !queue.isEmpty {
495+
activeCount += 1
496+
return queue.removeFirst()
497+
} else {
498+
return nil
569499
}
570500
}
571501
}

0 commit comments

Comments
 (0)