Skip to content

Commit ab0e60d

Browse files
committed
sync: fix racy UnsafeCell access on a closed oneshot (#4226)
1 parent ac89d89 commit ab0e60d

File tree

2 files changed

+61
-5
lines changed

2 files changed

+61
-5
lines changed

tokio/src/sync/oneshot.rs

Lines changed: 32 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -1031,11 +1031,38 @@ impl State {
10311031
}
10321032

10331033
fn set_complete(cell: &AtomicUsize) -> State {
1034-
// TODO: This could be `Release`, followed by an `Acquire` fence *if*
1035-
// the `RX_TASK_SET` flag is set. However, `loom` does not support
1036-
// fences yet.
1037-
let val = cell.fetch_or(VALUE_SENT, AcqRel);
1038-
State(val)
1034+
// This method is a compare-and-swap loop rather than a fetch-or like
1035+
// other `set_$WHATEVER` methods on `State`. This is because we must
1036+
// check if the state has been closed before setting the `VALUE_SENT`
1037+
// bit.
1038+
//
1039+
// We don't want to set both the `VALUE_SENT` bit if the `CLOSED`
1040+
// bit is already set, because `VALUE_SENT` will tell the receiver that
1041+
// it's okay to access the inner `UnsafeCell`. Immediately after calling
1042+
// `set_complete`, if the channel was closed, the sender will _also_
1043+
// access the `UnsafeCell` to take the value back out, so if a
1044+
// `poll_recv` or `try_recv` call is occurring concurrently, both
1045+
// threads may try to access the `UnsafeCell` if we were to set the
1046+
// `VALUE_SENT` bit on a closed channel.
1047+
let mut state = cell.load(Ordering::Relaxed);
1048+
loop {
1049+
if State(state).is_closed() {
1050+
break;
1051+
}
1052+
// TODO: This could be `Release`, followed by an `Acquire` fence *if*
1053+
// the `RX_TASK_SET` flag is set. However, `loom` does not support
1054+
// fences yet.
1055+
match cell.compare_exchange_weak(
1056+
state,
1057+
state | VALUE_SENT,
1058+
Ordering::AcqRel,
1059+
Ordering::Acquire,
1060+
) {
1061+
Ok(_) => break,
1062+
Err(actual) => state = actual,
1063+
}
1064+
}
1065+
State(state)
10391066
}
10401067

10411068
fn is_rx_task_set(self) -> bool {

tokio/src/sync/tests/loom_oneshot.rs

Lines changed: 29 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -55,6 +55,35 @@ fn changing_rx_task() {
5555
});
5656
}
5757

58+
#[test]
59+
fn try_recv_close() {
60+
// reproduces https://github.com/tokio-rs/tokio/issues/4225
61+
loom::model(|| {
62+
let (tx, mut rx) = oneshot::channel();
63+
thread::spawn(move || {
64+
let _ = tx.send(());
65+
});
66+
67+
rx.close();
68+
let _ = rx.try_recv();
69+
})
70+
}
71+
72+
#[test]
73+
fn recv_closed() {
74+
// reproduces https://github.com/tokio-rs/tokio/issues/4225
75+
loom::model(|| {
76+
let (tx, mut rx) = oneshot::channel();
77+
78+
thread::spawn(move || {
79+
let _ = tx.send(1);
80+
});
81+
82+
rx.close();
83+
let _ = block_on(rx);
84+
});
85+
}
86+
5887
// TODO: Move this into `oneshot` proper.
5988

6089
use std::future::Future;

0 commit comments

Comments
 (0)