Skip to content

Commit ec30c09

Browse files
committed
Introduce FlattenStrategy.concurrent.
1 parent 271d865 commit ec30c09

File tree

1 file changed

+102
-172
lines changed

1 file changed

+102
-172
lines changed

Sources/Flatten.swift

Lines changed: 102 additions & 172 deletions
Original file line numberDiff line numberDiff line change
@@ -15,14 +15,25 @@ public enum FlattenStrategy: Equatable {
1515
///
1616
/// The resulting producer will complete only when all inputs have
1717
/// completed.
18-
case merge
18+
public static let merge = FlattenStrategy.concurrent(limit: .max)
1919

2020
/// The producers should be concatenated, so that their values are sent in
2121
/// the order of the producers themselves.
2222
///
2323
/// The resulting producer will complete only when all inputs have
2424
/// completed.
25-
case concat
25+
public static let concat = FlattenStrategy.concurrent(limit: 1)
26+
27+
/// The producers should be merged, but only up to the given limit at any
28+
/// point of time, so that any value received on any of the input producers
29+
/// will be forwarded immediately to the output producer.
30+
///
31+
/// When the number of active producers reaches the limit, subsequent
32+
/// producers are queued.
33+
///
34+
/// The resulting producer will complete only when all inputs have
35+
/// completed.
36+
case concurrent(limit: Int)
2637

2738
/// Only the events from the latest input producer should be considered for
2839
/// the output. Any producers received before that point will be disposed
@@ -31,8 +42,20 @@ public enum FlattenStrategy: Equatable {
3142
/// The resulting producer will complete only when the producer-of-producers
3243
/// and the latest producer has completed.
3344
case latest
34-
}
3545

46+
public static func ==(left: FlattenStrategy, right: FlattenStrategy) -> Bool {
47+
switch (left, right) {
48+
case (.latest, .latest):
49+
return true
50+
51+
case (.concurrent(let leftLimit), .concurrent(let rightLimit)):
52+
return leftLimit == rightLimit
53+
54+
default:
55+
return false
56+
}
57+
}
58+
}
3659

