Skip to content
Closed
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
218 changes: 91 additions & 127 deletions Sources/Signal.swift
Original file line number Diff line number Diff line change
Expand Up @@ -100,10 +100,7 @@ public final class Signal<Value, Error: Swift.Error> {
/// Nevertheless, this should not cause the termination event being
/// sent multiple times, since `tryTerminate` would not respond to false
/// positives.
private var state: State

/// Used to ensure that state updates are serialized.
private let updateLock: Lock
private let state: Atomic<State>

/// Used to ensure that events are serialized during delivery to observers.
private let sendLock: Lock
Expand All @@ -112,9 +109,8 @@ public final class Signal<Value, Error: Swift.Error> {
private var hasDeinitialized: Bool

fileprivate init(_ generator: (Observer) -> Disposable?) {
state = .alive(AliveState(observers: Bag()))
state = Atomic(.alive(AliveState(observers: Bag())))

updateLock = Lock.make()
sendLock = Lock.make()

hasDeinitialized = false
Expand Down Expand Up @@ -145,62 +141,35 @@ public final class Signal<Value, Error: Swift.Error> {
// the disposal would be delegated to the current sender, or
// occasionally one of the senders waiting on `sendLock`.

self.updateLock.lock()

if case let .alive(state) = self.state {
let newSnapshot = TerminatingState(observers: state.observers, event: event)
self.state = .terminating(newSnapshot)
self.updateLock.unlock()

// Check whether the terminating state has been handled by a
// concurrent sender. If not, handle it.
if self.tryToCommitTermination() == .shouldDispose {
self.disposable.dispose()
state.modify { state in
if case let .alive(aliveState) = state {
let newSnapshot = TerminatingState(observers: aliveState.observers, event: event)
state = .terminating(newSnapshot)
}
} else {
self.updateLock.unlock()
}

// Check whether the terminating state has been handled by a
// concurrent sender. If not, handle it.
if self.tryToCommitTermination() == .shouldDispose {
self.disposable.dispose()
}
} else {
var result = OperationResult.none

// The `terminating` status check is performed twice for two different
// purposes:
//
// 1. Within the main protected section
// It guarantees that a recursive termination event sent by a
// downstream consumer, is immediately processed and need not compete
// with concurrent pending senders (if any).
//
// Termination events sent concurrently may also be caught here, but
// not necessarily all of them due to data races.
//
// 2. After the main protected section
// It ensures the termination event sent concurrently that are not
// caught by (1) due to data races would still be processed.
//
// The related PR on the race conditions:
// https://github.com/ReactiveCocoa/ReactiveSwift/pull/112

self.sendLock.lock()

if case let .alive(state) = self.state {
for observer in state.observers {
if let observers = state.value.aliveState?.observers {
for observer in observers {
observer.action(event)
}

// Check if the status has been bumped to `terminating` due to a
// concurrent or a recursive termination event.
if case .terminating = self.state {
result = self.tryToCommitTermination(acquired: self.sendLock)
}
}

self.sendLock.unlock()

// Check if the status has been bumped to `terminating` due to a
// concurrent termination event that has not been caught in the main
// protected section.
if result == .none, case .terminating = self.state {
// Check if any observer or any concurrent sender has been bumped to
// `terminating` due to a concurrent termination event that has not been
// caught in the main protected section.
if result == .none, state.value.isTerminating {
result = self.tryToCommitTermination()
}

Expand All @@ -220,18 +189,16 @@ public final class Signal<Value, Error: Swift.Error> {
/// - returns: A `Disposable` which can be used to disconnect the observer,
/// or `nil` if the signal has already terminated.
fileprivate func observe(_ observer: Observer) -> Disposable? {
var token: Bag<Observer>.Token?

updateLock.lock()

if case let .alive(state) = state {
var observers = state.observers
token = observers.insert(observer)
self.state = .alive(AliveState(observers: observers))
let token: Bag<Observer>.Token? = state.modify { state in
if case let .alive(aliveState) = state {
var observers = aliveState.observers
let token = observers.insert(observer)
state = .alive(AliveState(observers: observers))
return token
}
return nil
}

updateLock.unlock()

if let token = token {
return AnyDisposable { [weak self] in
self?.removeObserver(with: token)
Expand All @@ -247,37 +214,34 @@ public final class Signal<Value, Error: Swift.Error> {
/// - parameters:
/// - token: The token of the observer to remove.
private func removeObserver(with token: Bag<Observer>.Token) {
updateLock.lock()
var deinitObject: AnyObject?; _ = deinitObject

if case let .alive(state) = state {
var observers = state.observers
let observer = observers.remove(using: token)
self.state = .alive(AliveState(observers: observers))
var result: OperationResult = state.modify { state in
if case let .alive(aliveState) = state {
var observers = aliveState.observers
let observer = observers.remove(using: token)
state = .alive(AliveState(observers: observers))

var result = OperationResult.none
// Ensure `observer` is deallocated after `updateLock` is
// released to avoid deadlocks.
deinitObject = observer

// Ensure `observer` is deallocated after `updateLock` is
// released to avoid deadlocks.
withExtendedLifetime(observer) {
// Start the disposal of the `Signal` core if the `Signal` has
// deinitialized and there is no active observer.
result = tryToDisposeSilentlyIfQualified()

updateLock.unlock()
return tryToDisposeSilentlyIfQualified(state: &state)
}
return .none
}

if result == .shouldTryCommitTermination {
result = tryToCommitTermination()
}
if result == .shouldTryCommitTermination {
result = tryToCommitTermination()
}

if result == .shouldDispose {
// Disposing of `disposable` is assumed to remove the generator
// observer from its attached `Signal`, so that the generator observer
// as the last +1 retain of the `Signal` core may deinitialize.
disposable.dispose()
}
} else {
updateLock.unlock()
if result == .shouldDispose {
// Disposing of `disposable` is assumed to remove the generator
// observer from its attached `Signal`, so that the generator observer
// as the last +1 retain of the `Signal` core may deinitialize.
disposable.dispose()
}
}

Expand All @@ -294,44 +258,34 @@ public final class Signal<Value, Error: Swift.Error> {
/// `tryTerminate` would attempt to acquire the `sendLock`.
///
/// - returns: `.shouldDispose` if the attempt succeeds. `.none` otherwise.
private func tryToCommitTermination(acquired sendLock: Lock? = nil) -> OperationResult {
assert(sendLock == nil || sendLock === self.sendLock,
"`tryTerminate` receives a lock that is not the `sendLock` of the signal.")
func commit() -> OperationResult {
// Acquire `updateLock`. If the termination has still not yet been
// handled, take it over and bump the status to `terminated`.
self.updateLock.lock()

if case let .terminating(state) = self.state {
self.state = .terminated
self.updateLock.unlock()

if let event = state.event {
for observer in state.observers {
observer.action(event)
}
}

return .shouldDispose
}

self.updateLock.unlock()
private func tryToCommitTermination() -> OperationResult {
guard self.sendLock.try() else {
return .none
}

// If the caller declares prior acquisition of `sendLock`, go straight to
// the termination committing routine.
guard sendLock == nil else {
return commit()
defer { self.sendLock.unlock() }

// Acquire `updateLock`. If the termination has still not yet been
// handled, take it over and bump the status to `terminated`.
let terminatingState: TerminatingState? = state.modify { state in
if case let .terminating(terminatingState) = state {
state = .terminated
return terminatingState
}
return nil
}

guard self.sendLock.try() else {
// The current sender would commit the termination anyway.
return .none
if let state = terminatingState {
if let event = state.event {
for observer in state.observers {
observer.action(event)
}
}

return .shouldDispose
}

defer { self.sendLock.unlock() }
return commit()
return .none
}

/// Try to dispose of the signal silently if the `Signal` has deinitialized and
Expand All @@ -346,19 +300,17 @@ public final class Signal<Value, Error: Swift.Error> {
/// `.shouldTryCommitTermination` if the signal has transitioned to
/// `terminating` state, and the caller should try to commit the
/// termination with `tryToCommitTermination`. `.none` otherwise.
private func tryToDisposeSilentlyIfQualified() -> OperationResult {
assert(!updateLock.try(), "Calling `unconditionallyTerminate` without acquiring `updateLock`.")

if hasDeinitialized, case let .alive(state) = state, state.observers.isEmpty {
private func tryToDisposeSilentlyIfQualified(state: inout State) -> OperationResult {
if hasDeinitialized, case let .alive(aliveState) = state, aliveState.observers.isEmpty {
// Transition to `terminated` directly only if there is no event delivery
// on going.
if sendLock.try() {
defer { sendLock.unlock() }
self.state = .terminated
state = .terminated
return .shouldDispose
}

self.state = .terminating(TerminatingState(observers: Bag(), event: nil))
state = .terminating(TerminatingState(observers: Bag(), event: nil))
return .shouldTryCommitTermination
}

Expand All @@ -367,15 +319,13 @@ public final class Signal<Value, Error: Swift.Error> {

/// Acknowledge the deinitialization of the `Signal`.
fileprivate func signalDidDeinitialize() {
updateLock.lock()

// Mark the `Signal` has now deinitialized.
hasDeinitialized = true
var result: OperationResult = state.modify { state in
// Mark the `Signal` has now deinitialized.
hasDeinitialized = true

// Attempt to start the disposal of the signal if it has no active observer.
var result = tryToDisposeSilentlyIfQualified()

updateLock.unlock()
// Attempt to start the disposal of the signal if it has no active observer.
return tryToDisposeSilentlyIfQualified(state: &state)
}

if result == .shouldTryCommitTermination {
result = tryToCommitTermination()
Expand Down Expand Up @@ -441,6 +391,20 @@ public final class Signal<Value, Error: Swift.Error> {

/// The `Signal` has terminated.
case terminated

var aliveState: AliveState? {
if case let .alive(state) = self {
return state
}
return nil
}

var isTerminating: Bool {
if case .terminating = self {
return true
}
return false
}
}

// As the amount of state would definitely span over a cache line,
Expand Down