1
  2
  3
  4
  5
  6
  7
  8
  9
 10
 11
 12
 13
 14
 15
 16
 17
 18
 19
 20
 21
 22
 23
 24
 25
 26
 27
 28
 29
 30
 31
 32
 33
 34
 35
 36
 37
 38
 39
 40
 41
 42
 43
 44
 45
 46
 47
 48
 49
 50
 51
 52
 53
 54
 55
 56
 57
 58
 59
 60
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
//! Asynchronous streams.

use Poll;
use task;

/// A stream of values produced asynchronously.
///
/// If `Future` is an asynchronous version of `Result`, then `Stream` is an
/// asynchronous version of `Iterator`. A stream represents a sequence of
/// value-producing events that occur asynchronously to the caller.
///
/// The trait is modeled after `Future`, but allows `poll_next` to be called
/// even after a value has been produced, yielding `None` once the stream has
/// been fully exhausted.
///
/// # Errors
///
/// Streams, like futures, also bake in errors through an associated `Error`
/// type. An error on a stream **does not terminate the stream**. That is,
/// after one error is received, another value may be received from the same
/// stream (it's valid to keep polling). Thus a stream is somewhat like an
/// `Iterator<Item = Result<T, E>>`, and is always terminated by returning
/// `None`.
pub trait Stream {
    /// Values yielded by the stream.
    type Item;

    /// Errors yielded by the stream.
    type Error;

    /// Attempt to pull out the next value of this stream, registering the
    /// current task for wakeup if the value is not yet available, and returning
    /// `None` if the stream is exhausted.
    ///
    /// # Return value
    ///
    /// There are several possible return values, each indicating a distinct
    /// stream state:
    ///
    /// - [`Ok(Pending)`](::Async) means that this stream's next value is not
    /// ready yet. Implementations will ensure that the current task will be
    /// notified when the next value may be ready.
    ///
    /// - [`Ok(Ready(Some(val)))`](::Async) means that the stream has
    /// successfully produced a value, `val`, and may produce further values
    /// on subsequent `poll_next` calls.
    ///
    /// - [`Ok(Ready(None))`](::Async) means that the stream has terminated, and
    /// `poll_next` should not be invoked again.
    ///
    /// - `Err(err)` means that the stream encountered an error while trying to
    /// `poll_next`. Subsequent calls to `poll_next` *are* allowed, and may
    /// return further values or errors.
    ///
    /// # Panics
    ///
    /// Once a stream is finished, i.e. `Ready(None)` has been returned, further
    /// calls to `poll_next` may result in a panic or other "bad behavior".  If this
    /// is difficult to guard against then the `fuse` adapter can be used to
    /// ensure that `poll_next` always returns `Ready(None)` in subsequent calls.
    fn poll_next(&mut self, cx: &mut task::Context) -> Poll<Option<Self::Item>, Self::Error>;
}

impl<'a, S: ?Sized + Stream> Stream for &'a mut S {
    type Item = S::Item;
    type Error = S::Error;

    fn poll_next(&mut self, cx: &mut task::Context) -> Poll<Option<Self::Item>, Self::Error> {
        (**self).poll_next(cx)
    }
}

if_std! {
    use Async;
    use never::Never;

    impl<S: ?Sized + Stream> Stream for ::std::boxed::Box<S> {
        type Item = S::Item;
        type Error = S::Error;

        fn poll_next(&mut self, cx: &mut task::Context) -> Poll<Option<Self::Item>, Self::Error> {
            (**self).poll_next(cx)
        }
    }

    #[cfg(feature = "nightly")]
    impl<S: ?Sized + Stream> Stream for ::std::boxed::PinBox<S> {
        type Item = S::Item;
        type Error = S::Error;

        fn poll_next(&mut self, cx: &mut task::Context) -> Poll<Option<Self::Item>, Self::Error> {
            unsafe { ::core::mem::PinMut::get_mut_unchecked(self.as_pin_mut()).poll_next(cx) }
        }
    }

    impl<S: Stream> Stream for ::std::panic::AssertUnwindSafe<S> {
        type Item = S::Item;
        type Error = S::Error;

        fn poll_next(&mut self, cx: &mut task::Context) -> Poll<Option<S::Item>, S::Error> {
            self.0.poll_next(cx)
        }
    }

    impl<T> Stream for ::std::collections::VecDeque<T> {
        type Item = T;
        type Error = Never;

        fn poll_next(&mut self, _cx: &mut task::Context) -> Poll<Option<Self::Item>, Self::Error> {
            Ok(Async::Ready(self.pop_front()))
        }
    }
}