tokio_stream/wrappers/mpsc_bounded.rs
1use crate::Stream;
2use std::pin::Pin;
3use std::task::{Context, Poll};
4use tokio::sync::mpsc::Receiver;
5
6/// A wrapper around [`tokio::sync::mpsc::Receiver`] that implements [`Stream`].
7///
8/// # Example
9///
10/// ```
11/// use tokio::sync::mpsc;
12/// use tokio_stream::wrappers::ReceiverStream;
13/// use tokio_stream::StreamExt;
14///
15/// # #[tokio::main(flavor = "current_thread")]
16/// # async fn main() -> Result<(), tokio::sync::mpsc::error::SendError<u8>> {
17/// let (tx, rx) = mpsc::channel(2);
18/// tx.send(10).await?;
19/// tx.send(20).await?;
20/// # // prevent the doc test from hanging
21/// drop(tx);
22///
23/// let mut stream = ReceiverStream::new(rx);
24/// assert_eq!(stream.next().await, Some(10));
25/// assert_eq!(stream.next().await, Some(20));
26/// assert_eq!(stream.next().await, None);
27/// # Ok(())
28/// # }
29/// ```
30///
31/// [`tokio::sync::mpsc::Receiver`]: struct@tokio::sync::mpsc::Receiver
32/// [`Stream`]: trait@crate::Stream
33#[derive(Debug)]
34pub struct ReceiverStream<T> {
35 inner: Receiver<T>,
36}
37
38impl<T> ReceiverStream<T> {
39 /// Create a new `ReceiverStream`.
40 pub fn new(recv: Receiver<T>) -> Self {
41 Self { inner: recv }
42 }
43
44 /// Get back the inner `Receiver`.
45 pub fn into_inner(self) -> Receiver<T> {
46 self.inner
47 }
48
49 /// Closes the receiving half of a channel without dropping it.
50 ///
51 /// This prevents any further messages from being sent on the channel while
52 /// still enabling the receiver to drain messages that are buffered. Any
53 /// outstanding [`Permit`] values will still be able to send messages.
54 ///
55 /// To guarantee no messages are dropped, after calling `close()`, you must
56 /// receive all items from the stream until `None` is returned.
57 ///
58 /// [`Permit`]: struct@tokio::sync::mpsc::Permit
59 pub fn close(&mut self) {
60 self.inner.close();
61 }
62}
63
64impl<T> Stream for ReceiverStream<T> {
65 type Item = T;
66
67 fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
68 self.inner.poll_recv(cx)
69 }
70}
71
72impl<T> AsRef<Receiver<T>> for ReceiverStream<T> {
73 fn as_ref(&self) -> &Receiver<T> {
74 &self.inner
75 }
76}
77
78impl<T> AsMut<Receiver<T>> for ReceiverStream<T> {
79 fn as_mut(&mut self) -> &mut Receiver<T> {
80 &mut self.inner
81 }
82}
83
84impl<T> From<Receiver<T>> for ReceiverStream<T> {
85 fn from(recv: Receiver<T>) -> Self {
86 Self::new(recv)
87 }
88}