tokio_stream/wrappers/tcp_listener.rs
1use crate::Stream;
2use std::io;
3use std::pin::Pin;
4use std::task::{Context, Poll};
5use tokio::net::{TcpListener, TcpStream};
6
7/// A wrapper around [`TcpListener`] that implements [`Stream`].
8///
9/// # Example
10///
11/// Accept connections from both IPv4 and IPv6 listeners in the same loop:
12///
13/// ```no_run
14/// use std::net::{Ipv4Addr, Ipv6Addr};
15///
16/// use tokio::net::TcpListener;
17/// use tokio_stream::{StreamExt, wrappers::TcpListenerStream};
18///
19/// # #[tokio::main(flavor = "current_thread")]
20/// # async fn main() -> std::io::Result<()> {
21/// let ipv4_listener = TcpListener::bind((Ipv6Addr::LOCALHOST, 8080)).await?;
22/// let ipv6_listener = TcpListener::bind((Ipv4Addr::LOCALHOST, 8080)).await?;
23/// let ipv4_connections = TcpListenerStream::new(ipv4_listener);
24/// let ipv6_connections = TcpListenerStream::new(ipv6_listener);
25///
26/// let mut connections = ipv4_connections.chain(ipv6_connections);
27/// while let Some(tcp_stream) = connections.next().await {
28/// let stream = tcp_stream?;
29/// let peer_addr = stream.peer_addr()?;
30/// println!("accepted connection; peer address = {peer_addr}");
31/// }
32/// # Ok(())
33/// # }
34/// ```
35///
36/// [`TcpListener`]: struct@tokio::net::TcpListener
37/// [`Stream`]: trait@crate::Stream
38#[derive(Debug)]
39#[cfg_attr(docsrs, doc(cfg(feature = "net")))]
40pub struct TcpListenerStream {
41 inner: TcpListener,
42}
43
44impl TcpListenerStream {
45 /// Create a new `TcpListenerStream`.
46 pub fn new(listener: TcpListener) -> Self {
47 Self { inner: listener }
48 }
49
50 /// Get back the inner `TcpListener`.
51 pub fn into_inner(self) -> TcpListener {
52 self.inner
53 }
54}
55
56impl Stream for TcpListenerStream {
57 type Item = io::Result<TcpStream>;
58
59 fn poll_next(
60 self: Pin<&mut Self>,
61 cx: &mut Context<'_>,
62 ) -> Poll<Option<io::Result<TcpStream>>> {
63 match self.inner.poll_accept(cx) {
64 Poll::Ready(Ok((stream, _))) => Poll::Ready(Some(Ok(stream))),
65 Poll::Ready(Err(err)) => Poll::Ready(Some(Err(err))),
66 Poll::Pending => Poll::Pending,
67 }
68 }
69}
70
71impl AsRef<TcpListener> for TcpListenerStream {
72 fn as_ref(&self) -> &TcpListener {
73 &self.inner
74 }
75}
76
77impl AsMut<TcpListener> for TcpListenerStream {
78 fn as_mut(&mut self) -> &mut TcpListener {
79 &mut self.inner
80 }
81}