Skip to content
Merged
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
254 changes: 92 additions & 162 deletions Sources/Flatten.swift
Original file line number Diff line number Diff line change
Expand Up @@ -15,14 +15,25 @@ public enum FlattenStrategy: Equatable {
///
/// The resulting producer will complete only when all inputs have
/// completed.
case merge
public static let merge = FlattenStrategy.concurrent(limit: .max)

/// The producers should be concatenated, so that their values are sent in
/// the order of the producers themselves.
///
/// The resulting producer will complete only when all inputs have
/// completed.
case concat
public static let concat = FlattenStrategy.concurrent(limit: 1)

/// The producers should be merged, but only up to the given limit at any
/// point of time, so that any value received on any of the input producers
/// will be forwarded immediately to the output producer.
///
/// When the number of active producers reaches the limit, subsequent
/// producers are queued.
///
/// The resulting producer will complete only when all inputs have
/// completed.
case concurrent(limit: UInt)

/// Only the events from the latest input producer should be considered for
/// the output. Any producers received before that point will be disposed
Expand All @@ -31,8 +42,20 @@ public enum FlattenStrategy: Equatable {
/// The resulting producer will complete only when the producer-of-producers
/// and the latest producer has completed.
case latest
}

public static func ==(left: FlattenStrategy, right: FlattenStrategy) -> Bool {
switch (left, right) {
case (.latest, .latest):
return true

case (.concurrent(let leftLimit), .concurrent(let rightLimit)):
return leftLimit == rightLimit

default:
return false
}
}
}

