Skip to content

Commit acc33d7

Browse files
committed
Optimized producer lifting.
1 parent a323ced commit acc33d7

File tree

1 file changed

+100
-72
lines changed

1 file changed

+100
-72
lines changed

Sources/SignalProducer.swift

Lines changed: 100 additions & 72 deletions
Original file line numberDiff line numberDiff line change
@@ -2,7 +2,7 @@ import Dispatch
22
import Foundation
33
import Result
44

5-
/// A SignalProducer creates Signals that can produce values of type `Value`
5+
/// A SignalProducer creates Signals that can produce values of type `Value`
66
/// and/or fail with errors of type `Error`. If no failure should be possible,
77
/// `NoError` can be specified for `Error`.
88
///
@@ -19,7 +19,11 @@ import Result
1919
public struct SignalProducer<Value, Error: Swift.Error> {
2020
public typealias ProducedSignal = Signal<Value, Error>
2121

22-
private let startHandler: (Signal<Value, Error>.Observer, CompositeDisposable) -> Void
22+
fileprivate struct Builder {
23+
fileprivate let make: () -> (Signal<Value, Error>, () -> Void, Disposable)
24+
}
25+
26+
fileprivate let builder: Builder
2327

2428
/// Initializes a `SignalProducer` that will emit the same events as the
2529
/// given signal.
@@ -49,7 +53,23 @@ public struct SignalProducer<Value, Error: Swift.Error> {
4953
/// - parameters:
5054
/// - startHandler: A closure that accepts observer and a disposable.
5155
public init(_ startHandler: @escaping (Signal<Value, Error>.Observer, CompositeDisposable) -> Void) {
52-
self.startHandler = startHandler
56+
self.init(Builder {
57+
let disposable = CompositeDisposable()
58+
let (signal, observer) = Signal<Value, Error>.pipe(disposable: disposable)
59+
let start = { startHandler(observer, disposable) }
60+
let interrupter = ActionDisposable(action: observer.sendInterrupted)
61+
62+
return (signal, start, interrupter)
63+
})
64+
}
65+
66+
/// Create a SignalProducer that will invoke the given factory once for each
67+
/// invocation of `start()`.
68+
///
69+
/// - parameters:
70+
/// - builder: A builder that creates a new `Signal`.
71+
fileprivate init(_ builder: Builder) {
72+
self.builder = builder
5373
}
5474

5575
/// Creates a producer for a `Signal` that will immediately send one value
@@ -169,19 +189,22 @@ public struct SignalProducer<Value, Error: Swift.Error> {
169189
// upstream producers.
170190
let producerDisposable = CompositeDisposable()
171191

172-
let (signal, observer) = Signal<Value, Error>.pipe(disposable: producerDisposable)
192+
let (relay, relayObserver) = Signal<Value, Error>.pipe(disposable: producerDisposable)
173193

174194
// Directly disposed of when `start()` or `startWithSignal()` is
175195
// disposed.
176-
let cancelDisposable = ActionDisposable(action: observer.sendInterrupted)
196+
let cancelDisposable = ActionDisposable(action: relayObserver.sendInterrupted)
177197

178-
setup(signal, cancelDisposable)
198+
setup(relay, cancelDisposable)
179199

180200
if cancelDisposable.isDisposed {
181201
return
182202
}
183203

184-
startHandler(observer, producerDisposable)
204+
let (source, start, interrupter) = builder.make()
205+
producerDisposable += interrupter
206+
source.observe(relayObserver)
207+
start()
185208
}
186209
}
187210

@@ -352,15 +375,67 @@ extension SignalProducer {
352375
/// - returns: A signal producer that applies signal's operator to every
353376
/// created signal.
354377
public func lift<U, F>(_ transform: @escaping (Signal<Value, Error>) -> Signal<U, F>) -> SignalProducer<U, F> {
355-
return SignalProducer<U, F> { observer, outerDisposable in
356-
self.startWithSignal { signal, innerDisposable in
357-
outerDisposable += innerDisposable
378+
return SignalProducer<U, F>(.init {
379+
let (signal, start, interrupter) = self.producer.builder.make()
380+
return (transform(signal), start, interrupter)
381+
})
382+
}
358383

359-
transform(signal).observe(observer)
360-
}
384+
/// Lift a binary Signal operator to operate upon SignalProducers.
385+
///
386+
/// The start order of the producer must be specified. When both producers are
387+
/// synchronous this order can be important depending on the operator to
388+
/// generate correct results.
389+
///
390+
/// - parameters:
391+
/// - order: The start order of the producers.
392+
///
393+
/// - returns: A factory that creates a SignalProducer with the given operator
394+
/// applied. `self` would be the LHS, and the factory input would
395+
/// be the RHS.
396+
fileprivate func liftLeft<U, F, V, G>(_ transform: @escaping (Signal<Value, Error>) -> (Signal<U, F>) -> Signal<V, G>) -> (SignalProducer<U, F>) -> SignalProducer<V, G> {
397+
return lift(leftFirst: true, transform)
398+
}
399+
400+
/// Lift a binary Signal operator to operate upon SignalProducers.
401+
///
402+
/// The start order of the producer must be specified. When both producers are
403+
/// synchronous this order can be important depending on the operator to
404+
/// generate correct results.
405+
///
406+
/// - parameters:
407+
/// - order: The start order of the producers.
408+
///
409+
/// - returns: A factory that creates a SignalProducer with the given operator
410+
/// applied. `self` would be the LHS, and the factory input would
411+
/// be the RHS.
412+
fileprivate func liftRight<U, F, V, G>(_ transform: @escaping (Signal<Value, Error>) -> (Signal<U, F>) -> Signal<V, G>) -> (SignalProducer<U, F>) -> SignalProducer<V, G> {
413+
return lift(leftFirst: false, transform)
414+
}
415+
416+
private func lift<U, F, V, G>(leftFirst: Bool, _ transform: @escaping (Signal<Value, Error>) -> (Signal<U, F>) -> Signal<V, G>) -> (SignalProducer<U, F>) -> SignalProducer<V, G> {
417+
return { otherProducer in
418+
return SignalProducer<V, G>(.init {
419+
let (leftSignal, leftStart, leftInterrupter) = self.producer.builder.make()
420+
let (rightSignal, rightStart, rightInterrupter) = otherProducer.builder.make()
421+
422+
let start = {
423+
if leftFirst {
424+
leftStart()
425+
rightStart()
426+
} else {
427+
rightStart()
428+
leftStart()
429+
}
430+
}
431+
432+
return (transform(leftSignal)(rightSignal), start, ActionDisposable {
433+
leftInterrupter.dispose()
434+
rightInterrupter.dispose()
435+
})
436+
})
361437
}
362438
}
363-
364439

365440
/// Lift a binary Signal operator to operate upon SignalProducers instead.
366441
///
@@ -380,46 +455,6 @@ extension SignalProducer {
380455
return liftRight(transform)
381456
}
382457

383-
/// Right-associative lifting of a binary signal operator over producers.
384-
/// That is, the argument producer will be started before the receiver. When
385-
/// both producers are synchronous this order can be important depending on
386-
/// the operator to generate correct results.
387-
private func liftRight<U, F, V, G>(_ transform: @escaping (Signal<Value, Error>) -> (Signal<U, F>) -> Signal<V, G>) -> (SignalProducer<U, F>) -> SignalProducer<V, G> {
388-
return { otherProducer in
389-
return SignalProducer<V, G> { observer, outerDisposable in
390-
self.startWithSignal { signal, disposable in
391-
outerDisposable.add(disposable)
392-
393-
otherProducer.startWithSignal { otherSignal, otherDisposable in
394-
outerDisposable += otherDisposable
395-
396-
transform(signal)(otherSignal).observe(observer)
397-
}
398-
}
399-
}
400-
}
401-
}
402-
403-
/// Left-associative lifting of a binary signal operator over producers.
404-
/// That is, the receiver will be started before the argument producer. When
405-
/// both producers are synchronous this order can be important depending on
406-
/// the operator to generate correct results.
407-
fileprivate func liftLeft<U, F, V, G>(_ transform: @escaping (Signal<Value, Error>) -> (Signal<U, F>) -> Signal<V, G>) -> (SignalProducer<U, F>) -> SignalProducer<V, G> {
408-
return { otherProducer in
409-
return SignalProducer<V, G> { observer, outerDisposable in
410-
otherProducer.startWithSignal { otherSignal, otherDisposable in
411-
outerDisposable += otherDisposable
412-
413-
self.startWithSignal { signal, disposable in
414-
outerDisposable.add(disposable)
415-
416-
transform(signal)(otherSignal).observe(observer)
417-
}
418-
}
419-
}
420-
}
421-
}
422-
423458
/// Lift a binary Signal operator to operate upon a Signal and a
424459
/// SignalProducer instead.
425460
///
@@ -1437,25 +1472,18 @@ extension SignalProducer {
14371472
disposed: (() -> Void)? = nil,
14381473
value: ((Value) -> Void)? = nil
14391474
) -> SignalProducer<Value, Error> {
1440-
return SignalProducer { observer, compositeDisposable in
1441-
starting?()
1442-
defer { started?() }
1443-
1444-
self.startWithSignal { signal, disposable in
1445-
compositeDisposable += disposable
1446-
signal
1447-
.on(
1448-
event: event,
1449-
failed: failed,
1450-
completed: completed,
1451-
interrupted: interrupted,
1452-
terminated: terminated,
1453-
disposed: disposed,
1454-
value: value
1455-
)
1456-
.observe(observer)
1457-
}
1458-
}
1475+
return SignalProducer<Value, Error>(Builder {
1476+
let (signal, start, interrupter) = self.producer.builder.make()
1477+
return (signal.on(event: event,
1478+
failed: failed,
1479+
completed: completed,
1480+
interrupted: interrupted,
1481+
terminated: terminated,
1482+
disposed: disposed,
1483+
value: value),
1484+
{ starting?(); start(); started?() },
1485+
interrupter)
1486+
})
14591487
}
14601488

14611489
/// Start the returned producer on the given `Scheduler`.

0 commit comments

Comments
 (0)