1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113
//! Asynchronous streams. use Poll; use task; /// A stream of values produced asynchronously. /// /// If `Future` is an asynchronous version of `Result`, then `Stream` is an /// asynchronous version of `Iterator`. A stream represents a sequence of /// value-producing events that occur asynchronously to the caller. /// /// The trait is modeled after `Future`, but allows `poll_next` to be called /// even after a value has been produced, yielding `None` once the stream has /// been fully exhausted. /// /// # Errors /// /// Streams, like futures, also bake in errors through an associated `Error` /// type. An error on a stream **does not terminate the stream**. That is, /// after one error is received, another value may be received from the same /// stream (it's valid to keep polling). Thus a stream is somewhat like an /// `Iterator<Item = Result<T, E>>`, and is always terminated by returning /// `None`. pub trait Stream { /// Values yielded by the stream. type Item; /// Errors yielded by the stream. type Error; /// Attempt to pull out the next value of this stream, registering the /// current task for wakeup if the value is not yet available, and returning /// `None` if the stream is exhausted. /// /// # Return value /// /// There are several possible return values, each indicating a distinct /// stream state: /// /// - [`Ok(Pending)`](::Async) means that this stream's next value is not /// ready yet. Implementations will ensure that the current task will be /// notified when the next value may be ready. /// /// - [`Ok(Ready(Some(val)))`](::Async) means that the stream has /// successfully produced a value, `val`, and may produce further values /// on subsequent `poll_next` calls. /// /// - [`Ok(Ready(None))`](::Async) means that the stream has terminated, and /// `poll_next` should not be invoked again. /// /// - `Err(err)` means that the stream encountered an error while trying to /// `poll_next`. Subsequent calls to `poll_next` *are* allowed, and may /// return further values or errors. /// /// # Panics /// /// Once a stream is finished, i.e. `Ready(None)` has been returned, further /// calls to `poll_next` may result in a panic or other "bad behavior". If this /// is difficult to guard against then the `fuse` adapter can be used to /// ensure that `poll_next` always returns `Ready(None)` in subsequent calls. fn poll_next(&mut self, cx: &mut task::Context) -> Poll<Option<Self::Item>, Self::Error>; } impl<'a, S: ?Sized + Stream> Stream for &'a mut S { type Item = S::Item; type Error = S::Error; fn poll_next(&mut self, cx: &mut task::Context) -> Poll<Option<Self::Item>, Self::Error> { (**self).poll_next(cx) } } if_std! { use Async; use never::Never; impl<S: ?Sized + Stream> Stream for ::std::boxed::Box<S> { type Item = S::Item; type Error = S::Error; fn poll_next(&mut self, cx: &mut task::Context) -> Poll<Option<Self::Item>, Self::Error> { (**self).poll_next(cx) } } #[cfg(feature = "nightly")] impl<S: ?Sized + Stream> Stream for ::std::boxed::PinBox<S> { type Item = S::Item; type Error = S::Error; fn poll_next(&mut self, cx: &mut task::Context) -> Poll<Option<Self::Item>, Self::Error> { unsafe { ::core::mem::PinMut::get_mut_unchecked(self.as_pin_mut()).poll_next(cx) } } } impl<S: Stream> Stream for ::std::panic::AssertUnwindSafe<S> { type Item = S::Item; type Error = S::Error; fn poll_next(&mut self, cx: &mut task::Context) -> Poll<Option<S::Item>, S::Error> { self.0.poll_next(cx) } } impl<T> Stream for ::std::collections::VecDeque<T> { type Item = T; type Error = Never; fn poll_next(&mut self, _cx: &mut task::Context) -> Poll<Option<Self::Item>, Self::Error> { Ok(Async::Ready(self.pop_front())) } } }