extension SignalProtocol where Value: SignalProducerProtocol, Error == Value.Error {
/// Flattens the inner producers sent upon `signal` (into a single signal of
Expand All @@ -45,11 +68,8 @@ extension SignalProtocol where Value: SignalProducerProtocol, Error == Value.Err
/// `Completed events on inner producers.
public func flatten(_ strategy: FlattenStrategy) -> Signal<Value.Value, Error> {
switch strategy {
case .merge:
return self.merge()

case .concat:
return self.concat()
case .concurrent(let limit):
return self.concurrent(limit: limit)

case .latest:
return self.switchToLatest()
Expand Down Expand Up @@ -87,11 +107,8 @@ extension SignalProtocol where Value: SignalProducerProtocol, Error == NoError,
/// - strategy: Strategy used when flattening signals.
public func flatten(_ strategy: FlattenStrategy) -> Signal<Value.Value, Value.Error> {
switch strategy {
case .merge:
return self.merge()

case .concat:
return self.concat()
case .concurrent(let limit):
return self.concurrent(limit: limit)

case .latest:
return self.switchToLatest()
Expand Down Expand Up @@ -124,11 +141,8 @@ extension SignalProducerProtocol where Value: SignalProducerProtocol, Error == V
/// `completed` events on inner producers.
public func flatten(_ strategy: FlattenStrategy) -> SignalProducer<Value.Value, Error> {
switch strategy {
case .merge:
return self.merge()

case .concat:
return self.concat()
case .concurrent(let limit):
return self.concurrent(limit: limit)

case .latest:
return self.switchToLatest()
Expand Down Expand Up @@ -160,11 +174,8 @@ extension SignalProducerProtocol where Value: SignalProducerProtocol, Error == N
/// `completed` events on inner producers.
public func flatten(_ strategy: FlattenStrategy) -> SignalProducer<Value.Value, Value.Error> {
switch strategy {
case .merge:
return self.merge()

case .concat:
return self.concat()
case .concurrent(let limit):
return self.concurrent(limit: limit)

case .latest:
return self.switchToLatest()
Expand Down Expand Up @@ -341,46 +352,41 @@ extension SignalProducerProtocol where Value: PropertyProtocol {
}

extension SignalProtocol where Value: SignalProducerProtocol, Error == Value.Error {
/// Returns a signal which sends all the values from producer signal emitted
/// from `signal`, waiting until each inner producer completes before
/// beginning to send the values from the next inner producer.
///
/// - note: If any of the inner producers fail, the returned signal will
/// forward that failure immediately
///
/// - note: The returned signal completes only when `signal` and all
/// producers emitted from `signal` complete.
fileprivate func concat() -> Signal<Value.Value, Error> {
fileprivate func concurrent(limit: UInt) -> Signal<Value.Value, Error> {
return Signal<Value.Value, Error> { relayObserver in
let disposable = CompositeDisposable()
let relayDisposable = CompositeDisposable()

disposable += relayDisposable
disposable += self.observeConcat(relayObserver, relayDisposable)
disposable += self.observeConcurrent(relayObserver, limit, relayDisposable)

return disposable
}
}
fileprivate func observeConcat(_ observer: Observer<Value.Value, Error>, _ disposable: CompositeDisposable? = nil) -> Disposable? {
let state = Atomic(ConcatState<Value.Value, Error>())

fileprivate func observeConcurrent(_ observer: Observer<Value.Value, Error>, _ limit: UInt, _ disposable: CompositeDisposable) -> Disposable? {
let state = Atomic(ConcurrentFlattenState<Value.Value, Error>(limit: limit))

func startNextIfNeeded() {
while let producer = state.modify({ $0.dequeue() }) {
var isStarting = true

producer.startWithSignal { signal, inner in
let handle = disposable?.add(inner)
let handle = disposable.add(inner)

signal.observe { event in
switch event {
case .completed, .interrupted:
handle?.remove()
let shouldStart: Bool = state.modify {
$0.active = nil
return !$0.isStarting
handle.remove()

let shouldComplete: Bool = state.modify { state in
state.activeCount -= 1
return state.shouldComplete
}

if shouldStart {
if shouldComplete {
observer.sendCompleted()
} else if !isStarting {
startNextIfNeeded()
}

Expand All @@ -389,10 +395,11 @@ extension SignalProtocol where Value: SignalProducerProtocol, Error == Value.Err
}
}
}
state.modify { $0.isStarting = false }

isStarting = false
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Is assigning to a Bool an atomic operation? This write could happen concurrently with the read above. I think it needs to at least be in its own Atomic.

}
}

return observe { event in
switch event {
case let .value(value):
Expand All @@ -403,11 +410,15 @@ extension SignalProtocol where Value: SignalProducerProtocol, Error == Value.Err
observer.send(error: error)

case .completed:
state.modify { state in
state.queue.append(SignalProducer.empty.on(completed: observer.sendCompleted))
let shouldComplete: Bool = state.modify { state in
state.isOuterCompleted = true
return state.shouldComplete
}
startNextIfNeeded()


if shouldComplete {
observer.sendCompleted()
}

case .interrupted:
observer.sendInterrupted()
}
Expand All @@ -416,20 +427,12 @@ extension SignalProtocol where Value: SignalProducerProtocol, Error == Value.Err
}

extension SignalProducerProtocol where Value: SignalProducerProtocol, Error == Value.Error {
/// Returns a producer which sends all the values from each producer emitted
/// from `producer`, waiting until each inner producer completes before
/// beginning to send the values from the next inner producer.
///
/// - note: If any of the inner producers emit an error, the returned
/// producer will emit that error.
///
/// - note: The returned producer completes only when `producer` and all
/// producers emitted from `producer` complete.
fileprivate func concat() -> SignalProducer<Value.Value, Error> {
return SignalProducer<Value.Value, Error> { observer, disposable in
fileprivate func concurrent(limit: UInt) -> SignalProducer<Value.Value, Error> {
return SignalProducer<Value.Value, Error> { relayObserver, disposable in
self.startWithSignal { signal, signalDisposable in
disposable += signalDisposable
_ = signal.observeConcat(observer, disposable)

_ = signal.observeConcurrent(relayObserver, limit, disposable)
}
}
}
Expand Down Expand Up @@ -459,113 +462,40 @@ extension SignalProducerProtocol {
}
}

private final class ConcatState<Value, Error: Swift.Error> {
typealias SignalProducer = ReactiveSwift.SignalProducer<Value, Error>
private final class ConcurrentFlattenState<Value, Error: Swift.Error> {
typealias Producer = ReactiveSwift.SignalProducer<Value, Error>

/// The limit of active producers.
let limit: UInt

/// The active producer, if any.
var active: SignalProducer? = nil
/// The number of active producers.
var activeCount: UInt = 0

/// The producers waiting to be started.
var queue: [SignalProducer] = []

/// Whether the active producer is currently starting.
/// Used to prevent deep recursion.
var isStarting: Bool = false

/// Dequeue the next producer if one should be started.
///
/// - note: The caller *must* set `isStarting` to false after the returned
/// producer has been started.
///
/// - returns: The `SignalProducer` to start or `nil` if no producer should
/// be started.
func dequeue() -> SignalProducer? {
if active != nil {
return nil
}

active = queue.first
if active != nil {
queue.removeFirst()
isStarting = true
}
return active
}
}
var queue: [Producer] = []

extension SignalProtocol where Value: SignalProducerProtocol, Error == Value.Error {
/// Merges a `signal` of SignalProducers down into a single signal, biased
/// toward the producer added earlier. Returns a Signal that will forward
/// events from the inner producers as they arrive.
fileprivate func merge() -> Signal<Value.Value, Error> {
return Signal<Value.Value, Error> { relayObserver in
let disposable = CompositeDisposable()
let relayDisposable = CompositeDisposable()
/// Whether the outer producer has completed.
var isOuterCompleted = false

disposable += relayDisposable
disposable += self.observeMerge(relayObserver, relayDisposable)

return disposable
}
/// Whether the flattened signal should complete.
var shouldComplete: Bool {
return isOuterCompleted && activeCount == 0 && queue.isEmpty
}

fileprivate func observeMerge(_ observer: Observer<Value.Value, Error>, _ disposable: CompositeDisposable) -> Disposable? {
let inFlight = Atomic(1)
let decrementInFlight = {
let shouldComplete: Bool = inFlight.modify {
$0 -= 1
return $0 == 0
}

if shouldComplete {
observer.sendCompleted()
}
}

return self.observe { event in
switch event {
case let .value(producer):
producer.startWithSignal { innerSignal, innerDisposable in
inFlight.modify { $0 += 1 }
let handle = disposable.add(innerDisposable)

innerSignal.observe { event in
switch event {
case .completed, .interrupted:
handle.remove()
decrementInFlight()

case .value, .failed:
observer.action(event)
}
}
}

case let .failed(error):
observer.send(error: error)

case .completed:
decrementInFlight()

case .interrupted:
observer.sendInterrupted()
}
}
init(limit: UInt) {
self.limit = limit
}
}

extension SignalProducerProtocol where Value: SignalProducerProtocol, Error == Value.Error {
/// Merges a `signal` of SignalProducers down into a single signal, biased
/// toward the producer added earlier. Returns a Signal that will forward
/// events from the inner producers as they arrive.
fileprivate func merge() -> SignalProducer<Value.Value, Error> {
return SignalProducer<Value.Value, Error> { relayObserver, disposable in
self.startWithSignal { signal, signalDisposable in
disposable += signalDisposable

_ = signal.observeMerge(relayObserver, disposable)
}


/// Dequeue the next producer if one should be started.
///
/// - returns: The `Producer` to start or `nil` if no producer should be
/// started.
func dequeue() -> Producer? {
if activeCount < limit, !queue.isEmpty {
activeCount += 1
return queue.removeFirst()
} else {
return nil
}
}
}
Expand Down