diff --git a/Sources/Signal.swift b/Sources/Signal.swift index 0a0738b82..e51511e1c 100644 --- a/Sources/Signal.swift +++ b/Sources/Signal.swift @@ -46,78 +46,27 @@ public final class Signal { private let core: Core private final class Core { - private enum OperationResult { - case shouldDispose - case shouldTryCommitTermination - case none - } - /// The disposable associated with the signal. + /// + /// Disposing of `disposable` is assumed to remove the generator + /// observer from its attached `Signal`, so that the generator observer + /// as the last +1 retain of the `Signal` core may deinitialize. private let disposable: SerialDisposable /// The state of the signal. - /// - /// `state` synchronizes using Read-Copy-Update. Reads on the event delivery - /// routine are thus wait-free. But modifications, e.g. inserting observers, - /// still have to be serialized, and are required not to mutate in place. - /// - /// This suits `Signal` as reads to `status` happens on the critical path of - /// event delivery, while observers bag manipulation or termination generally - /// has a constant occurrence. - /// - /// As `SignalState` is a packed object reference (a tagged pointer) that is - /// naturally aligned, reads to are guaranteed to be atomic on all supported - /// hardware architectures of Swift (ARM and x86). - /// - /// # Thread Safety Notes - /// - /// - Check if the signal is at a specific state. - /// * Read directly. - /// - /// - Deliver `value` events with the alive state. - /// * `sendLock` must be acquired. - /// - /// - Replace the alive state with another. (e.g. observers bag manipulation) - /// * `updateLock` must be acquired. - /// - /// - Transition from `alive` to `terminating` as a result of receiving - /// a termination event. - /// * `updateLock` must be acquired, and should fail gracefully if the - /// signal has terminated. - /// - /// - Check if the signal is terminating. If it is, invoke `tryTerminate` - /// which transitions the state from `terminating` to `terminated`, and - /// delivers the termination event. - /// * Both `sendLock` and `updateLock` must be acquired. The check can be - /// relaxed, but the state must be checked again after the locks are - /// acquired. Fail gracefully if the state has changed since the relaxed - /// read, i.e. a concurrent sender has already handled the termination - /// event. - /// - /// Exploiting the relaxation of reads, please note that false positives - /// are intentionally allowed in the `terminating` checks below. As a - /// result, normal event deliveries need not acquire `updateLock`. - /// Nevertheless, this should not cause the termination event being - /// sent multiple times, since `tryTerminate` would not respond to false - /// positives. private var state: State - /// Used to ensure that state updates are serialized. - private let updateLock: Lock + /// Used to ensure that all state accesses are serialized. + private let stateLock: Lock /// Used to ensure that events are serialized during delivery to observers. private let sendLock: Lock - /// Used to indicate if the `Signal` has deinitialized. - private var hasDeinitialized: Bool - fileprivate init(_ generator: (Observer) -> Disposable?) { - state = .alive(AliveState(observers: Bag())) + state = .alive(Bag(), hasDeinitialized: false) - updateLock = Lock.make() + stateLock = Lock.make() sendLock = Lock.make() - - hasDeinitialized = false disposable = SerialDisposable() // The generator observer retains the `Signal` core. @@ -145,69 +94,52 @@ public final class Signal { // the disposal would be delegated to the current sender, or // occasionally one of the senders waiting on `sendLock`. - self.updateLock.lock() + self.stateLock.lock() - if case let .alive(state) = self.state { - let newSnapshot = TerminatingState(observers: state.observers, event: event) - self.state = .terminating(newSnapshot) - self.updateLock.unlock() - - // Check whether the terminating state has been handled by a - // concurrent sender. If not, handle it. - if self.tryToCommitTermination() == .shouldDispose { - self.disposable.dispose() - } + if case let .alive(observers, _) = state { + self.state = .terminating(observers, .init(event)) + self.stateLock.unlock() } else { - self.updateLock.unlock() + self.stateLock.unlock() } - } else { - var result = OperationResult.none - - // The `terminating` status check is performed twice for two different - // purposes: - // - // 1. Within the main protected section - // It guarantees that a recursive termination event sent by a - // downstream consumer, is immediately processed and need not compete - // with concurrent pending senders (if any). - // - // Termination events sent concurrently may also be caught here, but - // not necessarily all of them due to data races. - // - // 2. After the main protected section - // It ensures the termination event sent concurrently that are not - // caught by (1) due to data races would still be processed. - // - // The related PR on the race conditions: - // https://github.com/ReactiveCocoa/ReactiveSwift/pull/112 + tryToCommitTermination() + } else { self.sendLock.lock() + self.stateLock.lock() - if case let .alive(state) = self.state { - for observer in state.observers { - observer.action(event) - } + if case let .alive(observers, _) = self.state { + self.stateLock.unlock() - // Check if the status has been bumped to `terminating` due to a - // concurrent or a recursive termination event. - if case .terminating = self.state { - result = self.tryToCommitTermination(acquired: self.sendLock) + for observer in observers { + observer.action(event) } + } else { + self.stateLock.unlock() } self.sendLock.unlock() // Check if the status has been bumped to `terminating` due to a - // concurrent termination event that has not been caught in the main - // protected section. - if result == .none, case .terminating = self.state { - result = self.tryToCommitTermination() - } - - // Dispose only after notifying observers, so disposal - // logic is consistently the last thing to run. - if result == .shouldDispose { - self.disposable.dispose() + // terminal event being sent concurrently or recursively. + // + // The check is deliberately made outside of the `sendLock` so that it + // covers also any potential concurrent terminal event in one shot. + // + // Related PR: + // https://github.com/ReactiveCocoa/ReactiveSwift/pull/112 + // + // While calling `tryToCommitTermination` is sufficient, this is a fast + // path for the recurring value delivery. + // + // Note that this cannot be `try` since any concurrent observer bag + // manipulation might then cause the terminating state being missed. + stateLock.lock() + if case .terminating = state { + stateLock.unlock() + tryToCommitTermination() + } else { + stateLock.unlock() } } } @@ -222,15 +154,15 @@ public final class Signal { fileprivate func observe(_ observer: Observer) -> Disposable? { var token: Bag.Token? - updateLock.lock() + stateLock.lock() - if case let .alive(state) = state { - var observers = state.observers - token = observers.insert(observer) - self.state = .alive(AliveState(observers: observers)) + if case let .alive(observers, hasDeinitialized) = state { + var newObservers = observers + token = newObservers.insert(observer) + self.state = .alive(newObservers, hasDeinitialized: hasDeinitialized) } - updateLock.unlock() + stateLock.unlock() if let token = token { return AnyDisposable { [weak self] in @@ -247,37 +179,22 @@ public final class Signal { /// - parameters: /// - token: The token of the observer to remove. private func removeObserver(with token: Bag.Token) { - updateLock.lock() - - if case let .alive(state) = state { - var observers = state.observers - let observer = observers.remove(using: token) - self.state = .alive(AliveState(observers: observers)) + stateLock.lock() - var result = OperationResult.none + if case let .alive(observers, hasDeinitialized) = state { + var newObservers = observers + let observer = newObservers.remove(using: token) + self.state = .alive(newObservers, hasDeinitialized: hasDeinitialized) - // Ensure `observer` is deallocated after `updateLock` is + // Ensure `observer` is deallocated after `stateLock` is // released to avoid deadlocks. withExtendedLifetime(observer) { // Start the disposal of the `Signal` core if the `Signal` has // deinitialized and there is no active observer. - result = tryToDisposeSilentlyIfQualified() - - updateLock.unlock() - } - - if result == .shouldTryCommitTermination { - result = tryToCommitTermination() - } - - if result == .shouldDispose { - // Disposing of `disposable` is assumed to remove the generator - // observer from its attached `Signal`, so that the generator observer - // as the last +1 retain of the `Signal` core may deinitialize. - disposable.dispose() + tryToDisposeSilentlyIfQualified(unlocking: stateLock) } } else { - updateLock.unlock() + stateLock.unlock() } } @@ -287,51 +204,32 @@ public final class Signal { /// It fails gracefully if the signal is alive or has terminated. Calling this /// method as a result of a false positive `terminating` check is permitted. /// - /// - precondition: `updateLock` must not be acquired by the caller. - /// - /// - parameters: - /// - sendLock: `sendLock` if the caller has acquired it. If not specified, - /// `tryTerminate` would attempt to acquire the `sendLock`. - /// - /// - returns: `.shouldDispose` if the attempt succeeds. `.none` otherwise. - private func tryToCommitTermination(acquired sendLock: Lock? = nil) -> OperationResult { - assert(sendLock == nil || sendLock === self.sendLock, - "`tryTerminate` receives a lock that is not the `sendLock` of the signal.") - func commit() -> OperationResult { - // Acquire `updateLock`. If the termination has still not yet been - // handled, take it over and bump the status to `terminated`. - self.updateLock.lock() - - if case let .terminating(state) = self.state { - self.state = .terminated - self.updateLock.unlock() + /// - precondition: `stateLock` must not be acquired by the caller. + private func tryToCommitTermination() { + // Acquire `stateLock`. If the termination has still not yet been + // handled, take it over and bump the status to `terminated`. + stateLock.lock() + + if case let .terminating(observers, terminationKind) = state { + // Try to acquire the `sendLock`, and fail gracefully since the current + // lock holder would attempt to commit after it is done anyway. + if sendLock.try() { + state = .terminated + stateLock.unlock() - if let event = state.event { - for observer in state.observers { + if let event = terminationKind.materialize() { + for observer in observers { observer.action(event) } } - return .shouldDispose + sendLock.unlock() + disposable.dispose() + return } - - self.updateLock.unlock() - return .none - } - - // If the caller declares prior acquisition of `sendLock`, go straight to - // the termination committing routine. - guard sendLock == nil else { - return commit() } - guard self.sendLock.try() else { - // The current sender would commit the termination anyway. - return .none - } - - defer { self.sendLock.unlock() } - return commit() + stateLock.unlock() } /// Try to dispose of the signal silently if the `Signal` has deinitialized and @@ -340,50 +238,46 @@ public final class Signal { /// It fails gracefully if the signal is terminating or terminated, has one or /// more observers, or has not deinitialized. /// - /// - precondition: `updateLock` must have been acquired by the caller. + /// - precondition: `stateLock` must have been acquired by the caller. /// - /// - returns: `.shouldDispose` if the signal has terminated immediately. - /// `.shouldTryCommitTermination` if the signal has transitioned to - /// `terminating` state, and the caller should try to commit the - /// termination with `tryToCommitTermination`. `.none` otherwise. - private func tryToDisposeSilentlyIfQualified() -> OperationResult { - assert(!updateLock.try(), "Calling `unconditionallyTerminate` without acquiring `updateLock`.") - - if hasDeinitialized, case let .alive(state) = state, state.observers.isEmpty { + /// - parameters: + /// - stateLock: The `stateLock` acquired by the caller. + private func tryToDisposeSilentlyIfQualified(unlocking stateLock: Lock) { + assert(!stateLock.try(), "Calling `unconditionallyTerminate` without acquiring `stateLock`.") + + if case let .alive(observers, true) = state, observers.isEmpty { // Transition to `terminated` directly only if there is no event delivery // on going. if sendLock.try() { - defer { sendLock.unlock() } self.state = .terminated - return .shouldDispose + stateLock.unlock() + sendLock.unlock() + + disposable.dispose() + return } - self.state = .terminating(TerminatingState(observers: Bag(), event: nil)) - return .shouldTryCommitTermination + self.state = .terminating(Bag(), .silent) + stateLock.unlock() + + tryToCommitTermination() + return } - return .none + stateLock.unlock() } /// Acknowledge the deinitialization of the `Signal`. fileprivate func signalDidDeinitialize() { - updateLock.lock() + stateLock.lock() // Mark the `Signal` has now deinitialized. - hasDeinitialized = true - - // Attempt to start the disposal of the signal if it has no active observer. - var result = tryToDisposeSilentlyIfQualified() - - updateLock.unlock() - - if result == .shouldTryCommitTermination { - result = tryToCommitTermination() + if case let .alive(observers, false) = state { + state = .alive(observers, hasDeinitialized: true) } - if result == .shouldDispose { - disposable.dispose() - } + // Attempt to start the disposal of the signal if it has no active observer. + tryToDisposeSilentlyIfQualified(unlocking: stateLock) } deinit { @@ -432,56 +326,51 @@ public final class Signal { /// The Swift compiler has also an optimization for enums with payloads that are /// all reference counted, and at most one no-payload case. private enum State { + // `TerminationKind` is constantly pointer-size large to keep `Signal.Core` + // allocation size independent of the actual `Value` and `Error` types. + enum TerminationKind { + case completed + case interrupted + case failed(Swift.Error) + case silent + + init(_ event: Event) { + switch event { + case .value: + fatalError() + case .interrupted: + self = .interrupted + case let .failed(error): + self = .failed(error) + case .completed: + self = .completed + } + } + + func materialize() -> Event? { + switch self { + case .completed: + return .completed + case .interrupted: + return .interrupted + case let .failed(error): + return .failed(error as! Error) + case .silent: + return nil + } + } + } + /// The `Signal` is alive. - case alive(AliveState) + case alive(Bag, hasDeinitialized: Bool) /// The `Signal` has received a termination event, and is about to be /// terminated. - case terminating(TerminatingState) + case terminating(Bag, TerminationKind) /// The `Signal` has terminated. case terminated } - - // As the amount of state would definitely span over a cache line, - // `AliveState` and `TerminatingState` is set to be a reference type so - // that we can atomically update the reference instead. - - /// The state of a `Signal` that is alive. It contains a bag of observers and - /// an optional self-retaining reference. - private final class AliveState { - /// The observers of the `Signal`. - /// - /// - important: `observer` should not be mutated directly given the layout of - /// `Bag`. Copy the bag, and replace the current `AliveState` with - /// a new one created with the bag instead. - fileprivate let observers: Bag - - /// Create an alive state. - init(observers: Bag) { - self.observers = observers - } - } - - /// The state of a terminating `Signal`. It contains a bag of observers and the - /// termination event. - private final class TerminatingState { - /// The observers of the `Signal`. - fileprivate let observers: Bag - - /// The termination event. - fileprivate let event: Event? - - /// Create a terminating state. - /// - /// - parameters: - /// - observers: The latest bag of observers. - /// - event: The termination event. - init(observers: Bag, event: Event?) { - self.observers = observers - self.event = event - } - } } extension Signal {