Skip to content
Merged
20 changes: 12 additions & 8 deletions tokio/src/io/driver/scheduled_io.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ use crate::loom::sync::atomic::AtomicUsize;
use crate::loom::sync::Mutex;
use crate::util::bit;
use crate::util::slab::Entry;
use core::mem::MaybeUninit;

use std::sync::atomic::Ordering::{AcqRel, Acquire, Release};
use std::task::{Context, Poll, Waker};
Expand Down Expand Up @@ -214,7 +215,10 @@ impl ScheduledIo {
fn wake0(&self, ready: Ready, shutdown: bool) {
const NUM_WAKERS: usize = 32;

let mut wakers: [Option<Waker>; NUM_WAKERS] = Default::default();
let mut wakers: [MaybeUninit<Option<Waker>>; NUM_WAKERS] = unsafe {
core::mem::MaybeUninit::uninit().assume_init()
};

let mut curr = 0;

let mut waiters = self.waiters.lock();
Expand All @@ -224,15 +228,15 @@ impl ScheduledIo {
// check for AsyncRead slot
if ready.is_readable() {
if let Some(waker) = waiters.reader.take() {
wakers[curr] = Some(waker);
wakers[curr] = MaybeUninit::new(Some(waker));
curr += 1;
}
}

// check for AsyncWrite slot
if ready.is_writable() {
if let Some(waker) = waiters.writer.take() {
wakers[curr] = Some(waker);
wakers[curr] = MaybeUninit::new(Some(waker));
curr += 1;
}
}
Expand All @@ -248,7 +252,7 @@ impl ScheduledIo {

if let Some(waker) = waiter.waker.take() {
waiter.is_ready = true;
wakers[curr] = Some(waker);
wakers[curr] = MaybeUninit::new(Some(waker));
curr += 1;
}
}
Expand All @@ -260,8 +264,8 @@ impl ScheduledIo {

drop(waiters);

for waker in wakers.iter_mut().take(curr) {
waker.take().unwrap().wake();
for waker in &mut wakers[..curr] {
unsafe { &mut *waker.as_mut_ptr() }.take().unwrap().wake();
}

curr = 0;
Expand All @@ -273,8 +277,8 @@ impl ScheduledIo {
// Release the lock before notifying
drop(waiters);

for waker in wakers.iter_mut().take(curr) {
waker.take().unwrap().wake();
for waker in &mut wakers[..curr] {
unsafe { &mut *waker.as_mut_ptr() }.take().unwrap().wake();
}
}

Expand Down