Skip to content
Merged
Changes from 2 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
140 changes: 67 additions & 73 deletions Sources/Signal.swift
Original file line number Diff line number Diff line change
Expand Up @@ -108,16 +108,11 @@ public final class Signal<Value, Error: Swift.Error> {
/// Used to ensure that events are serialized during delivery to observers.
private let sendLock: Lock

/// Used to indicate if the `Signal` has deinitialized.
private var hasDeinitialized: Bool

fileprivate init(_ generator: (Observer) -> Disposable?) {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The above documentation needs to be updated to reflect that the lock must be held to read the state.

state = .alive(AliveState(observers: Bag()))
state = .alive(Bag(), hasDeinitialized: false)

updateLock = Lock.make()
sendLock = Lock.make()

hasDeinitialized = false
disposable = SerialDisposable()

// The generator observer retains the `Signal` core.
Expand Down Expand Up @@ -147,9 +142,8 @@ public final class Signal<Value, Error: Swift.Error> {

self.updateLock.lock()

if case let .alive(state) = self.state {
let newSnapshot = TerminatingState(observers: state.observers, event: event)
self.state = .terminating(newSnapshot)
if case let .alive(observers, _) = state {
self.state = .terminating(observers, .init(event))
self.updateLock.unlock()

// Check whether the terminating state has been handled by a
Expand Down Expand Up @@ -182,26 +176,29 @@ public final class Signal<Value, Error: Swift.Error> {
// https://github.com/ReactiveCocoa/ReactiveSwift/pull/112

self.sendLock.lock()
self.updateLock.lock()

if case let .alive(state) = self.state {
for observer in state.observers {
observer.action(event)
}
if case let .alive(observers, _) = self.state {
self.updateLock.unlock()

// Check if the status has been bumped to `terminating` due to a
// concurrent or a recursive termination event.
if case .terminating = self.state {
result = self.tryToCommitTermination(acquired: self.sendLock)
for observer in observers {
observer.action(event)
}
} else {
self.updateLock.unlock()
}

self.sendLock.unlock()

// Check if the status has been bumped to `terminating` due to a
// concurrent termination event that has not been caught in the main
// protected section.
self.updateLock.lock()
if result == .none, case .terminating = self.state {
self.updateLock.unlock()
result = self.tryToCommitTermination()
Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

tryToCommitTermination already locks the state to check if it is terminating or not. So the locking here is pointless.

} else {
self.updateLock.unlock()
}

// Dispose only after notifying observers, so disposal
Expand All @@ -224,10 +221,10 @@ public final class Signal<Value, Error: Swift.Error> {

updateLock.lock()

if case let .alive(state) = state {
var observers = state.observers
token = observers.insert(observer)
self.state = .alive(AliveState(observers: observers))
if case let .alive(observers, hasDeinitialized) = state {
var newObservers = observers
token = newObservers.insert(observer)
self.state = .alive(newObservers, hasDeinitialized: hasDeinitialized)
}

updateLock.unlock()
Expand All @@ -249,10 +246,10 @@ public final class Signal<Value, Error: Swift.Error> {
private func removeObserver(with token: Bag<Observer>.Token) {
updateLock.lock()

if case let .alive(state) = state {
var observers = state.observers
let observer = observers.remove(using: token)
self.state = .alive(AliveState(observers: observers))
if case let .alive(observers, hasDeinitialized) = state {
var newObservers = observers
let observer = newObservers.remove(using: token)
self.state = .alive(newObservers, hasDeinitialized: hasDeinitialized)

var result = OperationResult.none

Expand Down Expand Up @@ -302,12 +299,12 @@ public final class Signal<Value, Error: Swift.Error> {
// handled, take it over and bump the status to `terminated`.
self.updateLock.lock()

if case let .terminating(state) = self.state {
if case let .terminating(observers, terminationKind) = self.state {
self.state = .terminated
self.updateLock.unlock()

if let event = state.event {
for observer in state.observers {
if let event = terminationKind.materialize() {
for observer in observers {
observer.action(event)
}
}
Expand Down Expand Up @@ -349,7 +346,7 @@ public final class Signal<Value, Error: Swift.Error> {
private func tryToDisposeSilentlyIfQualified() -> OperationResult {
assert(!updateLock.try(), "Calling `unconditionallyTerminate` without acquiring `updateLock`.")

if hasDeinitialized, case let .alive(state) = state, state.observers.isEmpty {
if case let .alive(observers, true) = state, observers.isEmpty {
// Transition to `terminated` directly only if there is no event delivery
// on going.
if sendLock.try() {
Expand All @@ -358,7 +355,7 @@ public final class Signal<Value, Error: Swift.Error> {
return .shouldDispose
}

self.state = .terminating(TerminatingState(observers: Bag(), event: nil))
self.state = .terminating(Bag(), .silent)
return .shouldTryCommitTermination
}

Expand All @@ -370,7 +367,9 @@ public final class Signal<Value, Error: Swift.Error> {
updateLock.lock()

// Mark the `Signal` has now deinitialized.
hasDeinitialized = true
if case let .alive(observers, false) = state {
state = .alive(observers, hasDeinitialized: true)
}

// Attempt to start the disposal of the signal if it has no active observer.
var result = tryToDisposeSilentlyIfQualified()
Expand Down Expand Up @@ -432,56 +431,51 @@ public final class Signal<Value, Error: Swift.Error> {
/// The Swift compiler has also an optimization for enums with payloads that are
/// all reference counted, and at most one no-payload case.
private enum State {
// `TerminationKind` is constantly pointer-size large to keep `Signal.Core`
// allocation size independent of the actual `Value` and `Error` types.
enum TerminationKind {
case completed
case interrupted
case failed(Swift.Error)
case silent

init(_ event: Event) {
switch event {
case .value:
fatalError()
case .interrupted:
self = .interrupted
case let .failed(error):
self = .failed(error)
case .completed:
self = .completed
}
}

func materialize() -> Event? {
switch self {
case .completed:
return .completed
case .interrupted:
return .interrupted
case let .failed(error):
return .failed(error as! Error)
case .silent:
return nil
}
}
}

/// The `Signal` is alive.
case alive(AliveState)
case alive(Bag<Observer>, hasDeinitialized: Bool)

/// The `Signal` has received a termination event, and is about to be
/// terminated.
case terminating(TerminatingState)
case terminating(Bag<Observer>, TerminationKind)

/// The `Signal` has terminated.
case terminated
}

// As the amount of state would definitely span over a cache line,
// `AliveState` and `TerminatingState` is set to be a reference type so
// that we can atomically update the reference instead.

/// The state of a `Signal` that is alive. It contains a bag of observers and
/// an optional self-retaining reference.
private final class AliveState {
/// The observers of the `Signal`.
///
/// - important: `observer` should not be mutated directly given the layout of
/// `Bag`. Copy the bag, and replace the current `AliveState` with
/// a new one created with the bag instead.
fileprivate let observers: Bag<Observer>

/// Create an alive state.
init(observers: Bag<Observer>) {
self.observers = observers
}
}

/// The state of a terminating `Signal`. It contains a bag of observers and the
/// termination event.
private final class TerminatingState {
/// The observers of the `Signal`.
fileprivate let observers: Bag<Observer>

/// The termination event.
fileprivate let event: Event?

/// Create a terminating state.
///
/// - parameters:
/// - observers: The latest bag of observers.
/// - event: The termination event.
init(observers: Bag<Observer>, event: Event?) {
self.observers = observers
self.event = event
}
}
}

extension Signal {
Expand Down