tokio_stream/wrappers/mpsc_unbounded.rs
1use crate::Stream;
2use std::pin::Pin;
3use std::task::{Context, Poll};
4use tokio::sync::mpsc::UnboundedReceiver;
5
6/// A wrapper around [`tokio::sync::mpsc::UnboundedReceiver`] that implements [`Stream`].
7///
8/// # Example
9///
10/// ```
11/// use tokio::sync::mpsc;
12/// use tokio_stream::wrappers::UnboundedReceiverStream;
13/// use tokio_stream::StreamExt;
14///
15/// # #[tokio::main(flavor = "current_thread")]
16/// # async fn main() -> Result<(), tokio::sync::mpsc::error::SendError<u8>> {
17/// let (tx, rx) = mpsc::unbounded_channel();
18/// tx.send(10)?;
19/// tx.send(20)?;
20/// # // prevent the doc test from hanging
21/// drop(tx);
22///
23/// let mut stream = UnboundedReceiverStream::new(rx);
24/// assert_eq!(stream.next().await, Some(10));
25/// assert_eq!(stream.next().await, Some(20));
26/// assert_eq!(stream.next().await, None);
27/// # Ok(())
28/// # }
29/// ```
30///
31/// [`tokio::sync::mpsc::UnboundedReceiver`]: struct@tokio::sync::mpsc::UnboundedReceiver
32/// [`Stream`]: trait@crate::Stream
33#[derive(Debug)]
34pub struct UnboundedReceiverStream<T> {
35 inner: UnboundedReceiver<T>,
36}
37
38impl<T> UnboundedReceiverStream<T> {
39 /// Create a new `UnboundedReceiverStream`.
40 pub fn new(recv: UnboundedReceiver<T>) -> Self {
41 Self { inner: recv }
42 }
43
44 /// Get back the inner `UnboundedReceiver`.
45 pub fn into_inner(self) -> UnboundedReceiver<T> {
46 self.inner
47 }
48
49 /// Closes the receiving half of a channel without dropping it.
50 ///
51 /// This prevents any further messages from being sent on the channel while
52 /// still enabling the receiver to drain messages that are buffered.
53 pub fn close(&mut self) {
54 self.inner.close();
55 }
56}
57
58impl<T> Stream for UnboundedReceiverStream<T> {
59 type Item = T;
60
61 fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
62 self.inner.poll_recv(cx)
63 }
64}
65
66impl<T> AsRef<UnboundedReceiver<T>> for UnboundedReceiverStream<T> {
67 fn as_ref(&self) -> &UnboundedReceiver<T> {
68 &self.inner
69 }
70}
71
72impl<T> AsMut<UnboundedReceiver<T>> for UnboundedReceiverStream<T> {
73 fn as_mut(&mut self) -> &mut UnboundedReceiver<T> {
74 &mut self.inner
75 }
76}
77
78impl<T> From<UnboundedReceiver<T>> for UnboundedReceiverStream<T> {
79 fn from(recv: UnboundedReceiver<T>) -> Self {
80 Self::new(recv)
81 }
82}