tokio/net/tcp/
stream.rs

1cfg_not_wasi! {
2    use crate::net::{to_socket_addrs, ToSocketAddrs};
3    use std::future::poll_fn;
4    use std::time::Duration;
5}
6
7use crate::io::{AsyncRead, AsyncWrite, Interest, PollEvented, ReadBuf, Ready};
8use crate::net::tcp::split::{split, ReadHalf, WriteHalf};
9use crate::net::tcp::split_owned::{split_owned, OwnedReadHalf, OwnedWriteHalf};
10
11use std::fmt;
12use std::io;
13use std::net::{Shutdown, SocketAddr};
14use std::pin::Pin;
15use std::task::{ready, Context, Poll};
16
17cfg_io_util! {
18    use bytes::BufMut;
19}
20
21cfg_net! {
22    /// A TCP stream between a local and a remote socket.
23    ///
24    /// A TCP stream can either be created by connecting to an endpoint, via the
25    /// [`connect`] method, or by [accepting] a connection from a [listener]. A
26    /// TCP stream can also be created via the [`TcpSocket`] type.
27    ///
28    /// Reading and writing to a `TcpStream` is usually done using the
29    /// convenience methods found on the [`AsyncReadExt`] and [`AsyncWriteExt`]
30    /// traits.
31    ///
32    /// [`connect`]: method@TcpStream::connect
33    /// [accepting]: method@crate::net::TcpListener::accept
34    /// [listener]: struct@crate::net::TcpListener
35    /// [`TcpSocket`]: struct@crate::net::TcpSocket
36    /// [`AsyncReadExt`]: trait@crate::io::AsyncReadExt
37    /// [`AsyncWriteExt`]: trait@crate::io::AsyncWriteExt
38    ///
39    /// # Examples
40    ///
41    /// ```no_run
42    /// use tokio::net::TcpStream;
43    /// use tokio::io::AsyncWriteExt;
44    /// use std::error::Error;
45    ///
46    /// #[tokio::main]
47    /// async fn main() -> Result<(), Box<dyn Error>> {
48    ///     // Connect to a peer
49    ///     let mut stream = TcpStream::connect("127.0.0.1:8080").await?;
50    ///
51    ///     // Write some data.
52    ///     stream.write_all(b"hello world!").await?;
53    ///
54    ///     Ok(())
55    /// }
56    /// ```
57    ///
58    /// The [`write_all`] method is defined on the [`AsyncWriteExt`] trait.
59    ///
60    /// [`write_all`]: fn@crate::io::AsyncWriteExt::write_all
61    /// [`AsyncWriteExt`]: trait@crate::io::AsyncWriteExt
62    ///
63    /// To shut down the stream in the write direction, you can call the
64    /// [`shutdown()`] method. This will cause the other peer to receive a read of
65    /// length 0, indicating that no more data will be sent. This only closes
66    /// the stream in one direction.
67    ///
68    /// [`shutdown()`]: fn@crate::io::AsyncWriteExt::shutdown
69    pub struct TcpStream {
70        io: PollEvented<mio::net::TcpStream>,
71    }
72}
73
74impl TcpStream {
75    cfg_not_wasi! {
76        /// Opens a TCP connection to a remote host.
77        ///
78        /// `addr` is an address of the remote host. Anything which implements the
79        /// [`ToSocketAddrs`] trait can be supplied as the address.  If `addr`
80        /// yields multiple addresses, connect will be attempted with each of the
81        /// addresses until a connection is successful. If none of the addresses
82        /// result in a successful connection, the error returned from the last
83        /// connection attempt (the last address) is returned.
84        ///
85        /// To configure the socket before connecting, you can use the [`TcpSocket`]
86        /// type.
87        ///
88        /// [`ToSocketAddrs`]: trait@crate::net::ToSocketAddrs
89        /// [`TcpSocket`]: struct@crate::net::TcpSocket
90        ///
91        /// # Examples
92        ///
93        /// ```no_run
94        /// use tokio::net::TcpStream;
95        /// use tokio::io::AsyncWriteExt;
96        /// use std::error::Error;
97        ///
98        /// #[tokio::main]
99        /// async fn main() -> Result<(), Box<dyn Error>> {
100        ///     // Connect to a peer
101        ///     let mut stream = TcpStream::connect("127.0.0.1:8080").await?;
102        ///
103        ///     // Write some data.
104        ///     stream.write_all(b"hello world!").await?;
105        ///
106        ///     Ok(())
107        /// }
108        /// ```
109        ///
110        /// The [`write_all`] method is defined on the [`AsyncWriteExt`] trait.
111        ///
112        /// [`write_all`]: fn@crate::io::AsyncWriteExt::write_all
113        /// [`AsyncWriteExt`]: trait@crate::io::AsyncWriteExt
114        pub async fn connect<A: ToSocketAddrs>(addr: A) -> io::Result<TcpStream> {
115            let addrs = to_socket_addrs(addr).await?;
116
117            let mut last_err = None;
118
119            for addr in addrs {
120                match TcpStream::connect_addr(addr).await {
121                    Ok(stream) => return Ok(stream),
122                    Err(e) => last_err = Some(e),
123                }
124            }
125
126            Err(last_err.unwrap_or_else(|| {
127                io::Error::new(
128                    io::ErrorKind::InvalidInput,
129                    "could not resolve to any address",
130                )
131            }))
132        }
133
134        /// Establishes a connection to the specified `addr`.
135        async fn connect_addr(addr: SocketAddr) -> io::Result<TcpStream> {
136            let sys = mio::net::TcpStream::connect(addr)?;
137            TcpStream::connect_mio(sys).await
138        }
139
140        pub(crate) async fn connect_mio(sys: mio::net::TcpStream) -> io::Result<TcpStream> {
141            let stream = TcpStream::new(sys)?;
142
143            // Once we've connected, wait for the stream to be writable as
144            // that's when the actual connection has been initiated. Once we're
145            // writable we check for `take_socket_error` to see if the connect
146            // actually hit an error or not.
147            //
148            // If all that succeeded then we ship everything on up.
149            poll_fn(|cx| stream.io.registration().poll_write_ready(cx)).await?;
150
151            if let Some(e) = stream.io.take_error()? {
152                return Err(e);
153            }
154
155            Ok(stream)
156        }
157    }
158
159    pub(crate) fn new(connected: mio::net::TcpStream) -> io::Result<TcpStream> {
160        let io = PollEvented::new(connected)?;
161        Ok(TcpStream { io })
162    }
163
164    /// Creates new `TcpStream` from a `std::net::TcpStream`.
165    ///
166    /// This function is intended to be used to wrap a TCP stream from the
167    /// standard library in the Tokio equivalent.
168    ///
169    /// # Notes
170    ///
171    /// The caller is responsible for ensuring that the stream is in
172    /// non-blocking mode. Otherwise all I/O operations on the stream
173    /// will block the thread, which will cause unexpected behavior.
174    /// Non-blocking mode can be set using [`set_nonblocking`].
175    ///
176    /// [`set_nonblocking`]: std::net::TcpStream::set_nonblocking
177    ///
178    /// # Examples
179    ///
180    /// ```rust,no_run
181    /// use std::error::Error;
182    /// use tokio::net::TcpStream;
183    ///
184    /// #[tokio::main]
185    /// async fn main() -> Result<(), Box<dyn Error>> {
186    ///     let std_stream = std::net::TcpStream::connect("127.0.0.1:34254")?;
187    ///     std_stream.set_nonblocking(true)?;
188    ///     let stream = TcpStream::from_std(std_stream)?;
189    ///     Ok(())
190    /// }
191    /// ```
192    ///
193    /// # Panics
194    ///
195    /// This function panics if it is not called from within a runtime with
196    /// IO enabled.
197    ///
198    /// The runtime is usually set implicitly when this function is called
199    /// from a future driven by a tokio runtime, otherwise runtime can be set
200    /// explicitly with [`Runtime::enter`](crate::runtime::Runtime::enter) function.
201    #[track_caller]
202    pub fn from_std(stream: std::net::TcpStream) -> io::Result<TcpStream> {
203        let io = mio::net::TcpStream::from_std(stream);
204        let io = PollEvented::new(io)?;
205        Ok(TcpStream { io })
206    }
207
208    /// Turns a [`tokio::net::TcpStream`] into a [`std::net::TcpStream`].
209    ///
210    /// The returned [`std::net::TcpStream`] will have nonblocking mode set as `true`.
211    /// Use [`set_nonblocking`] to change the blocking mode if needed.
212    ///
213    /// # Examples
214    ///
215    /// ```
216    /// # if cfg!(miri) { return } // No `socket` in miri.
217    /// use std::error::Error;
218    /// use std::io::Read;
219    /// use tokio::net::TcpListener;
220    /// # use tokio::net::TcpStream;
221    /// # use tokio::io::AsyncWriteExt;
222    ///
223    /// #[tokio::main]
224    /// async fn main() -> Result<(), Box<dyn Error>> {
225    ///     let mut data = [0u8; 12];
226    /// #   if false {
227    ///     let listener = TcpListener::bind("127.0.0.1:34254").await?;
228    /// #   }
229    /// #   let listener = TcpListener::bind("127.0.0.1:0").await?;
230    /// #   let addr = listener.local_addr().unwrap();
231    /// #   let handle = tokio::spawn(async move {
232    /// #       let mut stream: TcpStream = TcpStream::connect(addr).await.unwrap();
233    /// #       stream.write_all(b"Hello world!").await.unwrap();
234    /// #   });
235    ///     let (tokio_tcp_stream, _) = listener.accept().await?;
236    ///     let mut std_tcp_stream = tokio_tcp_stream.into_std()?;
237    /// #   handle.await.expect("The task being joined has panicked");
238    ///     std_tcp_stream.set_nonblocking(false)?;
239    ///     std_tcp_stream.read_exact(&mut data)?;
240    /// #   assert_eq!(b"Hello world!", &data);
241    ///     Ok(())
242    /// }
243    /// ```
244    /// [`tokio::net::TcpStream`]: TcpStream
245    /// [`std::net::TcpStream`]: std::net::TcpStream
246    /// [`set_nonblocking`]: fn@std::net::TcpStream::set_nonblocking
247    pub fn into_std(self) -> io::Result<std::net::TcpStream> {
248        #[cfg(unix)]
249        {
250            use std::os::unix::io::{FromRawFd, IntoRawFd};
251            self.io
252                .into_inner()
253                .map(IntoRawFd::into_raw_fd)
254                .map(|raw_fd| unsafe { std::net::TcpStream::from_raw_fd(raw_fd) })
255        }
256
257        #[cfg(windows)]
258        {
259            use std::os::windows::io::{FromRawSocket, IntoRawSocket};
260            self.io
261                .into_inner()
262                .map(|io| io.into_raw_socket())
263                .map(|raw_socket| unsafe { std::net::TcpStream::from_raw_socket(raw_socket) })
264        }
265
266        #[cfg(target_os = "wasi")]
267        {
268            use std::os::wasi::io::{FromRawFd, IntoRawFd};
269            self.io
270                .into_inner()
271                .map(|io| io.into_raw_fd())
272                .map(|raw_fd| unsafe { std::net::TcpStream::from_raw_fd(raw_fd) })
273        }
274    }
275
276    /// Returns the local address that this stream is bound to.
277    ///
278    /// # Examples
279    ///
280    /// ```no_run
281    /// use tokio::net::TcpStream;
282    ///
283    /// # async fn dox() -> Result<(), Box<dyn std::error::Error>> {
284    /// let stream = TcpStream::connect("127.0.0.1:8080").await?;
285    ///
286    /// println!("{:?}", stream.local_addr()?);
287    /// # Ok(())
288    /// # }
289    /// ```
290    pub fn local_addr(&self) -> io::Result<SocketAddr> {
291        self.io.local_addr()
292    }
293
294    /// Returns the value of the `SO_ERROR` option.
295    pub fn take_error(&self) -> io::Result<Option<io::Error>> {
296        self.io.take_error()
297    }
298
299    /// Returns the remote address that this stream is connected to.
300    ///
301    /// # Examples
302    ///
303    /// ```no_run
304    /// use tokio::net::TcpStream;
305    ///
306    /// # async fn dox() -> Result<(), Box<dyn std::error::Error>> {
307    /// let stream = TcpStream::connect("127.0.0.1:8080").await?;
308    ///
309    /// println!("{:?}", stream.peer_addr()?);
310    /// # Ok(())
311    /// # }
312    /// ```
313    pub fn peer_addr(&self) -> io::Result<SocketAddr> {
314        self.io.peer_addr()
315    }
316
317    /// Attempts to receive data on the socket, without removing that data from
318    /// the queue, registering the current task for wakeup if data is not yet
319    /// available.
320    ///
321    /// Note that on multiple calls to `poll_peek`, `poll_read` or
322    /// `poll_read_ready`, only the `Waker` from the `Context` passed to the
323    /// most recent call is scheduled to receive a wakeup. (However,
324    /// `poll_write` retains a second, independent waker.)
325    ///
326    /// # Return value
327    ///
328    /// The function returns:
329    ///
330    /// * `Poll::Pending` if data is not yet available.
331    /// * `Poll::Ready(Ok(n))` if data is available. `n` is the number of bytes peeked.
332    /// * `Poll::Ready(Err(e))` if an error is encountered.
333    ///
334    /// # Errors
335    ///
336    /// This function may encounter any standard I/O error except `WouldBlock`.
337    ///
338    /// # Examples
339    ///
340    /// ```no_run
341    /// use tokio::io::{self, ReadBuf};
342    /// use tokio::net::TcpStream;
343    ///
344    /// use std::future::poll_fn;
345    ///
346    /// #[tokio::main]
347    /// async fn main() -> io::Result<()> {
348    ///     let stream = TcpStream::connect("127.0.0.1:8000").await?;
349    ///     let mut buf = [0; 10];
350    ///     let mut buf = ReadBuf::new(&mut buf);
351    ///
352    ///     poll_fn(|cx| {
353    ///         stream.poll_peek(cx, &mut buf)
354    ///     }).await?;
355    ///
356    ///     Ok(())
357    /// }
358    /// ```
359    pub fn poll_peek(
360        &self,
361        cx: &mut Context<'_>,
362        buf: &mut ReadBuf<'_>,
363    ) -> Poll<io::Result<usize>> {
364        loop {
365            let ev = ready!(self.io.registration().poll_read_ready(cx))?;
366
367            let b = unsafe {
368                &mut *(buf.unfilled_mut() as *mut [std::mem::MaybeUninit<u8>] as *mut [u8])
369            };
370
371            match self.io.peek(b) {
372                Ok(ret) => {
373                    unsafe { buf.assume_init(ret) };
374                    buf.advance(ret);
375                    return Poll::Ready(Ok(ret));
376                }
377                Err(ref e) if e.kind() == io::ErrorKind::WouldBlock => {
378                    self.io.registration().clear_readiness(ev);
379                }
380                Err(e) => return Poll::Ready(Err(e)),
381            }
382        }
383    }
384
385    /// Waits for any of the requested ready states.
386    ///
387    /// This function is usually paired with `try_read()` or `try_write()`. It
388    /// can be used to concurrently read / write to the same socket on a single
389    /// task without splitting the socket.
390    ///
391    /// The function may complete without the socket being ready. This is a
392    /// false-positive and attempting an operation will return with
393    /// `io::ErrorKind::WouldBlock`. The function can also return with an empty
394    /// [`Ready`] set, so you should always check the returned value and possibly
395    /// wait again if the requested states are not set.
396    ///
397    /// # Cancel safety
398    ///
399    /// This method is cancel safe. Once a readiness event occurs, the method
400    /// will continue to return immediately until the readiness event is
401    /// consumed by an attempt to read or write that fails with `WouldBlock` or
402    /// `Poll::Pending`.
403    ///
404    /// # Examples
405    ///
406    /// Concurrently read and write to the stream on the same task without
407    /// splitting.
408    ///
409    /// ```no_run
410    /// use tokio::io::Interest;
411    /// use tokio::net::TcpStream;
412    /// use std::error::Error;
413    /// use std::io;
414    ///
415    /// #[tokio::main]
416    /// async fn main() -> Result<(), Box<dyn Error>> {
417    ///     let stream = TcpStream::connect("127.0.0.1:8080").await?;
418    ///
419    ///     loop {
420    ///         let ready = stream.ready(Interest::READABLE | Interest::WRITABLE).await?;
421    ///
422    ///         if ready.is_readable() {
423    ///             let mut data = vec![0; 1024];
424    ///             // Try to read data, this may still fail with `WouldBlock`
425    ///             // if the readiness event is a false positive.
426    ///             match stream.try_read(&mut data) {
427    ///                 Ok(n) => {
428    ///                     println!("read {} bytes", n);
429    ///                 }
430    ///                 Err(ref e) if e.kind() == io::ErrorKind::WouldBlock => {
431    ///                     continue;
432    ///                 }
433    ///                 Err(e) => {
434    ///                     return Err(e.into());
435    ///                 }
436    ///             }
437    ///
438    ///         }
439    ///
440    ///         if ready.is_writable() {
441    ///             // Try to write data, this may still fail with `WouldBlock`
442    ///             // if the readiness event is a false positive.
443    ///             match stream.try_write(b"hello world") {
444    ///                 Ok(n) => {
445    ///                     println!("write {} bytes", n);
446    ///                 }
447    ///                 Err(ref e) if e.kind() == io::ErrorKind::WouldBlock => {
448    ///                     continue
449    ///                 }
450    ///                 Err(e) => {
451    ///                     return Err(e.into());
452    ///                 }
453    ///             }
454    ///         }
455    ///     }
456    /// }
457    /// ```
458    pub async fn ready(&self, interest: Interest) -> io::Result<Ready> {
459        let event = self.io.registration().readiness(interest).await?;
460        Ok(event.ready)
461    }
462
463    /// Waits for the socket to become readable.
464    ///
465    /// This function is equivalent to `ready(Interest::READABLE)` and is usually
466    /// paired with `try_read()`.
467    ///
468    /// # Cancel safety
469    ///
470    /// This method is cancel safe. Once a readiness event occurs, the method
471    /// will continue to return immediately until the readiness event is
472    /// consumed by an attempt to read that fails with `WouldBlock` or
473    /// `Poll::Pending`.
474    ///
475    /// # Examples
476    ///
477    /// ```no_run
478    /// use tokio::net::TcpStream;
479    /// use std::error::Error;
480    /// use std::io;
481    ///
482    /// #[tokio::main]
483    /// async fn main() -> Result<(), Box<dyn Error>> {
484    ///     // Connect to a peer
485    ///     let stream = TcpStream::connect("127.0.0.1:8080").await?;
486    ///
487    ///     let mut msg = vec![0; 1024];
488    ///
489    ///     loop {
490    ///         // Wait for the socket to be readable
491    ///         stream.readable().await?;
492    ///
493    ///         // Try to read data, this may still fail with `WouldBlock`
494    ///         // if the readiness event is a false positive.
495    ///         match stream.try_read(&mut msg) {
496    ///             Ok(n) => {
497    ///                 msg.truncate(n);
498    ///                 break;
499    ///             }
500    ///             Err(ref e) if e.kind() == io::ErrorKind::WouldBlock => {
501    ///                 continue;
502    ///             }
503    ///             Err(e) => {
504    ///                 return Err(e.into());
505    ///             }
506    ///         }
507    ///     }
508    ///
509    ///     println!("GOT = {:?}", msg);
510    ///     Ok(())
511    /// }
512    /// ```
513    pub async fn readable(&self) -> io::Result<()> {
514        self.ready(Interest::READABLE).await?;
515        Ok(())
516    }
517
518    /// Polls for read readiness.
519    ///
520    /// If the tcp stream is not currently ready for reading, this method will
521    /// store a clone of the `Waker` from the provided `Context`. When the tcp
522    /// stream becomes ready for reading, `Waker::wake` will be called on the
523    /// waker.
524    ///
525    /// Note that on multiple calls to `poll_read_ready`, `poll_read` or
526    /// `poll_peek`, only the `Waker` from the `Context` passed to the most
527    /// recent call is scheduled to receive a wakeup. (However,
528    /// `poll_write_ready` retains a second, independent waker.)
529    ///
530    /// This function is intended for cases where creating and pinning a future
531    /// via [`readable`] is not feasible. Where possible, using [`readable`] is
532    /// preferred, as this supports polling from multiple tasks at once.
533    ///
534    /// # Return value
535    ///
536    /// The function returns:
537    ///
538    /// * `Poll::Pending` if the tcp stream is not ready for reading.
539    /// * `Poll::Ready(Ok(()))` if the tcp stream is ready for reading.
540    /// * `Poll::Ready(Err(e))` if an error is encountered.
541    ///
542    /// # Errors
543    ///
544    /// This function may encounter any standard I/O error except `WouldBlock`.
545    ///
546    /// [`readable`]: method@Self::readable
547    pub fn poll_read_ready(&self, cx: &mut Context<'_>) -> Poll<io::Result<()>> {
548        self.io.registration().poll_read_ready(cx).map_ok(|_| ())
549    }
550
551    /// Tries to read data from the stream into the provided buffer, returning how
552    /// many bytes were read.
553    ///
554    /// Receives any pending data from the socket but does not wait for new data
555    /// to arrive. On success, returns the number of bytes read. Because
556    /// `try_read()` is non-blocking, the buffer does not have to be stored by
557    /// the async task and can exist entirely on the stack.
558    ///
559    /// Usually, [`readable()`] or [`ready()`] is used with this function.
560    ///
561    /// [`readable()`]: TcpStream::readable()
562    /// [`ready()`]: TcpStream::ready()
563    ///
564    /// # Return
565    ///
566    /// If data is successfully read, `Ok(n)` is returned, where `n` is the
567    /// number of bytes read. If `n` is `0`, then it can indicate one of two scenarios:
568    ///
569    /// 1. The stream's read half is closed and will no longer yield data.
570    /// 2. The specified buffer was 0 bytes in length.
571    ///
572    /// If the stream is not ready to read data,
573    /// `Err(io::ErrorKind::WouldBlock)` is returned.
574    ///
575    /// # Examples
576    ///
577    /// ```no_run
578    /// use tokio::net::TcpStream;
579    /// use std::error::Error;
580    /// use std::io;
581    ///
582    /// #[tokio::main]
583    /// async fn main() -> Result<(), Box<dyn Error>> {
584    ///     // Connect to a peer
585    ///     let stream = TcpStream::connect("127.0.0.1:8080").await?;
586    ///
587    ///     loop {
588    ///         // Wait for the socket to be readable
589    ///         stream.readable().await?;
590    ///
591    ///         // Creating the buffer **after** the `await` prevents it from
592    ///         // being stored in the async task.
593    ///         let mut buf = [0; 4096];
594    ///
595    ///         // Try to read data, this may still fail with `WouldBlock`
596    ///         // if the readiness event is a false positive.
597    ///         match stream.try_read(&mut buf) {
598    ///             Ok(0) => break,
599    ///             Ok(n) => {
600    ///                 println!("read {} bytes", n);
601    ///             }
602    ///             Err(ref e) if e.kind() == io::ErrorKind::WouldBlock => {
603    ///                 continue;
604    ///             }
605    ///             Err(e) => {
606    ///                 return Err(e.into());
607    ///             }
608    ///         }
609    ///     }
610    ///
611    ///     Ok(())
612    /// }
613    /// ```
614    pub fn try_read(&self, buf: &mut [u8]) -> io::Result<usize> {
615        use std::io::Read;
616
617        self.io
618            .registration()
619            .try_io(Interest::READABLE, || (&*self.io).read(buf))
620    }
621
622    /// Tries to read data from the stream into the provided buffers, returning
623    /// how many bytes were read.
624    ///
625    /// Data is copied to fill each buffer in order, with the final buffer
626    /// written to possibly being only partially filled. This method behaves
627    /// equivalently to a single call to [`try_read()`] with concatenated
628    /// buffers.
629    ///
630    /// Receives any pending data from the socket but does not wait for new data
631    /// to arrive. On success, returns the number of bytes read. Because
632    /// `try_read_vectored()` is non-blocking, the buffer does not have to be
633    /// stored by the async task and can exist entirely on the stack.
634    ///
635    /// Usually, [`readable()`] or [`ready()`] is used with this function.
636    ///
637    /// [`try_read()`]: TcpStream::try_read()
638    /// [`readable()`]: TcpStream::readable()
639    /// [`ready()`]: TcpStream::ready()
640    ///
641    /// # Return
642    ///
643    /// If data is successfully read, `Ok(n)` is returned, where `n` is the
644    /// number of bytes read. `Ok(0)` indicates the stream's read half is closed
645    /// and will no longer yield data. If the stream is not ready to read data
646    /// `Err(io::ErrorKind::WouldBlock)` is returned.
647    ///
648    /// # Examples
649    ///
650    /// ```no_run
651    /// use tokio::net::TcpStream;
652    /// use std::error::Error;
653    /// use std::io::{self, IoSliceMut};
654    ///
655    /// #[tokio::main]
656    /// async fn main() -> Result<(), Box<dyn Error>> {
657    ///     // Connect to a peer
658    ///     let stream = TcpStream::connect("127.0.0.1:8080").await?;
659    ///
660    ///     loop {
661    ///         // Wait for the socket to be readable
662    ///         stream.readable().await?;
663    ///
664    ///         // Creating the buffer **after** the `await` prevents it from
665    ///         // being stored in the async task.
666    ///         let mut buf_a = [0; 512];
667    ///         let mut buf_b = [0; 1024];
668    ///         let mut bufs = [
669    ///             IoSliceMut::new(&mut buf_a),
670    ///             IoSliceMut::new(&mut buf_b),
671    ///         ];
672    ///
673    ///         // Try to read data, this may still fail with `WouldBlock`
674    ///         // if the readiness event is a false positive.
675    ///         match stream.try_read_vectored(&mut bufs) {
676    ///             Ok(0) => break,
677    ///             Ok(n) => {
678    ///                 println!("read {} bytes", n);
679    ///             }
680    ///             Err(ref e) if e.kind() == io::ErrorKind::WouldBlock => {
681    ///                 continue;
682    ///             }
683    ///             Err(e) => {
684    ///                 return Err(e.into());
685    ///             }
686    ///         }
687    ///     }
688    ///
689    ///     Ok(())
690    /// }
691    /// ```
692    pub fn try_read_vectored(&self, bufs: &mut [io::IoSliceMut<'_>]) -> io::Result<usize> {
693        use std::io::Read;
694
695        self.io
696            .registration()
697            .try_io(Interest::READABLE, || (&*self.io).read_vectored(bufs))
698    }
699
700    cfg_io_util! {
701        /// Tries to read data from the stream into the provided buffer, advancing the
702        /// buffer's internal cursor, returning how many bytes were read.
703        ///
704        /// Receives any pending data from the socket but does not wait for new data
705        /// to arrive. On success, returns the number of bytes read. Because
706        /// `try_read_buf()` is non-blocking, the buffer does not have to be stored by
707        /// the async task and can exist entirely on the stack.
708        ///
709        /// Usually, [`readable()`] or [`ready()`] is used with this function.
710        ///
711        /// [`readable()`]: TcpStream::readable()
712        /// [`ready()`]: TcpStream::ready()
713        ///
714        /// # Return
715        ///
716        /// If data is successfully read, `Ok(n)` is returned, where `n` is the
717        /// number of bytes read. `Ok(0)` indicates the stream's read half is closed
718        /// and will no longer yield data. If the stream is not ready to read data
719        /// `Err(io::ErrorKind::WouldBlock)` is returned.
720        ///
721        /// # Examples
722        ///
723        /// ```no_run
724        /// use tokio::net::TcpStream;
725        /// use std::error::Error;
726        /// use std::io;
727        ///
728        /// #[tokio::main]
729        /// async fn main() -> Result<(), Box<dyn Error>> {
730        ///     // Connect to a peer
731        ///     let stream = TcpStream::connect("127.0.0.1:8080").await?;
732        ///
733        ///     loop {
734        ///         // Wait for the socket to be readable
735        ///         stream.readable().await?;
736        ///
737        ///         let mut buf = Vec::with_capacity(4096);
738        ///
739        ///         // Try to read data, this may still fail with `WouldBlock`
740        ///         // if the readiness event is a false positive.
741        ///         match stream.try_read_buf(&mut buf) {
742        ///             Ok(0) => break,
743        ///             Ok(n) => {
744        ///                 println!("read {} bytes", n);
745        ///             }
746        ///             Err(ref e) if e.kind() == io::ErrorKind::WouldBlock => {
747        ///                 continue;
748        ///             }
749        ///             Err(e) => {
750        ///                 return Err(e.into());
751        ///             }
752        ///         }
753        ///     }
754        ///
755        ///     Ok(())
756        /// }
757        /// ```
758        pub fn try_read_buf<B: BufMut>(&self, buf: &mut B) -> io::Result<usize> {
759            self.io.registration().try_io(Interest::READABLE, || {
760                use std::io::Read;
761
762                let dst = buf.chunk_mut();
763                let dst =
764                    unsafe { &mut *(dst as *mut _ as *mut [std::mem::MaybeUninit<u8>] as *mut [u8]) };
765
766                // Safety: We trust `TcpStream::read` to have filled up `n` bytes in the
767                // buffer.
768                let n = (&*self.io).read(dst)?;
769
770                unsafe {
771                    buf.advance_mut(n);
772                }
773
774                Ok(n)
775            })
776        }
777    }
778
779    /// Waits for the socket to become writable.
780    ///
781    /// This function is equivalent to `ready(Interest::WRITABLE)` and is usually
782    /// paired with `try_write()`.
783    ///
784    /// # Cancel safety
785    ///
786    /// This method is cancel safe. Once a readiness event occurs, the method
787    /// will continue to return immediately until the readiness event is
788    /// consumed by an attempt to write that fails with `WouldBlock` or
789    /// `Poll::Pending`.
790    ///
791    /// # Examples
792    ///
793    /// ```no_run
794    /// use tokio::net::TcpStream;
795    /// use std::error::Error;
796    /// use std::io;
797    ///
798    /// #[tokio::main]
799    /// async fn main() -> Result<(), Box<dyn Error>> {
800    ///     // Connect to a peer
801    ///     let stream = TcpStream::connect("127.0.0.1:8080").await?;
802    ///
803    ///     loop {
804    ///         // Wait for the socket to be writable
805    ///         stream.writable().await?;
806    ///
807    ///         // Try to write data, this may still fail with `WouldBlock`
808    ///         // if the readiness event is a false positive.
809    ///         match stream.try_write(b"hello world") {
810    ///             Ok(n) => {
811    ///                 break;
812    ///             }
813    ///             Err(ref e) if e.kind() == io::ErrorKind::WouldBlock => {
814    ///                 continue;
815    ///             }
816    ///             Err(e) => {
817    ///                 return Err(e.into());
818    ///             }
819    ///         }
820    ///     }
821    ///
822    ///     Ok(())
823    /// }
824    /// ```
825    pub async fn writable(&self) -> io::Result<()> {
826        self.ready(Interest::WRITABLE).await?;
827        Ok(())
828    }
829
830    /// Polls for write readiness.
831    ///
832    /// If the tcp stream is not currently ready for writing, this method will
833    /// store a clone of the `Waker` from the provided `Context`. When the tcp
834    /// stream becomes ready for writing, `Waker::wake` will be called on the
835    /// waker.
836    ///
837    /// Note that on multiple calls to `poll_write_ready` or `poll_write`, only
838    /// the `Waker` from the `Context` passed to the most recent call is
839    /// scheduled to receive a wakeup. (However, `poll_read_ready` retains a
840    /// second, independent waker.)
841    ///
842    /// This function is intended for cases where creating and pinning a future
843    /// via [`writable`] is not feasible. Where possible, using [`writable`] is
844    /// preferred, as this supports polling from multiple tasks at once.
845    ///
846    /// # Return value
847    ///
848    /// The function returns:
849    ///
850    /// * `Poll::Pending` if the tcp stream is not ready for writing.
851    /// * `Poll::Ready(Ok(()))` if the tcp stream is ready for writing.
852    /// * `Poll::Ready(Err(e))` if an error is encountered.
853    ///
854    /// # Errors
855    ///
856    /// This function may encounter any standard I/O error except `WouldBlock`.
857    ///
858    /// [`writable`]: method@Self::writable
859    pub fn poll_write_ready(&self, cx: &mut Context<'_>) -> Poll<io::Result<()>> {
860        self.io.registration().poll_write_ready(cx).map_ok(|_| ())
861    }
862
863    /// Try to write a buffer to the stream, returning how many bytes were
864    /// written.
865    ///
866    /// The function will attempt to write the entire contents of `buf`, but
867    /// only part of the buffer may be written.
868    ///
869    /// This function is usually paired with `writable()`.
870    ///
871    /// # Return
872    ///
873    /// If data is successfully written, `Ok(n)` is returned, where `n` is the
874    /// number of bytes written. If the stream is not ready to write data,
875    /// `Err(io::ErrorKind::WouldBlock)` is returned.
876    ///
877    /// # Examples
878    ///
879    /// ```no_run
880    /// use tokio::net::TcpStream;
881    /// use std::error::Error;
882    /// use std::io;
883    ///
884    /// #[tokio::main]
885    /// async fn main() -> Result<(), Box<dyn Error>> {
886    ///     // Connect to a peer
887    ///     let stream = TcpStream::connect("127.0.0.1:8080").await?;
888    ///
889    ///     loop {
890    ///         // Wait for the socket to be writable
891    ///         stream.writable().await?;
892    ///
893    ///         // Try to write data, this may still fail with `WouldBlock`
894    ///         // if the readiness event is a false positive.
895    ///         match stream.try_write(b"hello world") {
896    ///             Ok(n) => {
897    ///                 break;
898    ///             }
899    ///             Err(ref e) if e.kind() == io::ErrorKind::WouldBlock => {
900    ///                 continue;
901    ///             }
902    ///             Err(e) => {
903    ///                 return Err(e.into());
904    ///             }
905    ///         }
906    ///     }
907    ///
908    ///     Ok(())
909    /// }
910    /// ```
911    pub fn try_write(&self, buf: &[u8]) -> io::Result<usize> {
912        use std::io::Write;
913
914        self.io
915            .registration()
916            .try_io(Interest::WRITABLE, || (&*self.io).write(buf))
917    }
918
919    /// Tries to write several buffers to the stream, returning how many bytes
920    /// were written.
921    ///
922    /// Data is written from each buffer in order, with the final buffer read
923    /// from possibly being only partially consumed. This method behaves
924    /// equivalently to a single call to [`try_write()`] with concatenated
925    /// buffers.
926    ///
927    /// This function is usually paired with `writable()`.
928    ///
929    /// [`try_write()`]: TcpStream::try_write()
930    ///
931    /// # Return
932    ///
933    /// If data is successfully written, `Ok(n)` is returned, where `n` is the
934    /// number of bytes written. If the stream is not ready to write data,
935    /// `Err(io::ErrorKind::WouldBlock)` is returned.
936    ///
937    /// # Examples
938    ///
939    /// ```no_run
940    /// use tokio::net::TcpStream;
941    /// use std::error::Error;
942    /// use std::io;
943    ///
944    /// #[tokio::main]
945    /// async fn main() -> Result<(), Box<dyn Error>> {
946    ///     // Connect to a peer
947    ///     let stream = TcpStream::connect("127.0.0.1:8080").await?;
948    ///
949    ///     let bufs = [io::IoSlice::new(b"hello "), io::IoSlice::new(b"world")];
950    ///
951    ///     loop {
952    ///         // Wait for the socket to be writable
953    ///         stream.writable().await?;
954    ///
955    ///         // Try to write data, this may still fail with `WouldBlock`
956    ///         // if the readiness event is a false positive.
957    ///         match stream.try_write_vectored(&bufs) {
958    ///             Ok(n) => {
959    ///                 break;
960    ///             }
961    ///             Err(ref e) if e.kind() == io::ErrorKind::WouldBlock => {
962    ///                 continue;
963    ///             }
964    ///             Err(e) => {
965    ///                 return Err(e.into());
966    ///             }
967    ///         }
968    ///     }
969    ///
970    ///     Ok(())
971    /// }
972    /// ```
973    pub fn try_write_vectored(&self, bufs: &[io::IoSlice<'_>]) -> io::Result<usize> {
974        use std::io::Write;
975
976        self.io
977            .registration()
978            .try_io(Interest::WRITABLE, || (&*self.io).write_vectored(bufs))
979    }
980
981    /// Tries to read or write from the socket using a user-provided IO operation.
982    ///
983    /// If the socket is ready, the provided closure is called. The closure
984    /// should attempt to perform IO operation on the socket by manually
985    /// calling the appropriate syscall. If the operation fails because the
986    /// socket is not actually ready, then the closure should return a
987    /// `WouldBlock` error and the readiness flag is cleared. The return value
988    /// of the closure is then returned by `try_io`.
989    ///
990    /// If the socket is not ready, then the closure is not called
991    /// and a `WouldBlock` error is returned.
992    ///
993    /// The closure should only return a `WouldBlock` error if it has performed
994    /// an IO operation on the socket that failed due to the socket not being
995    /// ready. Returning a `WouldBlock` error in any other situation will
996    /// incorrectly clear the readiness flag, which can cause the socket to
997    /// behave incorrectly.
998    ///
999    /// The closure should not perform the IO operation using any of the methods
1000    /// defined on the Tokio `TcpStream` type, as this will mess with the
1001    /// readiness flag and can cause the socket to behave incorrectly.
1002    ///
1003    /// This method is not intended to be used with combined interests.
1004    /// The closure should perform only one type of IO operation, so it should not
1005    /// require more than one ready state. This method may panic or sleep forever
1006    /// if it is called with a combined interest.
1007    ///
1008    /// Usually, [`readable()`], [`writable()`] or [`ready()`] is used with this function.
1009    ///
1010    /// [`readable()`]: TcpStream::readable()
1011    /// [`writable()`]: TcpStream::writable()
1012    /// [`ready()`]: TcpStream::ready()
1013    pub fn try_io<R>(
1014        &self,
1015        interest: Interest,
1016        f: impl FnOnce() -> io::Result<R>,
1017    ) -> io::Result<R> {
1018        self.io
1019            .registration()
1020            .try_io(interest, || self.io.try_io(f))
1021    }
1022
1023    /// Reads or writes from the socket using a user-provided IO operation.
1024    ///
1025    /// The readiness of the socket is awaited and when the socket is ready,
1026    /// the provided closure is called. The closure should attempt to perform
1027    /// IO operation on the socket by manually calling the appropriate syscall.
1028    /// If the operation fails because the socket is not actually ready,
1029    /// then the closure should return a `WouldBlock` error. In such case the
1030    /// readiness flag is cleared and the socket readiness is awaited again.
1031    /// This loop is repeated until the closure returns an `Ok` or an error
1032    /// other than `WouldBlock`.
1033    ///
1034    /// The closure should only return a `WouldBlock` error if it has performed
1035    /// an IO operation on the socket that failed due to the socket not being
1036    /// ready. Returning a `WouldBlock` error in any other situation will
1037    /// incorrectly clear the readiness flag, which can cause the socket to
1038    /// behave incorrectly.
1039    ///
1040    /// The closure should not perform the IO operation using any of the methods
1041    /// defined on the Tokio `TcpStream` type, as this will mess with the
1042    /// readiness flag and can cause the socket to behave incorrectly.
1043    ///
1044    /// This method is not intended to be used with combined interests.
1045    /// The closure should perform only one type of IO operation, so it should not
1046    /// require more than one ready state. This method may panic or sleep forever
1047    /// if it is called with a combined interest.
1048    pub async fn async_io<R>(
1049        &self,
1050        interest: Interest,
1051        mut f: impl FnMut() -> io::Result<R>,
1052    ) -> io::Result<R> {
1053        self.io
1054            .registration()
1055            .async_io(interest, || self.io.try_io(&mut f))
1056            .await
1057    }
1058
1059    /// Receives data on the socket from the remote address to which it is
1060    /// connected, without removing that data from the queue. On success,
1061    /// returns the number of bytes peeked.
1062    ///
1063    /// Successive calls return the same data. This is accomplished by passing
1064    /// `MSG_PEEK` as a flag to the underlying `recv` system call.
1065    ///
1066    /// # Examples
1067    ///
1068    /// ```no_run
1069    /// use tokio::net::TcpStream;
1070    /// use tokio::io::AsyncReadExt;
1071    /// use std::error::Error;
1072    ///
1073    /// #[tokio::main]
1074    /// async fn main() -> Result<(), Box<dyn Error>> {
1075    ///     // Connect to a peer
1076    ///     let mut stream = TcpStream::connect("127.0.0.1:8080").await?;
1077    ///
1078    ///     let mut b1 = [0; 10];
1079    ///     let mut b2 = [0; 10];
1080    ///
1081    ///     // Peek at the data
1082    ///     let n = stream.peek(&mut b1).await?;
1083    ///
1084    ///     // Read the data
1085    ///     assert_eq!(n, stream.read(&mut b2[..n]).await?);
1086    ///     assert_eq!(&b1[..n], &b2[..n]);
1087    ///
1088    ///     Ok(())
1089    /// }
1090    /// ```
1091    ///
1092    /// The [`read`] method is defined on the [`AsyncReadExt`] trait.
1093    ///
1094    /// [`read`]: fn@crate::io::AsyncReadExt::read
1095    /// [`AsyncReadExt`]: trait@crate::io::AsyncReadExt
1096    pub async fn peek(&self, buf: &mut [u8]) -> io::Result<usize> {
1097        self.io
1098            .registration()
1099            .async_io(Interest::READABLE, || self.io.peek(buf))
1100            .await
1101    }
1102
1103    /// Shuts down the read, write, or both halves of this connection.
1104    ///
1105    /// This function will cause all pending and future I/O on the specified
1106    /// portions to return immediately with an appropriate value (see the
1107    /// documentation of `Shutdown`).
1108    pub(super) fn shutdown_std(&self, how: Shutdown) -> io::Result<()> {
1109        self.io.shutdown(how)
1110    }
1111
1112    /// Gets the value of the `TCP_NODELAY` option on this socket.
1113    ///
1114    /// For more information about this option, see [`set_nodelay`].
1115    ///
1116    /// [`set_nodelay`]: TcpStream::set_nodelay
1117    ///
1118    /// # Examples
1119    ///
1120    /// ```no_run
1121    /// use tokio::net::TcpStream;
1122    ///
1123    /// # async fn dox() -> Result<(), Box<dyn std::error::Error>> {
1124    /// let stream = TcpStream::connect("127.0.0.1:8080").await?;
1125    ///
1126    /// println!("{:?}", stream.nodelay()?);
1127    /// # Ok(())
1128    /// # }
1129    /// ```
1130    pub fn nodelay(&self) -> io::Result<bool> {
1131        self.io.nodelay()
1132    }
1133
1134    /// Sets the value of the `TCP_NODELAY` option on this socket.
1135    ///
1136    /// If set, this option disables the Nagle algorithm. This means that
1137    /// segments are always sent as soon as possible, even if there is only a
1138    /// small amount of data. When not set, data is buffered until there is a
1139    /// sufficient amount to send out, thereby avoiding the frequent sending of
1140    /// small packets.
1141    ///
1142    /// # Examples
1143    ///
1144    /// ```no_run
1145    /// use tokio::net::TcpStream;
1146    ///
1147    /// # async fn dox() -> Result<(), Box<dyn std::error::Error>> {
1148    /// let stream = TcpStream::connect("127.0.0.1:8080").await?;
1149    ///
1150    /// stream.set_nodelay(true)?;
1151    /// # Ok(())
1152    /// # }
1153    /// ```
1154    pub fn set_nodelay(&self, nodelay: bool) -> io::Result<()> {
1155        self.io.set_nodelay(nodelay)
1156    }
1157
1158    cfg_not_wasi! {
1159        /// Reads the linger duration for this socket by getting the `SO_LINGER`
1160        /// option.
1161        ///
1162        /// For more information about this option, see [`set_linger`].
1163        ///
1164        /// [`set_linger`]: TcpStream::set_linger
1165        ///
1166        /// # Examples
1167        ///
1168        /// ```no_run
1169        /// use tokio::net::TcpStream;
1170        ///
1171        /// # async fn dox() -> Result<(), Box<dyn std::error::Error>> {
1172        /// let stream = TcpStream::connect("127.0.0.1:8080").await?;
1173        ///
1174        /// println!("{:?}", stream.linger()?);
1175        /// # Ok(())
1176        /// # }
1177        /// ```
1178        pub fn linger(&self) -> io::Result<Option<Duration>> {
1179            socket2::SockRef::from(self).linger()
1180        }
1181
1182        /// Sets the linger duration of this socket by setting the `SO_LINGER` option.
1183        ///
1184        /// This option controls the action taken when a stream has unsent messages and the stream is
1185        /// closed. If `SO_LINGER` is set, the system shall block the process until it can transmit the
1186        /// data or until the time expires.
1187        ///
1188        /// If `SO_LINGER` is not specified, and the stream is closed, the system handles the call in a
1189        /// way that allows the process to continue as quickly as possible.
1190        ///
1191        /// # Examples
1192        ///
1193        /// ```no_run
1194        /// use tokio::net::TcpStream;
1195        ///
1196        /// # async fn dox() -> Result<(), Box<dyn std::error::Error>> {
1197        /// let stream = TcpStream::connect("127.0.0.1:8080").await?;
1198        ///
1199        /// stream.set_linger(None)?;
1200        /// # Ok(())
1201        /// # }
1202        /// ```
1203        pub fn set_linger(&self, dur: Option<Duration>) -> io::Result<()> {
1204            socket2::SockRef::from(self).set_linger(dur)
1205        }
1206    }
1207
1208    /// Gets the value of the `IP_TTL` option for this socket.
1209    ///
1210    /// For more information about this option, see [`set_ttl`].
1211    ///
1212    /// [`set_ttl`]: TcpStream::set_ttl
1213    ///
1214    /// # Examples
1215    ///
1216    /// ```no_run
1217    /// use tokio::net::TcpStream;
1218    ///
1219    /// # async fn dox() -> Result<(), Box<dyn std::error::Error>> {
1220    /// let stream = TcpStream::connect("127.0.0.1:8080").await?;
1221    ///
1222    /// println!("{:?}", stream.ttl()?);
1223    /// # Ok(())
1224    /// # }
1225    /// ```
1226    pub fn ttl(&self) -> io::Result<u32> {
1227        self.io.ttl()
1228    }
1229
1230    /// Sets the value for the `IP_TTL` option on this socket.
1231    ///
1232    /// This value sets the time-to-live field that is used in every packet sent
1233    /// from this socket.
1234    ///
1235    /// # Examples
1236    ///
1237    /// ```no_run
1238    /// use tokio::net::TcpStream;
1239    ///
1240    /// # async fn dox() -> Result<(), Box<dyn std::error::Error>> {
1241    /// let stream = TcpStream::connect("127.0.0.1:8080").await?;
1242    ///
1243    /// stream.set_ttl(123)?;
1244    /// # Ok(())
1245    /// # }
1246    /// ```
1247    pub fn set_ttl(&self, ttl: u32) -> io::Result<()> {
1248        self.io.set_ttl(ttl)
1249    }
1250
1251    // These lifetime markers also appear in the generated documentation, and make
1252    // it more clear that this is a *borrowed* split.
1253    #[allow(clippy::needless_lifetimes)]
1254    /// Splits a `TcpStream` into a read half and a write half, which can be used
1255    /// to read and write the stream concurrently.
1256    ///
1257    /// This method is more efficient than [`into_split`], but the halves cannot be
1258    /// moved into independently spawned tasks.
1259    ///
1260    /// [`into_split`]: TcpStream::into_split()
1261    pub fn split<'a>(&'a mut self) -> (ReadHalf<'a>, WriteHalf<'a>) {
1262        split(self)
1263    }
1264
1265    /// Splits a `TcpStream` into a read half and a write half, which can be used
1266    /// to read and write the stream concurrently.
1267    ///
1268    /// Unlike [`split`], the owned halves can be moved to separate tasks, however
1269    /// this comes at the cost of a heap allocation.
1270    ///
1271    /// **Note:** Dropping the write half will shut down the write half of the TCP
1272    /// stream. This is equivalent to calling [`shutdown()`] on the `TcpStream`.
1273    ///
1274    /// [`split`]: TcpStream::split()
1275    /// [`shutdown()`]: fn@crate::io::AsyncWriteExt::shutdown
1276    pub fn into_split(self) -> (OwnedReadHalf, OwnedWriteHalf) {
1277        split_owned(self)
1278    }
1279
1280    // == Poll IO functions that takes `&self` ==
1281    //
1282    // To read or write without mutable access to the `TcpStream`, combine the
1283    // `poll_read_ready` or `poll_write_ready` methods with the `try_read` or
1284    // `try_write` methods.
1285
1286    pub(crate) fn poll_read_priv(
1287        &self,
1288        cx: &mut Context<'_>,
1289        buf: &mut ReadBuf<'_>,
1290    ) -> Poll<io::Result<()>> {
1291        // Safety: `TcpStream::read` correctly handles reads into uninitialized memory
1292        unsafe { self.io.poll_read(cx, buf) }
1293    }
1294
1295    pub(super) fn poll_write_priv(
1296        &self,
1297        cx: &mut Context<'_>,
1298        buf: &[u8],
1299    ) -> Poll<io::Result<usize>> {
1300        self.io.poll_write(cx, buf)
1301    }
1302
1303    pub(super) fn poll_write_vectored_priv(
1304        &self,
1305        cx: &mut Context<'_>,
1306        bufs: &[io::IoSlice<'_>],
1307    ) -> Poll<io::Result<usize>> {
1308        self.io.poll_write_vectored(cx, bufs)
1309    }
1310}
1311
1312impl TryFrom<std::net::TcpStream> for TcpStream {
1313    type Error = io::Error;
1314
1315    /// Consumes stream, returning the tokio I/O object.
1316    ///
1317    /// This is equivalent to
1318    /// [`TcpStream::from_std(stream)`](TcpStream::from_std).
1319    fn try_from(stream: std::net::TcpStream) -> Result<Self, Self::Error> {
1320        Self::from_std(stream)
1321    }
1322}
1323
1324// ===== impl Read / Write =====
1325
1326impl AsyncRead for TcpStream {
1327    fn poll_read(
1328        self: Pin<&mut Self>,
1329        cx: &mut Context<'_>,
1330        buf: &mut ReadBuf<'_>,
1331    ) -> Poll<io::Result<()>> {
1332        self.poll_read_priv(cx, buf)
1333    }
1334}
1335
1336impl AsyncWrite for TcpStream {
1337    fn poll_write(
1338        self: Pin<&mut Self>,
1339        cx: &mut Context<'_>,
1340        buf: &[u8],
1341    ) -> Poll<io::Result<usize>> {
1342        self.poll_write_priv(cx, buf)
1343    }
1344
1345    fn poll_write_vectored(
1346        self: Pin<&mut Self>,
1347        cx: &mut Context<'_>,
1348        bufs: &[io::IoSlice<'_>],
1349    ) -> Poll<io::Result<usize>> {
1350        self.poll_write_vectored_priv(cx, bufs)
1351    }
1352
1353    fn is_write_vectored(&self) -> bool {
1354        true
1355    }
1356
1357    #[inline]
1358    fn poll_flush(self: Pin<&mut Self>, _: &mut Context<'_>) -> Poll<io::Result<()>> {
1359        // tcp flush is a no-op
1360        Poll::Ready(Ok(()))
1361    }
1362
1363    fn poll_shutdown(self: Pin<&mut Self>, _: &mut Context<'_>) -> Poll<io::Result<()>> {
1364        self.shutdown_std(std::net::Shutdown::Write)?;
1365        Poll::Ready(Ok(()))
1366    }
1367}
1368
1369impl fmt::Debug for TcpStream {
1370    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
1371        self.io.fmt(f)
1372    }
1373}
1374
1375#[cfg(unix)]
1376mod sys {
1377    use super::TcpStream;
1378    use std::os::unix::prelude::*;
1379
1380    impl AsRawFd for TcpStream {
1381        fn as_raw_fd(&self) -> RawFd {
1382            self.io.as_raw_fd()
1383        }
1384    }
1385
1386    impl AsFd for TcpStream {
1387        fn as_fd(&self) -> BorrowedFd<'_> {
1388            unsafe { BorrowedFd::borrow_raw(self.as_raw_fd()) }
1389        }
1390    }
1391}
1392
1393cfg_windows! {
1394    use crate::os::windows::io::{AsRawSocket, RawSocket, AsSocket, BorrowedSocket};
1395
1396    impl AsRawSocket for TcpStream {
1397        fn as_raw_socket(&self) -> RawSocket {
1398            self.io.as_raw_socket()
1399        }
1400    }
1401
1402    impl AsSocket for TcpStream {
1403        fn as_socket(&self) -> BorrowedSocket<'_> {
1404            unsafe { BorrowedSocket::borrow_raw(self.as_raw_socket()) }
1405        }
1406    }
1407}
1408
1409#[cfg(all(tokio_unstable, target_os = "wasi"))]
1410mod sys {
1411    use super::TcpStream;
1412    use std::os::wasi::prelude::*;
1413
1414    impl AsRawFd for TcpStream {
1415        fn as_raw_fd(&self) -> RawFd {
1416            self.io.as_raw_fd()
1417        }
1418    }
1419
1420    impl AsFd for TcpStream {
1421        fn as_fd(&self) -> BorrowedFd<'_> {
1422            unsafe { BorrowedFd::borrow_raw(self.as_raw_fd()) }
1423        }
1424    }
1425}