3760
extension SignalProtocol where Value: SignalProducerProtocol, Error == Value.Error {
3861
/// Flattens the inner producers sent upon `signal` (into a single signal of
@@ -45,11 +68,8 @@ extension SignalProtocol where Value: SignalProducerProtocol, Error == Value.Err
4568
/// `Completed events on inner producers.
4669
public func flatten(_ strategy: FlattenStrategy) -> Signal<Value.Value, Error> {
4770
switch strategy {
48-
case .merge:
49-
return self.merge()
50-
51-
case .concat:
52-
return self.concat()
71+
case .concurrent(let limit):
72+
return self.concurrent(limit: limit)
5373

5474
case .latest:
5575
return self.switchToLatest()
@@ -87,11 +107,8 @@ extension SignalProtocol where Value: SignalProducerProtocol, Error == NoError,
87107
/// - strategy: Strategy used when flattening signals.
88108
public func flatten(_ strategy: FlattenStrategy) -> Signal<Value.Value, Value.Error> {
89109
switch strategy {
90-
case .merge:
91-
return self.merge()
92-
93-
case .concat:
94-
return self.concat()
110+
case .concurrent(let limit):
111+
return self.concurrent(limit: limit)
95112

96113
case .latest:
97114
return self.switchToLatest()
@@ -124,11 +141,8 @@ extension SignalProducerProtocol where Value: SignalProducerProtocol, Error == V
124141
/// `completed` events on inner producers.
125142
public func flatten(_ strategy: FlattenStrategy) -> SignalProducer<Value.Value, Error> {
126143
switch strategy {
127-
case .merge:
128-
return self.merge()
129-
130-
case .concat:
131-
return self.concat()
144+
case .concurrent(let limit):
145+
return self.concurrent(limit: limit)
132146

133147
case .latest:
134148
return self.switchToLatest()
@@ -160,11 +174,8 @@ extension SignalProducerProtocol where Value: SignalProducerProtocol, Error == N
160174
/// `completed` events on inner producers.
161175
public func flatten(_ strategy: FlattenStrategy) -> SignalProducer<Value.Value, Value.Error> {
162176
switch strategy {
163-
case .merge:
164-
return self.merge()
165-
166-
case .concat:
167-
return self.concat()
177+
case .concurrent(let limit):
178+
return self.concurrent(limit: limit)
168179

169180
case .latest:
170181
return self.switchToLatest()
@@ -340,101 +351,6 @@ extension SignalProducerProtocol where Value: PropertyProtocol {
340351
}
341352
}
342353

343-
extension SignalProtocol where Value: SignalProducerProtocol, Error == Value.Error {
344-
/// Returns a signal which sends all the values from producer signal emitted
345-
/// from `signal`, waiting until each inner producer completes before
346-
/// beginning to send the values from the next inner producer.
347-
///
348-
/// - note: If any of the inner producers fail, the returned signal will
349-
/// forward that failure immediately
350-
///
351-
/// - note: The returned signal completes only when `signal` and all
352-
/// producers emitted from `signal` complete.
353-
fileprivate func concat() -> Signal<Value.Value, Error> {
354-
return Signal<Value.Value, Error> { relayObserver in
355-
let disposable = CompositeDisposable()
356-
let relayDisposable = CompositeDisposable()
357-
358-
disposable += relayDisposable
359-
disposable += self.observeConcat(relayObserver, relayDisposable)
360-
361-
return disposable
362-
}
363-
}
364-
365-
fileprivate func observeConcat(_ observer: Observer<Value.Value, Error>, _ disposable: CompositeDisposable? = nil) -> Disposable? {
366-
let state = Atomic(ConcatState<Value.Value, Error>())
367-
368-
func startNextIfNeeded() {
369-
while let producer = state.modify({ $0.dequeue() }) {
370-
producer.startWithSignal { signal, inner in
371-
let handle = disposable?.add(inner)
372-
373-
signal.observe { event in
374-
switch event {
375-
case .completed, .interrupted:
376-
handle?.remove()
377-
378-
let shouldStart: Bool = state.modify {
379-
$0.active = nil
380-
return !$0.isStarting
381-
}
382-
383-
if shouldStart {
384-
startNextIfNeeded()
385-
}
386-
387-
case .value, .failed:
388-
observer.action(event)
389-
}
390-
}
391-
}
392-
state.modify { $0.isStarting = false }
393-
}
394-
}
395-
396-
return observe { event in
397-
switch event {
398-
case let .value(value):
399-
state.modify { $0.queue.append(value.producer) }
400-
startNextIfNeeded()
401-
402-
case let .failed(error):
403-
observer.send(error: error)
404-
405-
case .completed:
406-
state.modify { state in
407-
state.queue.append(SignalProducer.empty.on(completed: observer.sendCompleted))
408-
}
409-
startNextIfNeeded()
410-
411-
case .interrupted:
412-
observer.sendInterrupted()
413-
}
414-
}
415-
}
416-
}
417-
418-
extension SignalProducerProtocol where Value: SignalProducerProtocol, Error == Value.Error {
419-
/// Returns a producer which sends all the values from each producer emitted
420-
/// from `producer`, waiting until each inner producer completes before
421-
/// beginning to send the values from the next inner producer.
422-
///
423-
/// - note: If any of the inner producers emit an error, the returned
424-
/// producer will emit that error.
425-
///
426-
/// - note: The returned producer completes only when `producer` and all
427-
/// producers emitted from `producer` complete.
428-
fileprivate func concat() -> SignalProducer<Value.Value, Error> {
429-
return SignalProducer<Value.Value, Error> { observer, disposable in
430-
self.startWithSignal { signal, signalDisposable in
431-
disposable += signalDisposable
432-
_ = signal.observeConcat(observer, disposable)
433-
}
434-
}
435-
}
436-
}
437-
438354
extension SignalProducerProtocol {
439355
/// `concat`s `next` onto `self`.
440356
public func concat(_ next: SignalProducer<Value, Error>) -> SignalProducer<Value, Error> {
@@ -459,93 +375,111 @@ extension SignalProducerProtocol {
459375
}
460376
}
461377

462-
private final class ConcatState<Value, Error: Swift.Error> {
463-
typealias SignalProducer = ReactiveSwift.SignalProducer<Value, Error>
378+
private final class ConcurrentFlattenState<Value, Error: Swift.Error> {
379+
typealias Producer = ReactiveSwift.SignalProducer<Value, Error>
380+
381+
/// The limit of active producers.
382+
let limit: Int
464383

465-
/// The active producer, if any.
466-
var active: SignalProducer? = nil
384+
/// The number of active producers.
385+
var activeCount = 0
467386

468387
/// The producers waiting to be started.
469-
var queue: [SignalProducer] = []
470-
471-
/// Whether the active producer is currently starting.
472-
/// Used to prevent deep recursion.
473-
var isStarting: Bool = false
388+
var queue: [Producer] = []
389+
390+
/// Whether the outer producer has completed.
391+
var isOuterCompleted = false
392+
393+
/// Whether the flattened signal should complete.
394+
var shouldComplete: Bool {
395+
return isOuterCompleted && activeCount == 0 && queue.isEmpty
396+
}
397+
398+
init(limit: Int) {
399+
self.limit = limit
400+
}
474401

475402
/// Dequeue the next producer if one should be started.
476403
///
477-
/// - note: The caller *must* set `isStarting` to false after the returned
478-
/// producer has been started.
479-
///
480-
/// - returns: The `SignalProducer` to start or `nil` if no producer should
481-
/// be started.
482-
func dequeue() -> SignalProducer? {
483-
if active != nil {
404+
/// - returns: The `Producer` to start or `nil` if no producer should be
405+
/// started.
406+
func dequeue() -> Producer? {
407+
if activeCount < limit, !queue.isEmpty {
408+
activeCount += 1
409+
return queue.removeFirst()
410+
} else {
484411
return nil
485412
}
486-
487-
active = queue.first
488-
if active != nil {
489-
queue.removeFirst()
490-
isStarting = true
491-
}
492-
return active
493413
}
494414
}
495415

496416
extension SignalProtocol where Value: SignalProducerProtocol, Error == Value.Error {
497-
/// Merges a `signal` of SignalProducers down into a single signal, biased
498-
/// toward the producer added earlier. Returns a Signal that will forward
499-
/// events from the inner producers as they arrive.
500-
fileprivate func merge() -> Signal<Value.Value, Error> {
417+
fileprivate func concurrent(limit: Int) -> Signal<Value.Value, Error> {
501418
return Signal<Value.Value, Error> { relayObserver in
502419
let disposable = CompositeDisposable()
503420
let relayDisposable = CompositeDisposable()
504421

505422
disposable += relayDisposable
506-
disposable += self.observeMerge(relayObserver, relayDisposable)
423+
disposable += self.observeConcurrent(relayObserver, limit, relayDisposable)
507424

508425
return disposable
509426
}
510427
}
511428

512-
fileprivate func observeMerge(_ observer: Observer<Value.Value, Error>, _ disposable: CompositeDisposable) -> Disposable? {
513-
let inFlight = Atomic(1)
514-
let decrementInFlight = {
515-
let shouldComplete: Bool = inFlight.modify {
516-
$0 -= 1
517-
return $0 == 0
518-
}
429+
fileprivate func observeConcurrent(_ observer: Observer<Value.Value, Error>, _ limit: Int, _ disposable: CompositeDisposable) -> Disposable? {
430+
let state = Atomic(ConcurrentFlattenState<Value.Value, Error>(limit: limit))
519431

520-
if shouldComplete {
521-
observer.sendCompleted()
522-
}
523-
}
432+
func startNextIfNeeded() {
433+
while let producer = state.modify({ $0.dequeue() }) {
434+
var isStarting = true
524435

525-
return self.observe { event in
526-
switch event {
527-
case let .value(producer):
528-
producer.startWithSignal { innerSignal, innerDisposable in
529-
inFlight.modify { $0 += 1 }
530-
let handle = disposable.add(innerDisposable)
436+
producer.startWithSignal { signal, inner in
437+
let handle = disposable.add(inner)
531438

532-
innerSignal.observe { event in
439+
signal.observe { event in
533440
switch event {
534441
case .completed, .interrupted:
535442
handle.remove()
536-
decrementInFlight()
443+
444+
let shouldComplete: Bool = state.modify { state in
445+
state.activeCount -= 1
446+
return state.shouldComplete
447+
}
448+
449+
if shouldComplete {
450+
observer.sendCompleted()
451+
} else if !isStarting {
452+
startNextIfNeeded()
453+
}
537454

538455
case .value, .failed:
539456
observer.action(event)
540457
}
541458
}
542459
}
543460

461+
isStarting = false
462+
}
463+
}
464+
465+
return observe { event in
466+
switch event {
467+
case let .value(value):
468+
state.modify { $0.queue.append(value.producer) }
469+
startNextIfNeeded()
470+
544471
case let .failed(error):
545472
observer.send(error: error)
546473

547474
case .completed:
548-
decrementInFlight()
475+
let shouldComplete: Bool = state.modify { state in
476+
state.isOuterCompleted = true
477+
return state.shouldComplete
478+
}
479+
480+
if shouldComplete {
481+
observer.sendCompleted()
482+
}
549483

550484
case .interrupted:
551485
observer.sendInterrupted()
@@ -555,17 +489,13 @@ extension SignalProtocol where Value: SignalProducerProtocol, Error == Value.Err
555489
}
556490

557491
extension SignalProducerProtocol where Value: SignalProducerProtocol, Error == Value.Error {
558-
/// Merges a `signal` of SignalProducers down into a single signal, biased
559-
/// toward the producer added earlier. Returns a Signal that will forward
560-
/// events from the inner producers as they arrive.
561-
fileprivate func merge() -> SignalProducer<Value.Value, Error> {
492+
fileprivate func concurrent(limit: Int) -> SignalProducer<Value.Value, Error> {
562493
return SignalProducer<Value.Value, Error> { relayObserver, disposable in
563494
self.startWithSignal { signal, signalDisposable in
564495
disposable += signalDisposable
565496

566-
_ = signal.observeMerge(relayObserver, disposable)
497+
_ = signal.observeConcurrent(relayObserver, limit, disposable)
567498
}
568-
569499
}
570500
}
571501
}

0 commit comments

Comments
 (0)