Skip to content

Commit b5bd8fe

Browse files
committed
Introduce futures_util::stream::select_bias.
The current select function only supports round robin. To change that would be a breaking change, so for now I propose a `select_bias` which takes a closure that defines the selection strategy. - When a breaking change is planned, we could re-consider this, but it does have 4 generic parameters of which one is a closure, compared to the 2 streams in `select`. - note the name select_bias. It's not consistent with `select_biased` used elsewhere in the lib. This can obviously be changed, but I feel the "ed" at the end does not add any semantic value but makes function names longer. - There isn't really integration tests included right now, but the examples in the docs get tested by doc test.
1 parent 06d21cd commit b5bd8fe

File tree

2 files changed

+223
-0
lines changed

2 files changed

+223
-0
lines changed

futures-util/src/stream/mod.rs

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -92,6 +92,9 @@ pub use self::poll_fn::{poll_fn, PollFn};
9292
mod select;
9393
pub use self::select::{select, Select};
9494

95+
mod select_bias;
96+
pub use self::select_bias::{select_bias, PollNext, SelectBias};
97+
9598
mod unfold;
9699
pub use self::unfold::{unfold, Unfold};
97100

Lines changed: 220 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,220 @@
1+
use super::assert_stream;
2+
use crate::stream::{Fuse, StreamExt};
3+
use core::pin::Pin;
4+
use futures_core::stream::{FusedStream, Stream};
5+
use futures_core::task::{Context, Poll};
6+
use pin_project_lite::pin_project;
7+
8+
/// Type to tell [`SelectBias`] which stream to poll next.
9+
#[derive(Debug, PartialEq, Eq, Copy, Clone, Hash)]
10+
pub enum PollNext {
11+
/// Poll the first stream.
12+
Left,
13+
/// Poll the second stream.
14+
Right,
15+
}
16+
17+
impl PollNext {
18+
/// Toggle the value and return the old one.
19+
pub fn toggle(&mut self) -> Self {
20+
let old = *self;
21+
22+
match self {
23+
PollNext::Left => *self = PollNext::Right,
24+
PollNext::Right => *self = PollNext::Left,
25+
}
26+
27+
old
28+
}
29+
}
30+
31+
impl Default for PollNext {
32+
fn default() -> Self {
33+
PollNext::Left
34+
}
35+
}
36+
37+
pin_project! {
38+
/// Stream for the [`select_bias()`] function. See function docs for details.
39+
#[derive(Debug)]
40+
#[must_use = "streams do nothing unless polled"]
41+
pub struct SelectBias<St1, St2, Clos, State> {
42+
#[pin]
43+
stream1: Fuse<St1>,
44+
#[pin]
45+
stream2: Fuse<St2>,
46+
state: State,
47+
clos: Clos,
48+
}
49+
}
50+
51+
/// This function will attempt to pull items from both streams. You provide a
52+
/// closure to tell the [`SelectBias`] which stream to poll. The closure can
53+
/// store state on `SelectBias` to which it will receive a `&mut` on every
54+
/// invocation. This allows basing the strategy on prior choices.
55+
///
56+
/// After one of the two input stream completes, the remaining one will be
57+
/// polled exclusively. The returned stream completes when both input
58+
/// streams have completed.
59+
///
60+
/// Note that this function consumes both streams and returns a wrapped
61+
/// version of them.
62+
///
63+
/// ## Examples
64+
///
65+
/// ### Priority
66+
/// This example shows how to always prioritize the left stream.
67+
///
68+
/// ```rust
69+
///
70+
/// # futures::executor::block_on(async {
71+
/// use futures_util::stream::{ select_bias, PollNext, StreamExt };
72+
/// use futures::stream::repeat;
73+
///
74+
/// let left = repeat(1);
75+
/// let right = repeat(2);
76+
///
77+
/// // We don't need any state, so lets make it an empty tuple.
78+
/// // We must provide some type here, as there is no way for the compiler
79+
/// // to infer it.
80+
/// let prio_left = |_: &mut ()| PollNext::Left;
81+
///
82+
/// let mut out = select_bias(left, right, prio_left);
83+
///
84+
/// for _ in 0..100 {
85+
/// // Whenever we poll out, we will alwasy get `1`.
86+
/// assert_eq!(1, out.select_next_some().await);
87+
/// }
88+
///
89+
/// });
90+
/// ```
91+
///
92+
/// ### Round Robin
93+
/// This example shows how to select from both streams round robin.
94+
///
95+
/// ```rust
96+
///
97+
/// # futures::executor::block_on(async {
98+
/// use futures_util::stream::{ select_bias, PollNext, StreamExt };
99+
/// use futures::stream::repeat;
100+
///
101+
/// let left = repeat(1);
102+
/// let right = repeat(2);
103+
///
104+
/// // We don't need any state, so lets make it an empty tuple.
105+
/// let rrobin = |last: &mut PollNext| last.toggle();
106+
///
107+
/// let mut out = select_bias(left, right, rrobin);
108+
///
109+
/// for _ in 0..100 {
110+
/// // We should be alternating now.
111+
/// assert_eq!(1, out.select_next_some().await);
112+
/// assert_eq!(2, out.select_next_some().await);
113+
/// }
114+
///
115+
/// });
116+
/// ```
117+
pub fn select_bias<St1, St2, Clos, State>(
118+
stream1: St1,
119+
stream2: St2,
120+
which: Clos,
121+
) -> SelectBias<St1, St2, Clos, State>
122+
where
123+
St1: Stream,
124+
St2: Stream<Item = St1::Item>,
125+
Clos: FnMut(&mut State) -> PollNext,
126+
State: Default,
127+
{
128+
assert_stream::<St1::Item, _>(SelectBias {
129+
stream1: stream1.fuse(),
130+
stream2: stream2.fuse(),
131+
state: Default::default(),
132+
clos: which,
133+
})
134+
}
135+
136+
impl<St1, St2, Clos, State> SelectBias<St1, St2, Clos, State> {
137+
/// Acquires a reference to the underlying streams that this combinator is
138+
/// pulling from.
139+
pub fn get_ref(&self) -> (&St1, &St2) {
140+
(self.stream1.get_ref(), self.stream2.get_ref())
141+
}
142+
143+
/// Acquires a mutable reference to the underlying streams that this
144+
/// combinator is pulling from.
145+
///
146+
/// Note that care must be taken to avoid tampering with the state of the
147+
/// stream which may otherwise confuse this combinator.
148+
pub fn get_mut(&mut self) -> (&mut St1, &mut St2) {
149+
(self.stream1.get_mut(), self.stream2.get_mut())
150+
}
151+
152+
/// Acquires a pinned mutable reference to the underlying streams that this
153+
/// combinator is pulling from.
154+
///
155+
/// Note that care must be taken to avoid tampering with the state of the
156+
/// stream which may otherwise confuse this combinator.
157+
pub fn get_pin_mut(self: Pin<&mut Self>) -> (Pin<&mut St1>, Pin<&mut St2>) {
158+
let this = self.project();
159+
(this.stream1.get_pin_mut(), this.stream2.get_pin_mut())
160+
}
161+
162+
/// Consumes this combinator, returning the underlying streams.
163+
///
164+
/// Note that this may discard intermediate state of this combinator, so
165+
/// care should be taken to avoid losing resources when this is called.
166+
pub fn into_inner(self) -> (St1, St2) {
167+
(self.stream1.into_inner(), self.stream2.into_inner())
168+
}
169+
}
170+
171+
impl<St1, St2, Clos, State> FusedStream for SelectBias<St1, St2, Clos, State>
172+
where
173+
St1: Stream,
174+
St2: Stream<Item = St1::Item>,
175+
Clos: FnMut(&mut State) -> PollNext,
176+
{
177+
fn is_terminated(&self) -> bool {
178+
self.stream1.is_terminated() && self.stream2.is_terminated()
179+
}
180+
}
181+
182+
impl<St1, St2, Clos, State> Stream for SelectBias<St1, St2, Clos, State>
183+
where
184+
St1: Stream,
185+
St2: Stream<Item = St1::Item>,
186+
Clos: FnMut(&mut State) -> PollNext,
187+
{
188+
type Item = St1::Item;
189+
190+
fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<St1::Item>> {
191+
let this = self.project();
192+
193+
match (this.clos)(this.state) {
194+
PollNext::Left => poll_inner(this.stream1, this.stream2, cx),
195+
PollNext::Right => poll_inner(this.stream2, this.stream1, cx),
196+
}
197+
}
198+
}
199+
200+
fn poll_inner<St1, St2>(
201+
a: Pin<&mut St1>,
202+
b: Pin<&mut St2>,
203+
cx: &mut Context<'_>,
204+
) -> Poll<Option<St1::Item>>
205+
where
206+
St1: Stream,
207+
St2: Stream<Item = St1::Item>,
208+
{
209+
let a_done = match a.poll_next(cx) {
210+
Poll::Ready(Some(item)) => return Poll::Ready(Some(item)),
211+
Poll::Ready(None) => true,
212+
Poll::Pending => false,
213+
};
214+
215+
match b.poll_next(cx) {
216+
Poll::Ready(Some(item)) => Poll::Ready(Some(item)),
217+
Poll::Ready(None) if a_done => Poll::Ready(None),
218+
Poll::Ready(None) | Poll::Pending => Poll::Pending,
219+
}
220+
}

0 commit comments

Comments
 (0)