Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
191 changes: 107 additions & 84 deletions Sources/SignalProducer.swift
Original file line number Diff line number Diff line change
Expand Up @@ -2,8 +2,8 @@ import Dispatch
import Foundation
import Result

/// A SignalProducer creates Signals that can produce values of type `Value`
/// and/or fail with errors of type `Error`. If no failure should be possible,
/// A SignalProducer creates Signals that can produce values of type `Value`
/// and/or fail with errors of type `Error`. If no failure should be possible,
/// `NoError` can be specified for `Error`.
///
/// SignalProducers can be used to represent operations or tasks, like network
Expand All @@ -19,7 +19,27 @@ import Result
public struct SignalProducer<Value, Error: Swift.Error> {
public typealias ProducedSignal = Signal<Value, Error>

private let startHandler: (Signal<Value, Error>.Observer, Lifetime) -> Void
/// Wraps a closure which, when invoked, produces a new instance of `Signal`, a
/// customized `observerDidSetup` post-creation side effect for the `Signal` and a
/// disposable to interrupt the produced `Signal`.
///
/// Unlike the safe `startWithSignal(_:)` API, `builder` shifts the responsibility of
/// invoking the post-creation side effect to the caller, while it takes from the
/// caller the responsibility of the `Signal` creation.
///
/// The design allows producer lifting to be as efficient as native `Signal`
/// operators, by eliminating the unnecessary relay `Signal` imposed by the old
/// `startWithSignal(_:)`, regardless of the fact that lifted operators can rely on
/// the upstreams for producer interruption.
///
/// `observerDidSetup` must be invoked before any other post-creation side effect.
fileprivate struct Instance {
let producedSignal: Signal<Value, Error>
let observerDidSetup: () -> Void
let interruptHandle: Disposable
}

fileprivate let builder: () -> Instance

/// Initializes a `SignalProducer` that will emit the same events as the
/// given signal.
Expand Down Expand Up @@ -51,7 +71,23 @@ public struct SignalProducer<Value, Error: Swift.Error> {
/// - parameters:
/// - startHandler: A closure that accepts observer and a disposable.
public init(_ startHandler: @escaping (Signal<Value, Error>.Observer, Lifetime) -> Void) {
self.startHandler = startHandler
self.init { _ -> Instance in
let disposable = CompositeDisposable()
let (signal, observer) = Signal<Value, Error>.pipe(disposable: disposable)
let observerDidSetup = { startHandler(observer, Lifetime(disposable)) }
let interruptHandle = AnyDisposable(observer.sendInterrupted)

return Instance(producedSignal: signal, observerDidSetup: observerDidSetup, interruptHandle: interruptHandle)
}
}

/// Create a SignalProducer that will invoke the given factory once for each
/// invocation of `start()`.
///
/// - parameters:
/// - builder: A builder that is used by `startWithSignal` to create new `Signal`s.
fileprivate init(_ builder: @escaping () -> Instance) {
self.builder = builder
}

/// Creates a producer for a `Signal` that will immediately send one value
Expand Down Expand Up @@ -187,23 +223,10 @@ public struct SignalProducer<Value, Error: Swift.Error> {
/// `Signal` commences. Both the produced `Signal` and an interrupt handle
/// of the signal would be passed to the closure.
public func startWithSignal(_ setup: (_ signal: Signal<Value, Error>, _ interruptHandle: Disposable) -> Void) {
// Disposes of the work associated with the SignalProducer and any
// upstream producers.
let producerDisposable = CompositeDisposable()

let (signal, observer) = Signal<Value, Error>.pipe(disposable: producerDisposable)

// Directly disposed of when `start()` or `startWithSignal()` is
// disposed.
let cancelDisposable = AnyDisposable(observer.sendInterrupted)

setup(signal, cancelDisposable)

if cancelDisposable.isDisposed {
return
}

startHandler(observer, Lifetime(producerDisposable))
let instance = builder()
setup(instance.producedSignal, instance.interruptHandle)
guard !instance.interruptHandle.isDisposed else { return }
instance.observerDidSetup()
}
}

Expand Down Expand Up @@ -384,14 +407,59 @@ extension SignalProducer {
/// - returns: A signal producer that applies signal's operator to every
/// created signal.
public func lift<U, F>(_ transform: @escaping (Signal<Value, Error>) -> Signal<U, F>) -> SignalProducer<U, F> {
return SignalProducer<U, F> { observer, lifetime in
self.startWithSignal { signal, innerDisposable in
lifetime.observeEnded(innerDisposable.dispose)
transform(signal).observe(observer)
return SignalProducer<U, F> { _ -> SignalProducer<U, F>.Instance in
// Transform the `Signal`, and pass through the `didCreate` side effect and
// the interruptHandle.
let instance = self.producer.builder()
return SignalProducer<U, F>.Instance(producedSignal: transform(instance.producedSignal),
observerDidSetup: instance.observerDidSetup,
interruptHandle: instance.interruptHandle)
}
}

/// Lift a binary Signal operator to operate upon SignalProducers.
///
/// The left producer would first be started. When both producers are synchronous this
/// order can be important depending on the operator to generate correct results.
///
/// - returns: A factory that creates a SignalProducer with the given operator
/// applied. `self` would be the LHS, and the factory input would
/// be the RHS.
fileprivate func liftLeft<U, F, V, G>(_ transform: @escaping (Signal<Value, Error>) -> (Signal<U, F>) -> Signal<V, G>) -> (SignalProducer<U, F>) -> SignalProducer<V, G> {
return lift(leftFirst: true, transform)
}

/// Lift a binary Signal operator to operate upon SignalProducers.
///
/// The right producer would first be started. When both producers are synchronous
/// this order can be important depending on the operator to generate correct results.
///
/// - returns: A factory that creates a SignalProducer with the given operator
/// applied. `self` would be the LHS, and the factory input would
/// be the RHS.
fileprivate func liftRight<U, F, V, G>(_ transform: @escaping (Signal<Value, Error>) -> (Signal<U, F>) -> Signal<V, G>) -> (SignalProducer<U, F>) -> SignalProducer<V, G> {
return lift(leftFirst: false, transform)
}

private func lift<U, F, V, G>(leftFirst: Bool, _ transform: @escaping (Signal<Value, Error>) -> (Signal<U, F>) -> Signal<V, G>) -> (SignalProducer<U, F>) -> SignalProducer<V, G> {
return { otherProducer in
return SignalProducer<V, G> { _ -> SignalProducer<V, G>.Instance in
let left = self.producer.builder()
let right = otherProducer.builder()

return .init(producedSignal: transform(left.producedSignal)(right.producedSignal),
observerDidSetup: {
if leftFirst {
left.observerDidSetup()
right.observerDidSetup()
} else {
right.observerDidSetup()
left.observerDidSetup()
}},
interruptHandle: CompositeDisposable([left.interruptHandle, right.interruptHandle]))
}
}
}


/// Lift a binary Signal operator to operate upon SignalProducers instead.
///
Expand All @@ -411,46 +479,6 @@ extension SignalProducer {
return liftRight(transform)
}

/// Right-associative lifting of a binary signal operator over producers.
/// That is, the argument producer will be started before the receiver. When
/// both producers are synchronous this order can be important depending on
/// the operator to generate correct results.
fileprivate func liftRight<U, F, V, G>(_ transform: @escaping (Signal<Value, Error>) -> (Signal<U, F>) -> Signal<V, G>) -> (SignalProducer<U, F>) -> SignalProducer<V, G> {
return { otherProducer in
return SignalProducer<V, G> { observer, lifetime in
self.startWithSignal { signal, disposable in
lifetime.observeEnded(disposable.dispose)

otherProducer.startWithSignal { otherSignal, otherDisposable in
lifetime.observeEnded(otherDisposable.dispose)

transform(signal)(otherSignal).observe(observer)
}
}
}
}
}

/// Left-associative lifting of a binary signal operator over producers.
/// That is, the receiver will be started before the argument producer. When
/// both producers are synchronous this order can be important depending on
/// the operator to generate correct results.
fileprivate func liftLeft<U, F, V, G>(_ transform: @escaping (Signal<Value, Error>) -> (Signal<U, F>) -> Signal<V, G>) -> (SignalProducer<U, F>) -> SignalProducer<V, G> {
return { otherProducer in
return SignalProducer<V, G> { observer, lifetime in
otherProducer.startWithSignal { otherSignal, otherDisposable in
lifetime.observeEnded(otherDisposable.dispose)

self.startWithSignal { signal, disposable in
lifetime.observeEnded(disposable.dispose)

transform(signal)(otherSignal).observe(observer)
}
}
}
}
}

/// Lift a binary Signal operator to operate upon a Signal and a
/// SignalProducer instead.
///
Expand Down Expand Up @@ -1568,24 +1596,19 @@ extension SignalProducer {
disposed: (() -> Void)? = nil,
value: ((Value) -> Void)? = nil
) -> SignalProducer<Value, Error> {
return SignalProducer { observer, lifetime in
starting?()
defer { started?() }

self.startWithSignal { signal, disposable in
lifetime.observeEnded(disposable.dispose)
signal
.on(
event: event,
failed: failed,
completed: completed,
interrupted: interrupted,
terminated: terminated,
disposed: disposed,
value: value
)
.observe(observer)
}
return SignalProducer { _ -> Instance in
let instance = self.producer.builder()
let signal = instance.producedSignal.on(event: event,
failed: failed,
completed: completed,
interrupted: interrupted,
terminated: terminated,
disposed: disposed,
value: value)

return Instance(producedSignal: signal,
observerDidSetup: { starting?(); instance.observerDidSetup(); started?() },
interruptHandle: instance.interruptHandle)
}
}

Expand Down
21 changes: 21 additions & 0 deletions Tests/ReactiveSwiftTests/SignalProducerSpec.swift
Original file line number Diff line number Diff line change
Expand Up @@ -123,6 +123,27 @@ class SignalProducerSpec: QuickSpec {
startDisposable.dispose()
expect(addedDisposable.isDisposed) == true
}

it("should deliver the interrupted event with respect to the applied asynchronous operators") {
let scheduler = TestScheduler()
var signalInterrupted = false
var observerInterrupted = false

let (signal, _) = Signal<Int, NoError>.pipe()

SignalProducer(signal)
.observe(on: scheduler)
.on(interrupted: { signalInterrupted = true })
.startWithInterrupted { observerInterrupted = true }
.dispose()

expect(signalInterrupted) == false
expect(observerInterrupted) == false

scheduler.run()
expect(signalInterrupted) == true
expect(observerInterrupted) == true
}
}

describe("init(signal:)") {
Expand Down