tokio_stream/wrappers/
tcp_listener.rs

1use crate::Stream;
2use std::io;
3use std::pin::Pin;
4use std::task::{Context, Poll};
5use tokio::net::{TcpListener, TcpStream};
6
7/// A wrapper around [`TcpListener`] that implements [`Stream`].
8///
9/// # Example
10///
11/// Accept connections from both IPv4 and IPv6 listeners in the same loop:
12///
13/// ```no_run
14/// use std::net::{Ipv4Addr, Ipv6Addr};
15///
16/// use tokio::net::TcpListener;
17/// use tokio_stream::{StreamExt, wrappers::TcpListenerStream};
18///
19/// # #[tokio::main(flavor = "current_thread")]
20/// # async fn main() -> std::io::Result<()> {
21/// let ipv4_listener = TcpListener::bind((Ipv6Addr::LOCALHOST, 8080)).await?;
22/// let ipv6_listener = TcpListener::bind((Ipv4Addr::LOCALHOST, 8080)).await?;
23/// let ipv4_connections = TcpListenerStream::new(ipv4_listener);
24/// let ipv6_connections = TcpListenerStream::new(ipv6_listener);
25///
26/// let mut connections = ipv4_connections.chain(ipv6_connections);
27/// while let Some(tcp_stream) = connections.next().await {
28///     let stream = tcp_stream?;
29///     let peer_addr = stream.peer_addr()?;
30///     println!("accepted connection; peer address = {peer_addr}");
31/// }
32/// # Ok(())
33/// # }
34/// ```
35///
36/// [`TcpListener`]: struct@tokio::net::TcpListener
37/// [`Stream`]: trait@crate::Stream
38#[derive(Debug)]
39#[cfg_attr(docsrs, doc(cfg(feature = "net")))]
40pub struct TcpListenerStream {
41    inner: TcpListener,
42}
43
44impl TcpListenerStream {
45    /// Create a new `TcpListenerStream`.
46    pub fn new(listener: TcpListener) -> Self {
47        Self { inner: listener }
48    }
49
50    /// Get back the inner `TcpListener`.
51    pub fn into_inner(self) -> TcpListener {
52        self.inner
53    }
54}
55
56impl Stream for TcpListenerStream {
57    type Item = io::Result<TcpStream>;
58
59    fn poll_next(
60        self: Pin<&mut Self>,
61        cx: &mut Context<'_>,
62    ) -> Poll<Option<io::Result<TcpStream>>> {
63        match self.inner.poll_accept(cx) {
64            Poll::Ready(Ok((stream, _))) => Poll::Ready(Some(Ok(stream))),
65            Poll::Ready(Err(err)) => Poll::Ready(Some(Err(err))),
66            Poll::Pending => Poll::Pending,
67        }
68    }
69}
70
71impl AsRef<TcpListener> for TcpListenerStream {
72    fn as_ref(&self) -> &TcpListener {
73        &self.inner
74    }
75}
76
77impl AsMut<TcpListener> for TcpListenerStream {
78    fn as_mut(&mut self) -> &mut TcpListener {
79        &mut self.inner
80    }
81}