Update mpsc capacity docs #7703
Conversation
tokio/src/sync/mpsc/bounded.rs
Outdated
```diff
 /// The capacity goes down when sending a value by calling [`send`] or by reserving capacity
-/// with [`reserve`]. The capacity goes up when values are received by the [`Receiver`].
+/// with [`reserve`].
+///
+/// The capacity goes up when values are received, unless there are
+/// existing, non-cancelled calls to [`Sender::send`] or [`Sender::reserve`]
+/// which have returned [`Poll::Pending`]. While those calls exist, reading
+/// values from the [`Receiver`] gives access to a channel slot directly to
+/// those callers, in FIFO order, without modifying the capacity.
```
I think this wording is too pedantic, and adding this would be confusing. If you receive a value and there is a call to reserve, then from the caller's perspective it just went up and then immediately went down again. The fact that we are able to avoid the up-then-down is an implementation detail.
The primary purpose of this section is to explain how `capacity()` and `len()` are different.
I agree that the wording is a bit pedantic. To back up a second, I'm trying to cope with the following case:
```rust
let (tx, rx) = tokio::mpsc::channel(1);
...
loop {
    tokio::select! {
        // Suppose this is queued first, but is only returning `Poll::Pending`
        _ = tx.send(...) => { return }
        // Suppose instead, this select! branch triggers...
        _ = yield_interval.tick() => {
            // ... Then this call to `send` will be stuck forever, queued
            // behind the first tx.send, which is no longer polled.
            let result = tx.send(...).await;
            if result == ... { return }
        }
    }
}
```

We've hit this issue in our codebase (oxidecomputer/omicron#9272).
This deadlock makes sense to me with the knowledge that:
- `tokio::mpsc::Sender` uses a semaphore queue for FIFO access to capacity, and callers are enqueued for access to this semaphore when they call `send` or `reserve` and get back `Poll::Pending`.
- When a `tokio::mpsc::Receiver` calls `recv`, if there are any pending waiters, it grants capacity directly to pending `tokio::mpsc::Sender::send` futures, if any exist, regardless of whether or not they are being polled. Granting capacity is distinct from granting access to a particular slot.
There is a comment on send and reserve about this:
> This channel uses a queue to ensure that calls to `send` and `reserve` complete in the order they were requested.
But I think there's important nuance in my aforementioned case: when we say "calls to send and reserve complete in the order they were requested", we're not saying "they will return `Poll::Ready(Ok(_))` in a particular order", nor are we saying "we will enforce that they send messages in the mpsc with an ordering of when they were first polled". Rather, this statement actually means:
- Capacity from the channel will be given to callers of `send` and `reserve` in FIFO order (internally: permits from the semaphore queue) based on their calling order.
- Once permits are granted, the order of sending messages is based on "when the caller next polls". It's possible for a future created by `send` to get a permit, yet not be re-polled for a while - it would not necessarily block the ordering of the mpsc by taking a specific slot, but it would be holding on to a portion of the total capacity.
I created a playground link for this case here: https://play.rust-lang.org/?version=stable&mode=debug&edition=2024&gist=c4f2c5b7b0616eee84eee4ba63193fdc
> If you receive a value and there is a call to reserve, then from the caller's perspective it just went up and then immediately went down again. The fact that we are able to avoid the up-then-down is an implementation detail.
I'm struggling with treating this as an implementation detail - "how capacity works", along with the doc comment about "calls to send and reserve complete in the order they were requested", leave a few user-visible oddities:
- As shown in the playground link, a call to `recv` can cause the capacity to go from zero -> zero, even with no concurrent operations, just with the existence of a prior incomplete `send`/`reserve` future in existence somewhere. This feels at odds with the comment that "The capacity goes up when values are received by the Receiver".
- Calling `send`/`reserve` and getting back a `Poll::Pending` future has a lot of implications for ordering, which are also user-visible: as long as the future remains non-cancelled, the guarantee provided here is that capacity is granted to the futures in FIFO order, not that the calls to `send`/`reserve` will actually complete in a particular order.
Both of these aspects combined led me to "wanting to clarify what capacity means, and how it is provided to callers, especially in the context of a saturated channel".
> As shown in the playground link, a call to `recv` can cause the capacity to go from zero -> zero, even with no concurrent operations, just with the existence of a prior incomplete `send`/`reserve` future in existence somewhere. This feels at odds with the comment that "The capacity goes up when values are received by the Receiver".
At the risk of being even more pedantic (sorry), I don't agree that there are "no concurrent operations" in this case; I think "the existence of a prior incomplete send/reserve future" is reasonably considered a concurrent operation, even if it is not executing in parallel. When multiple futures are multiplexed into a single task using concurrency primitives like select!, FuturesUnordered, join!, et cetera, we generally think of this as a form of concurrency even if it is not parallelism.
I do, however, agree with @smklein that we should make sure it's clear to the user of the MPSC that capacity is always assigned to outstanding concurrent send operations in the order that they started to wait on send/reserve calls, even if those send/reserve calls do not complete in that order. And, we should make sure that it is clear to the caller that in situations where there are outstanding send/reserve calls, the capacity may not be visible in calls to capacity() if it has already been assigned to a waiting sender.
On the concurrency note - valid point, especially with the example above using a timer. That's definitely concurrent, but not parallel. But I thiiiiiiink the playground example (https://play.rust-lang.org/?version=stable&mode=debug&edition=2024&gist=c4f2c5b7b0616eee84eee4ba63193fdc) is not concurrent, even though there is a biased select! operation in-play. I'm just using it as a shorthand for:
- Poll the pinned send
- Then go do the other branch
Which I expect it'll do in a deterministic ordering, because of the biased keyword.
I'm happy to see the things you mentioned be documented. But I don't think the fn capacity() docs are the right place. Perhaps you could add a new section to the struct docs instead?
Motivation
The docs for `tokio::mpsc::Sender` and `tokio::mpsc::Receiver` both include `capacity`, which only describes behavior accurately when there are no pending `send` or `reserve` futures that have been queued.

In the case that these futures exist, some of the documentation on `capacity` is no longer accurate. For example: it's possible for a receiver to `recv` a message, but for the capacity to appear unchanged! This can happen when a slot is transferred directly to one of the waiting calls to `send`, which had not previously returned `Poll::Ready(Ok(_))`.

Solution

This PR rewrites the capacity documentation to handle these cases more explicitly.

This aligns with some of the existing documentation on `send` and `reserve` for the `Sender`, which mentions the following:

> This channel uses a queue to ensure that calls to `send` and `reserve` complete in the order they were requested.