Skip to content

Commit 188d148

Browse files
committed
Optimized producer lifting.
1 parent 557dbfe commit 188d148

File tree

1 file changed

+116
-97
lines changed

1 file changed

+116
-97
lines changed

Sources/SignalProducer.swift

Lines changed: 116 additions & 97 deletions
Original file line numberDiff line numberDiff line change
@@ -2,24 +2,41 @@ import Dispatch
22
import Foundation
33
import Result
44

5-
/// A SignalProducer creates Signals that can produce values of type `Value`
6-
/// and/or fail with errors of type `Error`. If no failure should be possible,
7-
/// `NoError` can be specified for `Error`.
5+
/// A `SignalProducer` produces a `Signal` with an instance of repeatable work which
6+
/// yields events to the `Signal`, every time it is started with an observer.
87
///
9-
/// SignalProducers can be used to represent operations or tasks, like network
10-
/// requests, where each invocation of `start()` will create a new underlying
11-
/// operation. This ensures that consumers will receive the results, versus a
12-
/// plain Signal, where the results might be sent before any observers are
13-
/// attached.
8+
/// Even if multiple instances of work from the same `SignalProducer` are executing
9+
/// concurrently, produced `Signal`s would see only events from its own instance of work,
10+
/// but not from the others.
1411
///
15-
/// Because of the behavior of `start()`, different Signals created from the
16-
/// producer may see a different version of Events. The Events may arrive in a
17-
/// different order between Signals, or the stream might be completely
18-
/// different!
12+
/// The _work_ performed by a `SignalProducer` is also known as the _post-creation side
13+
/// effect_.
14+
///
15+
/// `SignalProducer`s can be used to represent repeatable operations or tasks — like
16+
/// network requests — that are customizable, lazy and/or on demand.
1917
public struct SignalProducer<Value, Error: Swift.Error> {
2018
public typealias ProducedSignal = Signal<Value, Error>
2119

22-
private let startHandler: (Signal<Value, Error>.Observer, Lifetime) -> Void
20+
/// Wraps a closure which, when invoked, produces a new instance of `Signal`, a
21+
/// customized `didCreate` post-creation side effect for the `Signal` and a disposable
22+
/// to interrupt the produced `Signal`.
23+
///
24+
/// Unlike the safe `startWithSignal(_:)` API, `Builder` shifts the responsibility of
25+
/// invoking the post-creation side effect to the caller, while it takes from the
26+
/// caller the responsibility of the `Signal` creation.
27+
///
28+
/// The design allows producer lifting to be as efficient as native `Signal`
29+
/// operators, by eliminating the unnecessary relay `Signal` imposed by the old
30+
/// `startWithSignal(_:)`, regardless of the fact that lifted operators can rely on
31+
/// the upstreams for producer interruption.
32+
///
33+
/// If the caller is a `Builder`, it must invoke the produced `didCreate` before
34+
/// performing any of its own post-creation side effect.
35+
fileprivate struct Builder {
36+
fileprivate let make: () -> (signal: Signal<Value, Error>, didCreate: () -> Void, interruptHandle: Disposable)
37+
}
38+
39+
fileprivate let builder: Builder
2340

2441
/// Initializes a `SignalProducer` that will emit the same events as the
2542
/// given signal.
@@ -51,7 +68,23 @@ public struct SignalProducer<Value, Error: Swift.Error> {
5168
/// - parameters:
5269
/// - startHandler: A closure that accepts observer and a disposable.
5370
public init(_ startHandler: @escaping (Signal<Value, Error>.Observer, Lifetime) -> Void) {
54-
self.startHandler = startHandler
71+
self.init(Builder {
72+
let disposable = CompositeDisposable()
73+
let (signal, observer) = Signal<Value, Error>.pipe(disposable: disposable)
74+
let didCreate = { startHandler(observer, Lifetime(disposable)) }
75+
let interruptHandle = AnyDisposable(observer.sendInterrupted)
76+
77+
return (signal, didCreate, interruptHandle)
78+
})
79+
}
80+
81+
/// Create a SignalProducer that will invoke the given factory once for each
82+
/// invocation of `start()`.
83+
///
84+
/// - parameters:
85+
/// - builder: A builder that is used by `startWithSignal` to create new `Signal`s.
86+
fileprivate init(_ builder: Builder) {
87+
self.builder = builder
5588
}
5689

5790
/// Creates a producer for a `Signal` that will immediately send one value
@@ -174,7 +207,9 @@ public struct SignalProducer<Value, Error: Swift.Error> {
174207

175208
/// A producer for a Signal that never sends any events to its observers.
176209
public static var never: SignalProducer {
177-
return self.init { _ in return }
210+
return self.init { observer, lifetime in
211+
lifetime.observeEnded { _ = observer }
212+
}
178213
}
179214

180215
/// Create a `Signal` from `self`, pass it into the given closure, and start the
@@ -185,23 +220,10 @@ public struct SignalProducer<Value, Error: Swift.Error> {
185220
/// `Signal` commences. Both the produced `Signal` and an interrupt handle
186221
/// of the signal would be passed to the closure.
187222
public func startWithSignal(_ setup: (_ signal: Signal<Value, Error>, _ interruptHandle: Disposable) -> Void) {
188-
// Disposes of the work associated with the SignalProducer and any
189-
// upstream producers.
190-
let producerDisposable = CompositeDisposable()
191-
192-
let (signal, observer) = Signal<Value, Error>.pipe(disposable: producerDisposable)
193-
194-
// Directly disposed of when `start()` or `startWithSignal()` is
195-
// disposed.
196-
let cancelDisposable = AnyDisposable(observer.sendInterrupted)
197-
198-
setup(signal, cancelDisposable)
199-
200-
if cancelDisposable.isDisposed {
201-
return
202-
}
203-
204-
startHandler(observer, Lifetime(producerDisposable))
223+
let (signal, didCreate, interruptHandle) = builder.make()
224+
setup(signal, interruptHandle)
225+
guard !interruptHandle.isDisposed else { return }
226+
didCreate()
205227
}
206228
}
207229

@@ -382,14 +404,58 @@ extension SignalProducer {
382404
/// - returns: A signal producer that applies signal's operator to every
383405
/// created signal.
384406
public func lift<U, F>(_ transform: @escaping (Signal<Value, Error>) -> Signal<U, F>) -> SignalProducer<U, F> {
385-
return SignalProducer<U, F> { observer, lifetime in
386-
self.startWithSignal { signal, innerDisposable in
387-
lifetime.observeEnded(innerDisposable.dispose)
388-
transform(signal).observe(observer)
389-
}
407+
return SignalProducer<U, F>(SignalProducer<U, F>.Builder {
408+
// Transform the `Signal`, and pass through the `didCreate` side effect and
409+
// the interruptHandle.
410+
let (signal, didCreate, interruptHandle) = self.producer.builder.make()
411+
return (signal: transform(signal), didCreate: didCreate, interruptHandle: interruptHandle)
412+
})
413+
}
414+
415+
/// Lift a binary Signal operator to operate upon SignalProducers.
416+
///
417+
/// The left producer would first be started. When both producers are synchronous this
418+
/// order can be important depending on the operator to generate correct results.
419+
///
420+
/// - returns: A factory that creates a SignalProducer with the given operator
421+
/// applied. `self` would be the LHS, and the factory input would
422+
/// be the RHS.
423+
fileprivate func liftLeft<U, F, V, G>(_ transform: @escaping (Signal<Value, Error>) -> (Signal<U, F>) -> Signal<V, G>) -> (SignalProducer<U, F>) -> SignalProducer<V, G> {
424+
return lift(leftFirst: true, transform)
425+
}
426+
427+
/// Lift a binary Signal operator to operate upon SignalProducers.
428+
///
429+
/// The right producer would first be started. When both producers are synchronous
430+
/// this order can be important depending on the operator to generate correct results.
431+
///
432+
/// - returns: A factory that creates a SignalProducer with the given operator
433+
/// applied. `self` would be the LHS, and the factory input would
434+
/// be the RHS.
435+
fileprivate func liftRight<U, F, V, G>(_ transform: @escaping (Signal<Value, Error>) -> (Signal<U, F>) -> Signal<V, G>) -> (SignalProducer<U, F>) -> SignalProducer<V, G> {
436+
return lift(leftFirst: false, transform)
437+
}
438+
439+
private func lift<U, F, V, G>(leftFirst: Bool, _ transform: @escaping (Signal<Value, Error>) -> (Signal<U, F>) -> Signal<V, G>) -> (SignalProducer<U, F>) -> SignalProducer<V, G> {
440+
return { otherProducer in
441+
return SignalProducer<V, G>(SignalProducer<V, G>.Builder {
442+
let (left, didCreateLeft, leftInterrupter) = self.producer.builder.make()
443+
let (right, didCreateRight, rightInterrupter) = otherProducer.builder.make()
444+
445+
return (signal: transform(left)(right),
446+
didCreate: {
447+
if leftFirst {
448+
didCreateLeft()
449+
didCreateRight()
450+
} else {
451+
didCreateRight()
452+
didCreateLeft()
453+
}
454+
},
455+
interruptHandle: CompositeDisposable([leftInterrupter, rightInterrupter]))
456+
})
390457
}
391458
}
392-
393459

394460
/// Lift a binary Signal operator to operate upon SignalProducers instead.
395461
///
@@ -409,46 +475,6 @@ extension SignalProducer {
409475
return liftRight(transform)
410476
}
411477

412-
/// Right-associative lifting of a binary signal operator over producers.
413-
/// That is, the argument producer will be started before the receiver. When
414-
/// both producers are synchronous this order can be important depending on
415-
/// the operator to generate correct results.
416-
fileprivate func liftRight<U, F, V, G>(_ transform: @escaping (Signal<Value, Error>) -> (Signal<U, F>) -> Signal<V, G>) -> (SignalProducer<U, F>) -> SignalProducer<V, G> {
417-
return { otherProducer in
418-
return SignalProducer<V, G> { observer, lifetime in
419-
self.startWithSignal { signal, disposable in
420-
lifetime.observeEnded(disposable.dispose)
421-
422-
otherProducer.startWithSignal { otherSignal, otherDisposable in
423-
lifetime.observeEnded(otherDisposable.dispose)
424-
425-
transform(signal)(otherSignal).observe(observer)
426-
}
427-
}
428-
}
429-
}
430-
}
431-
432-
/// Left-associative lifting of a binary signal operator over producers.
433-
/// That is, the receiver will be started before the argument producer. When
434-
/// both producers are synchronous this order can be important depending on
435-
/// the operator to generate correct results.
436-
fileprivate func liftLeft<U, F, V, G>(_ transform: @escaping (Signal<Value, Error>) -> (Signal<U, F>) -> Signal<V, G>) -> (SignalProducer<U, F>) -> SignalProducer<V, G> {
437-
return { otherProducer in
438-
return SignalProducer<V, G> { observer, lifetime in
439-
otherProducer.startWithSignal { otherSignal, otherDisposable in
440-
lifetime.observeEnded(otherDisposable.dispose)
441-
442-
self.startWithSignal { signal, disposable in
443-
lifetime.observeEnded(disposable.dispose)
444-
445-
transform(signal)(otherSignal).observe(observer)
446-
}
447-
}
448-
}
449-
}
450-
}
451-
452478
/// Lift a binary Signal operator to operate upon a Signal and a
453479
/// SignalProducer instead.
454480
///
@@ -1551,25 +1577,18 @@ extension SignalProducer {
15511577
disposed: (() -> Void)? = nil,
15521578
value: ((Value) -> Void)? = nil
15531579
) -> SignalProducer<Value, Error> {
1554-
return SignalProducer { observer, lifetime in
1555-
starting?()
1556-
defer { started?() }
1557-
1558-
self.startWithSignal { signal, disposable in
1559-
lifetime.observeEnded(disposable.dispose)
1560-
signal
1561-
.on(
1562-
event: event,
1563-
failed: failed,
1564-
completed: completed,
1565-
interrupted: interrupted,
1566-
terminated: terminated,
1567-
disposed: disposed,
1568-
value: value
1569-
)
1570-
.observe(observer)
1571-
}
1572-
}
1580+
return SignalProducer(Builder {
1581+
let (signal, start, interruptHandle) = self.producer.builder.make()
1582+
return (signal: signal.on(event: event,
1583+
failed: failed,
1584+
completed: completed,
1585+
interrupted: interrupted,
1586+
terminated: terminated,
1587+
disposed: disposed,
1588+
value: value),
1589+
didCreate: { starting?(); start(); started?() },
1590+
interruptHandle: interruptHandle)
1591+
})
15731592
}
15741593

15751594
/// Start the returned producer on the given `Scheduler`.

0 commit comments

Comments
 (0)