diff --git a/Sources/Signal.swift b/Sources/Signal.swift index 918daf36e..62c9ec5ab 100644 --- a/Sources/Signal.swift +++ b/Sources/Signal.swift @@ -1413,6 +1413,20 @@ extension Signal { /// /// - returns: A signal that sends accumulated value after `self` completes. public func reduce(_ initial: U, _ combine: @escaping (U, Value) -> U) -> Signal { + return self.reduce(into: initial) { accumulator, value in + accumulator = combine(accumulator, value) + } + } + + /// Send only the final value and then immediately completes. + /// + /// - parameters: + /// - initial: Initial value for the accumulator. + /// - combine: A closure that accepts accumulator and sent value of + /// `self`. + /// + /// - returns: A signal that sends accumulated value after `self` completes. + public func reduce(into initial: U, _ combine: @escaping (inout U, Value) -> Void) -> Signal { // We need to handle the special case in which `signal` sends no values. // We'll do that by sending `initial` on the output signal (before // taking the last value). @@ -1420,11 +1434,11 @@ extension Signal { let outputSignal = scannedSignalWithInitialValue.take(last: 1) // Now that we've got takeLast() listening to the piped signal, send - // that initial value. + // that initial value. outputSignalObserver.send(value: initial) // Pipe the scanned input signal into the output signal. - self.scan(initial, combine) + self.scan(into: initial, combine) .observe(outputSignalObserver) return outputSignal @@ -1445,12 +1459,32 @@ extension Signal { /// - returns: A signal that sends accumulated value each time `self` emits /// own value. public func scan(_ initial: U, _ combine: @escaping (U, Value) -> U) -> Signal { + return self.scan(into: initial) { accumulator, value in + accumulator = combine(accumulator, value) + } + } + + /// Aggregate values into a single combined value. When `self` emits its + /// first value, `combine` is invoked with `initial` as the first argument + /// and that emitted value as the second argument. The result is emitted + /// from the signal returned from `scan`. That result is then passed to + /// `combine` as the first argument when the next value is emitted, and so + /// on. + /// + /// - parameters: + /// - initial: Initial value for the accumulator. + /// - combine: A closure that accepts accumulator and sent value of + /// `self`. + /// + /// - returns: A signal that sends accumulated value each time `self` emits + /// own value. + public func scan(into initial: U, _ combine: @escaping (inout U, Value) -> Void) -> Signal { return Signal { observer in var accumulator = initial return self.observe { event in observer.action(event.map { value in - accumulator = combine(accumulator, value) + combine(&accumulator, value) return accumulator }) } diff --git a/Sources/SignalProducer.swift b/Sources/SignalProducer.swift index b74c51dbf..1d9194a0f 100644 --- a/Sources/SignalProducer.swift +++ b/Sources/SignalProducer.swift @@ -934,6 +934,19 @@ extension SignalProducer { return lift { $0.reduce(initial, combine) } } + /// Send only the final value and then immediately completes. + /// + /// - parameters: + /// - initial: Initial value for the accumulator. + /// - combine: A closure that accepts accumulator and sent value of + /// `self`. + /// + /// - returns: A producer that sends accumulated value after `self` + /// completes. + public func reduce(into initial: U, _ combine: @escaping (inout U, Value) -> Void) -> SignalProducer { + return lift { $0.reduce(into: initial, combine) } + } + /// Aggregate `self`'s values into a single combined value. When `self` /// emits its first value, `combine` is invoked with `initial` as the first /// argument and that emitted value as the second argument. The result is @@ -952,6 +965,24 @@ extension SignalProducer { return lift { $0.scan(initial, combine) } } + /// Aggregate `self`'s values into a single combined value. When `self` + /// emits its first value, `combine` is invoked with `initial` as the first + /// argument and that emitted value as the second argument. The result is + /// emitted from the producer returned from `scan`. That result is then + /// passed to `combine` as the first argument when the next value is + /// emitted, and so on. + /// + /// - parameters: + /// - initial: Initial value for the accumulator. + /// - combine: A closure that accepts accumulator and sent value of + /// `self`. + /// + /// - returns: A producer that sends accumulated value each time `self` + /// emits own value. + public func scan(into initial: U, _ combine: @escaping (inout U, Value) -> Void) -> SignalProducer { + return lift { $0.scan(into: initial, combine) } + } + /// Forward only those values from `self` which do not pass `isRepeat` with /// respect to the previous value. /// diff --git a/Tests/ReactiveSwiftTests/SignalProducerLiftingSpec.swift b/Tests/ReactiveSwiftTests/SignalProducerLiftingSpec.swift index 8a721afaf..bff5763fd 100644 --- a/Tests/ReactiveSwiftTests/SignalProducerLiftingSpec.swift +++ b/Tests/ReactiveSwiftTests/SignalProducerLiftingSpec.swift @@ -360,7 +360,7 @@ class SignalProducerLiftingSpec: QuickSpec { } } - describe("scan") { + describe("scan(_:_:)") { it("should incrementally accumulate a value") { let (baseProducer, observer) = SignalProducer.pipe() let producer = baseProducer.scan("", +) @@ -379,7 +379,26 @@ class SignalProducerLiftingSpec: QuickSpec { } } - describe("reduce") { + describe("scan(into:_:)") { + it("should incrementally accumulate a value") { + let (baseProducer, observer) = SignalProducer.pipe() + let producer = baseProducer.scan(into: "") { $0 += $1 } + + var lastValue: String? + + producer.startWithValues { lastValue = $0 } + + expect(lastValue).to(beNil()) + + observer.send(value: "a") + expect(lastValue) == "a" + + observer.send(value: "bb") + expect(lastValue) == "abb" + } + } + + describe("reduce(_:_:)") { it("should accumulate one value") { let (baseProducer, observer) = SignalProducer.pipe() let producer = baseProducer.reduce(1, +) @@ -441,6 +460,68 @@ class SignalProducerLiftingSpec: QuickSpec { } } + describe("reduce(into:_:)") { + it("should accumulate one value") { + let (baseProducer, observer) = SignalProducer.pipe() + let producer = baseProducer.reduce(into: 1) { $0 += $1 } + + var lastValue: Int? + var completed = false + + producer.start { event in + switch event { + case let .value(value): + lastValue = value + case .completed: + completed = true + case .failed, .interrupted: + break + } + } + + expect(lastValue).to(beNil()) + + observer.send(value: 1) + expect(lastValue).to(beNil()) + + observer.send(value: 2) + expect(lastValue).to(beNil()) + + expect(completed) == false + observer.sendCompleted() + expect(completed) == true + + expect(lastValue) == 4 + } + + it("should send the initial value if none are received") { + let (baseProducer, observer) = SignalProducer.pipe() + let producer = baseProducer.reduce(into: 1) { $0 += $1 } + + var lastValue: Int? + var completed = false + + producer.start { event in + switch event { + case let .value(value): + lastValue = value + case .completed: + completed = true + case .failed, .interrupted: + break + } + } + + expect(lastValue).to(beNil()) + expect(completed) == false + + observer.sendCompleted() + + expect(lastValue) == 1 + expect(completed) == true + } + } + describe("skip") { it("should skip initial values") { let (baseProducer, observer) = SignalProducer.pipe() diff --git a/Tests/ReactiveSwiftTests/SignalSpec.swift b/Tests/ReactiveSwiftTests/SignalSpec.swift index 6d5a099b9..1c5ef8d4e 100755 --- a/Tests/ReactiveSwiftTests/SignalSpec.swift +++ b/Tests/ReactiveSwiftTests/SignalSpec.swift @@ -742,7 +742,7 @@ class SignalSpec: QuickSpec { } } - describe("scan") { + describe("scan(_:_:)") { it("should incrementally accumulate a value") { let (baseSignal, observer) = Signal.pipe() let signal = baseSignal.scan("", +) @@ -761,7 +761,26 @@ class SignalSpec: QuickSpec { } } - describe("reduce") { + describe("scan(into:_:)") { + it("should incrementally accumulate a value") { + let (baseSignal, observer) = Signal.pipe() + let signal = baseSignal.scan(into: "") { $0 += $1 } + + var lastValue: String? + + signal.observeValues { lastValue = $0 } + + expect(lastValue).to(beNil()) + + observer.send(value: "a") + expect(lastValue) == "a" + + observer.send(value: "bb") + expect(lastValue) == "abb" + } + } + + describe("reduce(_:_:)") { it("should accumulate one value") { let (baseSignal, observer) = Signal.pipe() let signal = baseSignal.reduce(1, +) @@ -823,6 +842,68 @@ class SignalSpec: QuickSpec { } } + describe("reduce(into:_:)") { + it("should accumulate one value") { + let (baseSignal, observer) = Signal.pipe() + let signal = baseSignal.reduce(into: 1) { $0 += $1 } + + var lastValue: Int? + var completed = false + + signal.observe { event in + switch event { + case let .value(value): + lastValue = value + case .completed: + completed = true + default: + break + } + } + + expect(lastValue).to(beNil()) + + observer.send(value: 1) + expect(lastValue).to(beNil()) + + observer.send(value: 2) + expect(lastValue).to(beNil()) + + expect(completed) == false + observer.sendCompleted() + expect(completed) == true + + expect(lastValue) == 4 + } + + it("should send the initial value if none are received") { + let (baseSignal, observer) = Signal.pipe() + let signal = baseSignal.reduce(into: 1) { $0 += $1 } + + var lastValue: Int? + var completed = false + + signal.observe { event in + switch event { + case let .value(value): + lastValue = value + case .completed: + completed = true + default: + break + } + } + + expect(lastValue).to(beNil()) + expect(completed) == false + + observer.sendCompleted() + + expect(lastValue) == 1 + expect(completed) == true + } + } + describe("skip") { it("should skip initial values") { let (baseSignal, observer) = Signal.pipe()