tokio_stream/wrappers/
mpsc_bounded.rs

1use crate::Stream;
2use std::pin::Pin;
3use std::task::{Context, Poll};
4use tokio::sync::mpsc::Receiver;
5
6/// A wrapper around [`tokio::sync::mpsc::Receiver`] that implements [`Stream`].
7///
8/// # Example
9///
10/// ```
11/// use tokio::sync::mpsc;
12/// use tokio_stream::wrappers::ReceiverStream;
13/// use tokio_stream::StreamExt;
14///
15/// # #[tokio::main(flavor = "current_thread")]
16/// # async fn main() -> Result<(), tokio::sync::mpsc::error::SendError<u8>> {
17/// let (tx, rx) = mpsc::channel(2);
18/// tx.send(10).await?;
19/// tx.send(20).await?;
20/// # // prevent the doc test from hanging
21/// drop(tx);
22///
23/// let mut stream = ReceiverStream::new(rx);
24/// assert_eq!(stream.next().await, Some(10));
25/// assert_eq!(stream.next().await, Some(20));
26/// assert_eq!(stream.next().await, None);
27/// # Ok(())
28/// # }
29/// ```
30///
31/// [`tokio::sync::mpsc::Receiver`]: struct@tokio::sync::mpsc::Receiver
32/// [`Stream`]: trait@crate::Stream
33#[derive(Debug)]
34pub struct ReceiverStream<T> {
35    inner: Receiver<T>,
36}
37
38impl<T> ReceiverStream<T> {
39    /// Create a new `ReceiverStream`.
40    pub fn new(recv: Receiver<T>) -> Self {
41        Self { inner: recv }
42    }
43
44    /// Get back the inner `Receiver`.
45    pub fn into_inner(self) -> Receiver<T> {
46        self.inner
47    }
48
49    /// Closes the receiving half of a channel without dropping it.
50    ///
51    /// This prevents any further messages from being sent on the channel while
52    /// still enabling the receiver to drain messages that are buffered. Any
53    /// outstanding [`Permit`] values will still be able to send messages.
54    ///
55    /// To guarantee no messages are dropped, after calling `close()`, you must
56    /// receive all items from the stream until `None` is returned.
57    ///
58    /// [`Permit`]: struct@tokio::sync::mpsc::Permit
59    pub fn close(&mut self) {
60        self.inner.close();
61    }
62}
63
64impl<T> Stream for ReceiverStream<T> {
65    type Item = T;
66
67    fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
68        self.inner.poll_recv(cx)
69    }
70}
71
72impl<T> AsRef<Receiver<T>> for ReceiverStream<T> {
73    fn as_ref(&self) -> &Receiver<T> {
74        &self.inner
75    }
76}
77
78impl<T> AsMut<Receiver<T>> for ReceiverStream<T> {
79    fn as_mut(&mut self) -> &mut Receiver<T> {
80        &mut self.inner
81    }
82}
83
84impl<T> From<Receiver<T>> for ReceiverStream<T> {
85    fn from(recv: Receiver<T>) -> Self {
86        Self::new(recv)
87    }
88}