Skip to content

Commit 840259b

Browse files
committed
Optimized producer lifting.
1 parent 02e7cf6 commit 840259b

File tree

1 file changed

+213
-70
lines changed

1 file changed

+213
-70
lines changed

Sources/SignalProducer.swift

Lines changed: 213 additions & 70 deletions
Original file line numberDiff line numberDiff line change
@@ -2,7 +2,7 @@ import Dispatch
22
import Foundation
33
import Result
44

5-
/// A SignalProducer creates Signals that can produce values of type `Value`
5+
/// A SignalProducer creates Signals that can produce values of type `Value`
66
/// and/or fail with errors of type `Error`. If no failure should be possible,
77
/// `NoError` can be specified for `Error`.
88
///
@@ -19,7 +19,7 @@ import Result
1919
public struct SignalProducer<Value, Error: Swift.Error> {
2020
public typealias ProducedSignal = Signal<Value, Error>
2121

22-
private let startHandler: (Signal<Value, Error>.Observer, CompositeDisposable) -> Void
22+
private let backing: Backing<Value, Error>
2323

2424
/// Initializes a `SignalProducer` that will emit the same events as the
2525
/// given signal.
@@ -30,9 +30,7 @@ public struct SignalProducer<Value, Error: Swift.Error> {
3030
/// - parameters:
3131
/// - signal: A signal to observe after starting the producer.
3232
public init<S: SignalProtocol>(signal: S) where S.Value == Value, S.Error == Error {
33-
self.init { observer, disposable in
34-
disposable += signal.observe(observer)
35-
}
33+
backing = .signal { (signal.signal, {}) }
3634
}
3735

3836
/// Initializes a SignalProducer that will invoke the given closure once for
@@ -49,7 +47,23 @@ public struct SignalProducer<Value, Error: Swift.Error> {
4947
/// - parameters:
5048
/// - startHandler: A closure that accepts observer and a disposable.
5149
public init(_ startHandler: @escaping (Signal<Value, Error>.Observer, CompositeDisposable) -> Void) {
52-
self.startHandler = startHandler
50+
backing = .defer(startHandler)
51+
}
52+
53+
/// Create a SignalProducer that will invoke the given factory once for each
54+
/// invocation of `start()`.
55+
///
56+
/// - warning: `Signal` should not escape the factory unles absolutely
57+
/// necessary.
58+
///
59+
/// - note: When nesting factories, remember to call the upstream's starting
60+
/// side effect.
61+
///
62+
/// - parameters:
63+
/// - factory: A factory that produces a `Signal` and its associated
64+
/// starting side effects.
65+
internal init(_ factory: @escaping () -> (Signal<Value, Error>, () -> Void)) {
66+
backing = .signal(factory)
5367
}
5468

5569
/// Creates a producer for a `Signal` that will immediately send one value
@@ -162,6 +176,128 @@ public struct SignalProducer<Value, Error: Swift.Error> {
162176
}
163177
}
164178

179+
/// Lift an unary Signal operator to operate upon SignalProducers instead.
180+
///
181+
/// In other words, this will create a new `SignalProducer` which will apply
182+
/// the given `Signal` operator to _every_ created `Signal`, just as if the
183+
/// operator had been applied to each `Signal` yielded from `start()`.
184+
///
185+
/// - parameters:
186+
/// - transform: An unary operator to lift.
187+
///
188+
/// - returns: A signal producer that applies signal's operator to every
189+
/// created signal.
190+
fileprivate func _lift<U, F>(_ transform: @escaping (Signal<Value, Error>) -> Signal<U, F>) -> SignalProducer<U, F> {
191+
switch backing {
192+
case let .defer(body):
193+
return SignalProducer<U, F> {
194+
let producerDisposable = CompositeDisposable()
195+
let (signal, observer) = Signal<Value, Error>.pipe(disposable: producerDisposable)
196+
return (transform(signal), { body(observer, producerDisposable) })
197+
}
198+
199+
case let .signal(factory):
200+
return SignalProducer<U, F> {
201+
let (signal, start) = factory()
202+
return (transform(signal), start)
203+
}
204+
}
205+
}
206+
207+
/// Lift a binary Signal operator to operate upon SignalProducers.
208+
///
209+
/// The start order of the producer must be specified. When both producers are
210+
/// synchronous this order can be important depending on the operator to
211+
/// generate correct results.
212+
///
213+
/// - parameters:
214+
/// - order: The start order of the producers.
215+
///
216+
/// - returns: A factory that creates a SignalProducer with the given operator
217+
/// applied. `self` would be the LHS, and the factory input would
218+
/// be the RHS.
219+
fileprivate func _lift<U, F, V, G>(_ order: StartOrder, _ transform: @escaping (Signal<Value, Error>) -> (Signal<U, F>) -> Signal<V, G>) -> (SignalProducer<U, F>) -> SignalProducer<V, G> {
220+
return { otherProducer in
221+
switch (self.backing, otherProducer.backing) {
222+
case (let .defer(leftBody), let .defer(rightBody)):
223+
return SignalProducer<V, G> {
224+
let leftDisposable = CompositeDisposable()
225+
let rightDisposable = CompositeDisposable()
226+
let (leftSignal, leftObserver) = Signal<Value, Error>.pipe(disposable: leftDisposable)
227+
let (rightSignal, rightObserver) = Signal<U, F>.pipe(disposable: rightDisposable)
228+
229+
return (transform(leftSignal)(rightSignal), {
230+
switch order {
231+
case .rightLeft:
232+
rightBody(rightObserver, rightDisposable)
233+
leftBody(leftObserver, leftDisposable)
234+
235+
case .leftRight:
236+
leftBody(leftObserver, leftDisposable)
237+
rightBody(rightObserver, rightDisposable)
238+
}
239+
})
240+
}
241+
242+
case (let .defer(leftBody), let .signal(rightFactory)):
243+
return SignalProducer<V, G> {
244+
let leftDisposable = CompositeDisposable()
245+
let (leftSignal, leftObserver) = Signal<Value, Error>.pipe(disposable: leftDisposable)
246+
let (rightSignal, rightStart) = rightFactory()
247+
248+
return (transform(leftSignal)(rightSignal), {
249+
switch order {
250+
case .rightLeft:
251+
rightStart()
252+
leftBody(leftObserver, leftDisposable)
253+
254+
case .leftRight:
255+
leftBody(leftObserver, leftDisposable)
256+
rightStart()
257+
}
258+
})
259+
}
260+
261+
case (let .signal(leftFactory), let .defer(rightBody)):
262+
return SignalProducer<V, G> {
263+
let (leftSignal, leftStart) = leftFactory()
264+
let rightDisposable = CompositeDisposable()
265+
let (rightSignal, rightObserver) = Signal<U, F>.pipe(disposable: rightDisposable)
266+
267+
return (transform(leftSignal)(rightSignal), {
268+
switch order {
269+
case .rightLeft:
270+
rightBody(rightObserver, rightDisposable)
271+
leftStart()
272+
273+
case .leftRight:
274+
leftStart()
275+
rightBody(rightObserver, rightDisposable)
276+
}
277+
})
278+
}
279+
280+
case (let .signal(leftFactory), let .signal(rightFactory)):
281+
return SignalProducer<V, G> {
282+
let (leftSignal, leftStart) = leftFactory()
283+
let (rightSignal, rightStart) = rightFactory()
284+
285+
return (transform(leftSignal)(rightSignal), {
286+
switch order {
287+
case .rightLeft:
288+
rightStart()
289+
leftStart()
290+
291+
case .leftRight:
292+
leftStart()
293+
rightStart()
294+
}
295+
})
296+
}
297+
}
298+
}
299+
}
300+
165301
/// Create a Signal from the producer, pass it into the given closure,
166302
/// then start sending events on the Signal when the closure has returned.
167303
///
@@ -187,11 +323,47 @@ public struct SignalProducer<Value, Error: Swift.Error> {
187323
if cancelDisposable.isDisposed {
188324
return
189325
}
190-
191-
startHandler(observer, producerDisposable)
326+
327+
switch backing {
328+
case let .defer(body):
329+
body(observer, producerDisposable)
330+
331+
case let .signal(factory):
332+
// The resource cleanup in this mode relies on the `Signal` lifetime
333+
// semantics.
334+
//
335+
// `signal` would be disposed of when `observer` is detached from it,
336+
// causing it to cease its self retainment. Generally, this would trigger
337+
// the propagation of disposal from this end of the Signal graph back to
338+
// the ultimate upstreams, unless any intermediate `Signal` is escaped.
339+
let (signal, start) = factory()
340+
producerDisposable += signal.observe(observer)
341+
start()
342+
}
192343
}
193344
}
194345

346+
/// Used to differentiate variants of SignalProducer start handlers.
347+
internal enum Backing<Value, Error: Swift.Error> {
348+
/// `defer`: The closure is passed directly an event emitter, and a composite
349+
/// disposable to aggregate resource cleanup logic.
350+
case `defer`((_ emitter: Signal<Value, Error>.Observer, _ disposable: CompositeDisposable) -> Void)
351+
352+
/// `signal`: The closure produces a `Signal` that would be observed by the
353+
/// final produced `Signal` in `startWithSignal`. Then the starting
354+
/// side effect associated would be invoked.
355+
case signal(() -> (signal: Signal<Value, Error>, start: () -> Void))
356+
}
357+
358+
/// The start order of `SignalProducer`s in a binary lifted operator.
359+
internal enum StartOrder {
360+
/// Start the left producer first.
361+
case leftRight
362+
363+
/// Start the right producer first.
364+
case rightLeft
365+
}
366+
195367
public protocol SignalProducerProtocol {
196368
/// The type of values being sent on the producer
197369
associatedtype Value
@@ -216,6 +388,39 @@ extension SignalProducer: SignalProducerProtocol {
216388
}
217389
}
218390

391+
extension SignalProducerProtocol {
392+
/// Lift an unary Signal operator to operate upon SignalProducers instead.
393+
///
394+
/// In other words, this will create a new `SignalProducer` which will apply
395+
/// the given `Signal` operator to _every_ created `Signal`, just as if the
396+
/// operator had been applied to each `Signal` yielded from `start()`.
397+
///
398+
/// - parameters:
399+
/// - transform: An unary operator to lift.
400+
///
401+
/// - returns: A signal producer that applies signal's operator to every
402+
/// created signal.
403+
public func lift<U, F>(_ transform: @escaping (Signal<Value, Error>) -> Signal<U, F>) -> SignalProducer<U, F> {
404+
return producer._lift(transform)
405+
}
406+
407+
/// Right-associative lifting of a binary signal operator over producers.
408+
/// That is, the argument producer will be started before the receiver. When
409+
/// both producers are synchronous this order can be important depending on
410+
/// the operator to generate correct results.
411+
fileprivate func liftRight<U, F, V, G>(_ transform: @escaping (Signal<Value, Error>) -> (Signal<U, F>) -> Signal<V, G>) -> (SignalProducer<U, F>) -> SignalProducer<V, G> {
412+
return producer._lift(.rightLeft, transform)
413+
}
414+
415+
/// Left-associative lifting of a binary signal operator over producers.
416+
/// That is, the receiver will be started before the argument producer. When
417+
/// both producers are synchronous this order can be important depending on
418+
/// the operator to generate correct results.
419+
fileprivate func liftLeft<U, F, V, G>(_ transform: @escaping (Signal<Value, Error>) -> (Signal<U, F>) -> Signal<V, G>) -> (SignalProducer<U, F>) -> SignalProducer<V, G> {
420+
return producer._lift(.leftRight, transform)
421+
}
422+
}
423+
219424
extension SignalProducerProtocol {
220425
/// Create a Signal from the producer, then attach the given observer to
221426
/// the `Signal` as an observer.
@@ -353,28 +558,6 @@ extension SignalProducerProtocol where Error == NoError {
353558
}
354559

355560
extension SignalProducerProtocol {
356-
/// Lift an unary Signal operator to operate upon SignalProducers instead.
357-
///
358-
/// In other words, this will create a new `SignalProducer` which will apply
359-
/// the given `Signal` operator to _every_ created `Signal`, just as if the
360-
/// operator had been applied to each `Signal` yielded from `start()`.
361-
///
362-
/// - parameters:
363-
/// - transform: An unary operator to lift.
364-
///
365-
/// - returns: A signal producer that applies signal's operator to every
366-
/// created signal.
367-
public func lift<U, F>(_ transform: @escaping (Signal<Value, Error>) -> Signal<U, F>) -> SignalProducer<U, F> {
368-
return SignalProducer { observer, outerDisposable in
369-
self.startWithSignal { signal, innerDisposable in
370-
outerDisposable += innerDisposable
371-
372-
transform(signal).observe(observer)
373-
}
374-
}
375-
}
376-
377-
378561
/// Lift a binary Signal operator to operate upon SignalProducers instead.
379562
///
380563
/// In other words, this will create a new `SignalProducer` which will apply
@@ -393,46 +576,6 @@ extension SignalProducerProtocol {
393576
return liftRight(transform)
394577
}
395578

396-
/// Right-associative lifting of a binary signal operator over producers.
397-
/// That is, the argument producer will be started before the receiver. When
398-
/// both producers are synchronous this order can be important depending on
399-
/// the operator to generate correct results.
400-
private func liftRight<U, F, V, G>(_ transform: @escaping (Signal<Value, Error>) -> (Signal<U, F>) -> Signal<V, G>) -> (SignalProducer<U, F>) -> SignalProducer<V, G> {
401-
return { otherProducer in
402-
return SignalProducer { observer, outerDisposable in
403-
self.startWithSignal { signal, disposable in
404-
outerDisposable.add(disposable)
405-
406-
otherProducer.startWithSignal { otherSignal, otherDisposable in
407-
outerDisposable += otherDisposable
408-
409-
transform(signal)(otherSignal).observe(observer)
410-
}
411-
}
412-
}
413-
}
414-
}
415-
416-
/// Left-associative lifting of a binary signal operator over producers.
417-
/// That is, the receiver will be started before the argument producer. When
418-
/// both producers are synchronous this order can be important depending on
419-
/// the operator to generate correct results.
420-
private func liftLeft<U, F, V, G>(_ transform: @escaping (Signal<Value, Error>) -> (Signal<U, F>) -> Signal<V, G>) -> (SignalProducer<U, F>) -> SignalProducer<V, G> {
421-
return { otherProducer in
422-
return SignalProducer { observer, outerDisposable in
423-
otherProducer.startWithSignal { otherSignal, otherDisposable in
424-
outerDisposable += otherDisposable
425-
426-
self.startWithSignal { signal, disposable in
427-
outerDisposable.add(disposable)
428-
429-
transform(signal)(otherSignal).observe(observer)
430-
}
431-
}
432-
}
433-
}
434-
}
435-
436579
/// Lift a binary Signal operator to operate upon a Signal and a
437580
/// SignalProducer instead.
438581
///

0 commit comments

Comments
 (0)