diff --git a/CHANGELOG.md b/CHANGELOG.md index ba0cac81b..59b0c90e6 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -1,6 +1,8 @@ # master *Please add new entries at the top.* +1. Fixed a deadlock upon disposal when combining operators, i.e. `zip` and `combineLatest`, are used. (#471, kudos to @stevebrambilla for catching the bug) + # 2.0.0-rc.1 1. If the input observer of a `Signal` deinitializes while the `Signal` has not yet terminated, an `interrupted` event would now be automatically sent. (#463, kudos to @andersio) diff --git a/Sources/Signal.swift b/Sources/Signal.swift index 8eb162a07..c483ed9e8 100644 --- a/Sources/Signal.swift +++ b/Sources/Signal.swift @@ -1959,42 +1959,48 @@ private enum ThrottleWhileState { } } -private protocol SignalAggregateStrategy { +private protocol SignalAggregateStrategy: class { /// Update the latest value of the signal at `position` to be `value`. /// /// - parameters: /// - value: The latest value emitted by the signal at `position`. /// - position: The position of the signal. - /// - /// - returns: `true` if the aggregating signal should terminate as a result of the - /// update. `false` otherwise. - mutating func update(_ value: Any, at position: Int) -> Bool + func update(_ value: Any, at position: Int) /// Record the completion of the signal at `position`. /// /// - parameters: /// - position: The position of the signal. - /// - /// - returns: `true` if the aggregating signal should terminate as a result of the - /// completion. `false` otherwise. - mutating func complete(at position: Int) -> Bool + func complete(at position: Int) + + init(count: Int, action: @escaping (AggregateStrategyEvent) -> Void) +} - init(count: Int, action: @escaping (ContiguousArray) -> Void) +private enum AggregateStrategyEvent { + case value(ContiguousArray) + case completed } extension Signal { - private struct CombineLatestStrategy: SignalAggregateStrategy { + // Threading of `CombineLatestStrategy` and `ZipStrategy`. + // + // The threading models of these strategies mirror that of `Signal.Core` to allow + // recursive termial event from the upstreams that is triggered by the combined + // values. + // + // The strategies do not unique the delivery of `completed`, since `Signal` already + // guarantees that no event would ever be delivered after a terminal event. + + private final class CombineLatestStrategy: SignalAggregateStrategy { private enum Placeholder { case none } - private var values: ContiguousArray - private var completionCount: Int - private let action: (ContiguousArray) -> Void + var values: ContiguousArray private var _haveAllSentInitial: Bool private var haveAllSentInitial: Bool { - mutating get { + get { if _haveAllSentInitial { return true } @@ -2004,47 +2010,74 @@ extension Signal { } } - mutating func update(_ value: Any, at position: Int) -> Bool { + private let count: Int + private let lock: Lock + + private let completion: Atomic + private let action: (AggregateStrategyEvent) -> Void + + func update(_ value: Any, at position: Int) { + lock.lock() values[position] = value if haveAllSentInitial { - action(values) + action(.value(values)) } - return false + lock.unlock() + + if completion.value == self.count, lock.try() { + action(.completed) + lock.unlock() + } } - mutating func complete(at position: Int) -> Bool { - completionCount += 1 - return completionCount == values.count + func complete(at position: Int) { + let count: Int = completion.modify { count in + count += 1 + return count + } + + if count == self.count, lock.try() { + action(.completed) + lock.unlock() + } } - init(count: Int, action: @escaping (ContiguousArray) -> Void) { - values = ContiguousArray(repeating: Placeholder.none, count: count) - completionCount = 0 - _haveAllSentInitial = false + init(count: Int, action: @escaping (AggregateStrategyEvent) -> Void) { + self.count = count + self.lock = Lock.make() + self.values = ContiguousArray(repeating: Placeholder.none, count: count) + self._haveAllSentInitial = false + self.completion = Atomic(0) self.action = action } } - private struct ZipStrategy: SignalAggregateStrategy { + private final class ZipStrategy: SignalAggregateStrategy { + private let stateLock: Lock + private let sendLock: Lock + private var values: ContiguousArray<[Any]> + private var canEmit: Bool { + return values.reduce(true) { $0 && !$1.isEmpty } + } + + private var hasConcurrentlyCompleted: Bool private var isCompleted: ContiguousArray - private let action: (ContiguousArray) -> Void private var hasCompletedAndEmptiedSignal: Bool { return Swift.zip(values, isCompleted).contains(where: { $0.0.isEmpty && $0.1 }) } - private var canEmit: Bool { - return values.reduce(true) { $0 && !$1.isEmpty } - } - private var areAllCompleted: Bool { return isCompleted.reduce(true) { $0 && $1 } } - mutating func update(_ value: Any, at position: Int) -> Bool { + private let action: (AggregateStrategyEvent) -> Void + + func update(_ value: Any, at position: Int) { + stateLock.lock() values[position].append(value) if canEmit { @@ -2055,33 +2088,61 @@ extension Signal { buffer.append(values[index].removeFirst()) } - action(buffer) + let shouldComplete = areAllCompleted || hasCompletedAndEmptiedSignal + sendLock.lock() + stateLock.unlock() + + action(.value(buffer)) - if hasCompletedAndEmptiedSignal { - return true + if shouldComplete { + action(.completed) + } + + sendLock.unlock() + + stateLock.lock() + + if hasConcurrentlyCompleted { + sendLock.lock() + action(.completed) + sendLock.unlock() } } - return false + stateLock.unlock() } - mutating func complete(at position: Int) -> Bool { + func complete(at position: Int) { + stateLock.lock() isCompleted[position] = true - // `zip` completes when all signals has completed, or any of the signals - // has completed without any buffered value. - return hasCompletedAndEmptiedSignal || areAllCompleted + if hasConcurrentlyCompleted || areAllCompleted || hasCompletedAndEmptiedSignal { + if sendLock.try() { + stateLock.unlock() + + action(.completed) + sendLock.unlock() + return + } + + hasConcurrentlyCompleted = true + } + + stateLock.unlock() } - init(count: Int, action: @escaping (ContiguousArray) -> Void) { - values = ContiguousArray(repeating: [], count: count) - isCompleted = ContiguousArray(repeating: false, count: count) + init(count: Int, action: @escaping (AggregateStrategyEvent) -> Void) { + self.values = ContiguousArray(repeating: [], count: count) + self.hasConcurrentlyCompleted = false + self.isCompleted = ContiguousArray(repeating: false, count: count) self.action = action + self.sendLock = Lock.make() + self.stateLock = Lock.make() } } private final class AggregateBuilder { - fileprivate var startHandlers: [(_ index: Int, _ strategy: Atomic, _ action: @escaping (Signal.Event) -> Void) -> Disposable?] + fileprivate var startHandlers: [(_ index: Int, _ strategy: Strategy, _ action: @escaping (Signal.Event) -> Void) -> Disposable?] init() { self.startHandlers = [] @@ -2093,22 +2154,10 @@ extension Signal { return signal.observe { event in switch event { case let .value(value): - let shouldComplete = strategy.modify { - return $0.update(value, at: index) - } - - if shouldComplete { - action(.completed) - } + strategy.update(value, at: index) case .completed: - let shouldComplete = strategy.modify { - return $0.complete(at: index) - } - - if shouldComplete { - action(.completed) - } + strategy.complete(at: index) case .interrupted: action(.interrupted) @@ -2126,17 +2175,20 @@ extension Signal { private convenience init(_ builder: AggregateBuilder, _ transform: @escaping (ContiguousArray) -> Value) { self.init { observer in let disposables = CompositeDisposable() - let strategy = Atomic(Strategy(count: builder.startHandlers.count) { observer.send(value: transform($0)) }) + let strategy = Strategy(count: builder.startHandlers.count) { event in + switch event { + case let .value(value): + observer.send(value: transform(value)) + case .completed: + observer.sendCompleted() + } + } for (index, action) in builder.startHandlers.enumerated() where !disposables.isDisposed { disposables += action(index, strategy) { observer.action($0.map { _ in fatalError() }) } } - return AnyDisposable { - strategy.modify { _ in - disposables.dispose() - } - } + return disposables } } diff --git a/Tests/ReactiveSwiftTests/SignalSpec.swift b/Tests/ReactiveSwiftTests/SignalSpec.swift index 423b642eb..8a4d6f93f 100755 --- a/Tests/ReactiveSwiftTests/SignalSpec.swift +++ b/Tests/ReactiveSwiftTests/SignalSpec.swift @@ -14,27 +14,8 @@ import Nimble import Quick @testable import ReactiveSwift -private class StateA {} -private class StateB {} -private enum SignalState { - case alive(StateA) - case terminating(StateB) - case terminated -} - class SignalSpec: QuickSpec { override func spec() { - describe("thread safety") { - it("should have the same memory layout as a native pointer") { - let enumLayout = MemoryLayout>.self - let pointerLayout = MemoryLayout.self - - expect(enumLayout.alignment) == pointerLayout.alignment - expect(enumLayout.size) == pointerLayout.size - expect(enumLayout.stride) == pointerLayout.stride - } - } - describe("init") { var testScheduler: TestScheduler! @@ -2798,6 +2779,110 @@ class SignalSpec: QuickSpec { } } + describe("AggregateBuilder") { + it("should not deadlock upon disposal") { + let (a, aObserver) = Signal<(), NoError>.pipe() + let (b, bObserver) = Signal<(), NoError>.pipe() + + Signal.zip(a, b) + .take(first: 1) + .observeValues { _ in } + + aObserver.send(value: ()) + bObserver.send(value: ()) + } + + it("should not deadlock upon recursive completion of the sources") { + let (a, aObserver) = Signal<(), NoError>.pipe() + let (b, bObserver) = Signal<(), NoError>.pipe() + + Signal.zip(a, b) + .observeValues { _ in + aObserver.sendCompleted() + } + + aObserver.send(value: ()) + bObserver.send(value: ()) + } + + it("should not deadlock upon recursive interruption of the sources") { + let (a, aObserver) = Signal<(), NoError>.pipe() + let (b, bObserver) = Signal<(), NoError>.pipe() + + Signal.zip(a, b) + .observeResult { _ in + aObserver.sendInterrupted() + } + + aObserver.send(value: ()) + bObserver.send(value: ()) + } + + it("should not deadlock upon recursive failure of the sources") { + let (a, aObserver) = Signal<(), TestError>.pipe() + let (b, bObserver) = Signal<(), TestError>.pipe() + + Signal.zip(a, b) + .observeResult { _ in + aObserver.send(error: .default) + } + + aObserver.send(value: ()) + bObserver.send(value: ()) + } + + it("should not deadlock upon disposal") { + let (a, aObserver) = Signal<(), NoError>.pipe() + let (b, bObserver) = Signal<(), NoError>.pipe() + + Signal.combineLatest(a, b) + .take(first: 1) + .observeValues { _ in } + + aObserver.send(value: ()) + bObserver.send(value: ()) + } + + it("should not deadlock upon recursive completion of the sources") { + let (a, aObserver) = Signal<(), NoError>.pipe() + let (b, bObserver) = Signal<(), NoError>.pipe() + + Signal.combineLatest(a, b) + .observeValues { _ in + aObserver.sendCompleted() + } + + aObserver.send(value: ()) + bObserver.send(value: ()) + } + + it("should not deadlock upon recursive interruption of the sources") { + let (a, aObserver) = Signal<(), NoError>.pipe() + let (b, bObserver) = Signal<(), NoError>.pipe() + + Signal.combineLatest(a, b) + .observeResult { _ in + aObserver.sendInterrupted() + } + + aObserver.send(value: ()) + bObserver.send(value: ()) + } + + it("should not deadlock upon recursive failure of the sources") { + let (a, aObserver) = Signal<(), TestError>.pipe() + let (b, bObserver) = Signal<(), TestError>.pipe() + + Signal.combineLatest(a, b) + .observeResult { _ in + aObserver.send(error: .default) + } + + aObserver.send(value: ()) + bObserver.send(value: ()) + } + } + describe("combineLatest") { var signalA: Signal! var signalB: Signal!