|  | 
|  | 1 | +use core::fmt; | 
|  | 2 | +use core::pin::Pin; | 
|  | 3 | +use futures_core::future::{FusedFuture, Future}; | 
|  | 4 | +use futures_core::ready; | 
|  | 5 | +use futures_core::stream::Stream; | 
|  | 6 | +use futures_core::task::{Context, Poll}; | 
|  | 7 | +use pin_project_lite::pin_project; | 
|  | 8 | + | 
|  | 9 | +pin_project! { | 
|  | 10 | +    /// Future for the [`any`](super::StreamExt::any) method. | 
|  | 11 | +    #[must_use = "futures do nothing unless you `.await` or poll them"] | 
|  | 12 | +    pub struct Any<St, Fut, F> { | 
|  | 13 | +        #[pin] | 
|  | 14 | +        stream: St, | 
|  | 15 | +        f: F, | 
|  | 16 | +        accum: Option<bool>, | 
|  | 17 | +        #[pin] | 
|  | 18 | +        future: Option<Fut>, | 
|  | 19 | +    } | 
|  | 20 | +} | 
|  | 21 | + | 
|  | 22 | +impl<St, Fut, F> fmt::Debug for Any<St, Fut, F> | 
|  | 23 | +where | 
|  | 24 | +    St: fmt::Debug, | 
|  | 25 | +    Fut: fmt::Debug, | 
|  | 26 | +{ | 
|  | 27 | +    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { | 
|  | 28 | +        f.debug_struct("Any") | 
|  | 29 | +            .field("stream", &self.stream) | 
|  | 30 | +            .field("accum", &self.accum) | 
|  | 31 | +            .field("future", &self.future) | 
|  | 32 | +            .finish() | 
|  | 33 | +    } | 
|  | 34 | +} | 
|  | 35 | + | 
|  | 36 | +impl<St, Fut, F> Any<St, Fut, F> | 
|  | 37 | +where | 
|  | 38 | +    St: Stream, | 
|  | 39 | +    F: FnMut(St::Item) -> Fut, | 
|  | 40 | +    Fut: Future<Output = bool>, | 
|  | 41 | +{ | 
|  | 42 | +    pub(super) fn new(stream: St, f: F) -> Self { | 
|  | 43 | +        Self { stream, f, accum: Some(false), future: None } | 
|  | 44 | +    } | 
|  | 45 | +} | 
|  | 46 | + | 
|  | 47 | +impl<St, Fut, F> FusedFuture for Any<St, Fut, F> | 
|  | 48 | +where | 
|  | 49 | +    St: Stream, | 
|  | 50 | +    F: FnMut(St::Item) -> Fut, | 
|  | 51 | +    Fut: Future<Output = bool>, | 
|  | 52 | +{ | 
|  | 53 | +    fn is_terminated(&self) -> bool { | 
|  | 54 | +        self.accum.is_none() && self.future.is_none() | 
|  | 55 | +    } | 
|  | 56 | +} | 
|  | 57 | + | 
|  | 58 | +impl<St, Fut, F> Future for Any<St, Fut, F> | 
|  | 59 | +where | 
|  | 60 | +    St: Stream, | 
|  | 61 | +    F: FnMut(St::Item) -> Fut, | 
|  | 62 | +    Fut: Future<Output = bool>, | 
|  | 63 | +{ | 
|  | 64 | +    type Output = bool; | 
|  | 65 | + | 
|  | 66 | +    fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<bool> { | 
|  | 67 | +        let mut this = self.project(); | 
|  | 68 | +        Poll::Ready(loop { | 
|  | 69 | +            if let Some(fut) = this.future.as_mut().as_pin_mut() { | 
|  | 70 | +                // we're currently processing a future to produce a new accum value | 
|  | 71 | +                let acc = this.accum.unwrap() || ready!(fut.poll(cx)); | 
|  | 72 | +                if acc { | 
|  | 73 | +                    break true; | 
|  | 74 | +                } // early exit | 
|  | 75 | +                *this.accum = Some(acc); | 
|  | 76 | +                this.future.set(None); | 
|  | 77 | +            } else if this.accum.is_some() { | 
|  | 78 | +                // we're waiting on a new item from the stream | 
|  | 79 | +                match ready!(this.stream.as_mut().poll_next(cx)) { | 
|  | 80 | +                    Some(item) => { | 
|  | 81 | +                        this.future.set(Some((this.f)(item))); | 
|  | 82 | +                    } | 
|  | 83 | +                    None => { | 
|  | 84 | +                        break this.accum.take().unwrap(); | 
|  | 85 | +                    } | 
|  | 86 | +                } | 
|  | 87 | +            } else { | 
|  | 88 | +                panic!("Any polled after completion") | 
|  | 89 | +            } | 
|  | 90 | +        }) | 
|  | 91 | +    } | 
|  | 92 | +} | 
0 commit comments