tokio/net/tcp/stream.rs
1cfg_not_wasi! {
2 use crate::net::{to_socket_addrs, ToSocketAddrs};
3 use std::future::poll_fn;
4 use std::time::Duration;
5}
6
7use crate::io::{AsyncRead, AsyncWrite, Interest, PollEvented, ReadBuf, Ready};
8use crate::net::tcp::split::{split, ReadHalf, WriteHalf};
9use crate::net::tcp::split_owned::{split_owned, OwnedReadHalf, OwnedWriteHalf};
10
11use std::fmt;
12use std::io;
13use std::net::{Shutdown, SocketAddr};
14use std::pin::Pin;
15use std::task::{ready, Context, Poll};
16
17cfg_io_util! {
18 use bytes::BufMut;
19}
20
21cfg_net! {
22 /// A TCP stream between a local and a remote socket.
23 ///
24 /// A TCP stream can either be created by connecting to an endpoint, via the
25 /// [`connect`] method, or by [accepting] a connection from a [listener]. A
26 /// TCP stream can also be created via the [`TcpSocket`] type.
27 ///
28 /// Reading and writing to a `TcpStream` is usually done using the
29 /// convenience methods found on the [`AsyncReadExt`] and [`AsyncWriteExt`]
30 /// traits.
31 ///
32 /// [`connect`]: method@TcpStream::connect
33 /// [accepting]: method@crate::net::TcpListener::accept
34 /// [listener]: struct@crate::net::TcpListener
35 /// [`TcpSocket`]: struct@crate::net::TcpSocket
36 /// [`AsyncReadExt`]: trait@crate::io::AsyncReadExt
37 /// [`AsyncWriteExt`]: trait@crate::io::AsyncWriteExt
38 ///
39 /// # Examples
40 ///
41 /// ```no_run
42 /// use tokio::net::TcpStream;
43 /// use tokio::io::AsyncWriteExt;
44 /// use std::error::Error;
45 ///
46 /// #[tokio::main]
47 /// async fn main() -> Result<(), Box<dyn Error>> {
48 /// // Connect to a peer
49 /// let mut stream = TcpStream::connect("127.0.0.1:8080").await?;
50 ///
51 /// // Write some data.
52 /// stream.write_all(b"hello world!").await?;
53 ///
54 /// Ok(())
55 /// }
56 /// ```
57 ///
58 /// The [`write_all`] method is defined on the [`AsyncWriteExt`] trait.
59 ///
60 /// [`write_all`]: fn@crate::io::AsyncWriteExt::write_all
61 /// [`AsyncWriteExt`]: trait@crate::io::AsyncWriteExt
62 ///
63 /// To shut down the stream in the write direction, you can call the
64 /// [`shutdown()`] method. This will cause the other peer to receive a read of
65 /// length 0, indicating that no more data will be sent. This only closes
66 /// the stream in one direction.
67 ///
68 /// [`shutdown()`]: fn@crate::io::AsyncWriteExt::shutdown
69 pub struct TcpStream {
// The underlying mio TCP socket wrapped in `PollEvented`; the methods on
// this type reach readiness tracking through `io.registration()`.
70 io: PollEvented<mio::net::TcpStream>,
71 }
72}
73
74impl TcpStream {
75 cfg_not_wasi! {
76 /// Opens a TCP connection to a remote host.
77 ///
78 /// `addr` is an address of the remote host. Anything which implements the
79 /// [`ToSocketAddrs`] trait can be supplied as the address. If `addr`
80 /// yields multiple addresses, connect will be attempted with each of the
81 /// addresses until a connection is successful. If none of the addresses
82 /// result in a successful connection, the error returned from the last
83 /// connection attempt (the last address) is returned.
84 ///
85 /// To configure the socket before connecting, you can use the [`TcpSocket`]
86 /// type.
87 ///
88 /// [`ToSocketAddrs`]: trait@crate::net::ToSocketAddrs
89 /// [`TcpSocket`]: struct@crate::net::TcpSocket
90 ///
91 /// # Examples
92 ///
93 /// ```no_run
94 /// use tokio::net::TcpStream;
95 /// use tokio::io::AsyncWriteExt;
96 /// use std::error::Error;
97 ///
98 /// #[tokio::main]
99 /// async fn main() -> Result<(), Box<dyn Error>> {
100 /// // Connect to a peer
101 /// let mut stream = TcpStream::connect("127.0.0.1:8080").await?;
102 ///
103 /// // Write some data.
104 /// stream.write_all(b"hello world!").await?;
105 ///
106 /// Ok(())
107 /// }
108 /// ```
109 ///
110 /// The [`write_all`] method is defined on the [`AsyncWriteExt`] trait.
111 ///
112 /// [`write_all`]: fn@crate::io::AsyncWriteExt::write_all
113 /// [`AsyncWriteExt`]: trait@crate::io::AsyncWriteExt
114 pub async fn connect<A: ToSocketAddrs>(addr: A) -> io::Result<TcpStream> {
115 let addrs = to_socket_addrs(addr).await?;
116
117 let mut last_err = None;
118
119 for addr in addrs {
120 match TcpStream::connect_addr(addr).await {
121 Ok(stream) => return Ok(stream),
122 Err(e) => last_err = Some(e),
123 }
124 }
125
126 Err(last_err.unwrap_or_else(|| {
127 io::Error::new(
128 io::ErrorKind::InvalidInput,
129 "could not resolve to any address",
130 )
131 }))
132 }
133
134 /// Establishes a connection to the specified `addr`.
135 async fn connect_addr(addr: SocketAddr) -> io::Result<TcpStream> {
136 let sys = mio::net::TcpStream::connect(addr)?;
137 TcpStream::connect_mio(sys).await
138 }
139
140 pub(crate) async fn connect_mio(sys: mio::net::TcpStream) -> io::Result<TcpStream> {
141 let stream = TcpStream::new(sys)?;
142
143 // Once we've connected, wait for the stream to be writable as
144 // that's when the actual connection has been initiated. Once we're
145 // writable we check for `take_socket_error` to see if the connect
146 // actually hit an error or not.
147 //
148 // If all that succeeded then we ship everything on up.
149 poll_fn(|cx| stream.io.registration().poll_write_ready(cx)).await?;
150
151 if let Some(e) = stream.io.take_error()? {
152 return Err(e);
153 }
154
155 Ok(stream)
156 }
157 }
158
159 pub(crate) fn new(connected: mio::net::TcpStream) -> io::Result<TcpStream> {
160 let io = PollEvented::new(connected)?;
161 Ok(TcpStream { io })
162 }
163
164 /// Creates new `TcpStream` from a `std::net::TcpStream`.
165 ///
166 /// This function is intended to be used to wrap a TCP stream from the
167 /// standard library in the Tokio equivalent.
168 ///
169 /// # Notes
170 ///
171 /// The caller is responsible for ensuring that the stream is in
172 /// non-blocking mode. Otherwise all I/O operations on the stream
173 /// will block the thread, which will cause unexpected behavior.
174 /// Non-blocking mode can be set using [`set_nonblocking`].
175 ///
176 /// [`set_nonblocking`]: std::net::TcpStream::set_nonblocking
177 ///
178 /// # Examples
179 ///
180 /// ```rust,no_run
181 /// use std::error::Error;
182 /// use tokio::net::TcpStream;
183 ///
184 /// #[tokio::main]
185 /// async fn main() -> Result<(), Box<dyn Error>> {
186 /// let std_stream = std::net::TcpStream::connect("127.0.0.1:34254")?;
187 /// std_stream.set_nonblocking(true)?;
188 /// let stream = TcpStream::from_std(std_stream)?;
189 /// Ok(())
190 /// }
191 /// ```
192 ///
193 /// # Panics
194 ///
195 /// This function panics if it is not called from within a runtime with
196 /// IO enabled.
197 ///
198 /// The runtime is usually set implicitly when this function is called
199 /// from a future driven by a tokio runtime, otherwise runtime can be set
200 /// explicitly with [`Runtime::enter`](crate::runtime::Runtime::enter) function.
201 #[track_caller]
202 pub fn from_std(stream: std::net::TcpStream) -> io::Result<TcpStream> {
203 let io = mio::net::TcpStream::from_std(stream);
204 let io = PollEvented::new(io)?;
205 Ok(TcpStream { io })
206 }
207
208 /// Turns a [`tokio::net::TcpStream`] into a [`std::net::TcpStream`].
209 ///
210 /// The returned [`std::net::TcpStream`] will have nonblocking mode set as `true`.
211 /// Use [`set_nonblocking`] to change the blocking mode if needed.
212 ///
213 /// # Examples
214 ///
215 /// ```
216 /// # if cfg!(miri) { return } // No `socket` in miri.
217 /// use std::error::Error;
218 /// use std::io::Read;
219 /// use tokio::net::TcpListener;
220 /// # use tokio::net::TcpStream;
221 /// # use tokio::io::AsyncWriteExt;
222 ///
223 /// #[tokio::main]
224 /// async fn main() -> Result<(), Box<dyn Error>> {
225 /// let mut data = [0u8; 12];
226 /// # if false {
227 /// let listener = TcpListener::bind("127.0.0.1:34254").await?;
228 /// # }
229 /// # let listener = TcpListener::bind("127.0.0.1:0").await?;
230 /// # let addr = listener.local_addr().unwrap();
231 /// # let handle = tokio::spawn(async move {
232 /// # let mut stream: TcpStream = TcpStream::connect(addr).await.unwrap();
233 /// # stream.write_all(b"Hello world!").await.unwrap();
234 /// # });
235 /// let (tokio_tcp_stream, _) = listener.accept().await?;
236 /// let mut std_tcp_stream = tokio_tcp_stream.into_std()?;
237 /// # handle.await.expect("The task being joined has panicked");
238 /// std_tcp_stream.set_nonblocking(false)?;
239 /// std_tcp_stream.read_exact(&mut data)?;
240 /// # assert_eq!(b"Hello world!", &data);
241 /// Ok(())
242 /// }
243 /// ```
244 /// [`tokio::net::TcpStream`]: TcpStream
245 /// [`std::net::TcpStream`]: std::net::TcpStream
246 /// [`set_nonblocking`]: fn@std::net::TcpStream::set_nonblocking
247 pub fn into_std(self) -> io::Result<std::net::TcpStream> {
248 #[cfg(unix)]
249 {
250 use std::os::unix::io::{FromRawFd, IntoRawFd};
251 self.io
252 .into_inner()
253 .map(IntoRawFd::into_raw_fd)
254 .map(|raw_fd| unsafe { std::net::TcpStream::from_raw_fd(raw_fd) })
255 }
256
257 #[cfg(windows)]
258 {
259 use std::os::windows::io::{FromRawSocket, IntoRawSocket};
260 self.io
261 .into_inner()
262 .map(|io| io.into_raw_socket())
263 .map(|raw_socket| unsafe { std::net::TcpStream::from_raw_socket(raw_socket) })
264 }
265
266 #[cfg(target_os = "wasi")]
267 {
268 use std::os::wasi::io::{FromRawFd, IntoRawFd};
269 self.io
270 .into_inner()
271 .map(|io| io.into_raw_fd())
272 .map(|raw_fd| unsafe { std::net::TcpStream::from_raw_fd(raw_fd) })
273 }
274 }
275
276 /// Returns the local address that this stream is bound to.
277 ///
278 /// # Examples
279 ///
280 /// ```no_run
281 /// use tokio::net::TcpStream;
282 ///
283 /// # async fn dox() -> Result<(), Box<dyn std::error::Error>> {
284 /// let stream = TcpStream::connect("127.0.0.1:8080").await?;
285 ///
286 /// println!("{:?}", stream.local_addr()?);
287 /// # Ok(())
288 /// # }
289 /// ```
290 pub fn local_addr(&self) -> io::Result<SocketAddr> {
291 self.io.local_addr()
292 }
293
294 /// Returns the value of the `SO_ERROR` option.
295 pub fn take_error(&self) -> io::Result<Option<io::Error>> {
296 self.io.take_error()
297 }
298
299 /// Returns the remote address that this stream is connected to.
300 ///
301 /// # Examples
302 ///
303 /// ```no_run
304 /// use tokio::net::TcpStream;
305 ///
306 /// # async fn dox() -> Result<(), Box<dyn std::error::Error>> {
307 /// let stream = TcpStream::connect("127.0.0.1:8080").await?;
308 ///
309 /// println!("{:?}", stream.peer_addr()?);
310 /// # Ok(())
311 /// # }
312 /// ```
313 pub fn peer_addr(&self) -> io::Result<SocketAddr> {
314 self.io.peer_addr()
315 }
316
317 /// Attempts to receive data on the socket, without removing that data from
318 /// the queue, registering the current task for wakeup if data is not yet
319 /// available.
320 ///
321 /// Note that on multiple calls to `poll_peek`, `poll_read` or
322 /// `poll_read_ready`, only the `Waker` from the `Context` passed to the
323 /// most recent call is scheduled to receive a wakeup. (However,
324 /// `poll_write` retains a second, independent waker.)
325 ///
326 /// # Return value
327 ///
328 /// The function returns:
329 ///
330 /// * `Poll::Pending` if data is not yet available.
331 /// * `Poll::Ready(Ok(n))` if data is available. `n` is the number of bytes peeked.
332 /// * `Poll::Ready(Err(e))` if an error is encountered.
333 ///
334 /// # Errors
335 ///
336 /// This function may encounter any standard I/O error except `WouldBlock`.
337 ///
338 /// # Examples
339 ///
340 /// ```no_run
341 /// use tokio::io::{self, ReadBuf};
342 /// use tokio::net::TcpStream;
343 ///
344 /// use std::future::poll_fn;
345 ///
346 /// #[tokio::main]
347 /// async fn main() -> io::Result<()> {
348 /// let stream = TcpStream::connect("127.0.0.1:8000").await?;
349 /// let mut buf = [0; 10];
350 /// let mut buf = ReadBuf::new(&mut buf);
351 ///
352 /// poll_fn(|cx| {
353 /// stream.poll_peek(cx, &mut buf)
354 /// }).await?;
355 ///
356 /// Ok(())
357 /// }
358 /// ```
359 pub fn poll_peek(
360 &self,
361 cx: &mut Context<'_>,
362 buf: &mut ReadBuf<'_>,
363 ) -> Poll<io::Result<usize>> {
364 loop {
365 let ev = ready!(self.io.registration().poll_read_ready(cx))?;
366
367 let b = unsafe {
368 &mut *(buf.unfilled_mut() as *mut [std::mem::MaybeUninit<u8>] as *mut [u8])
369 };
370
371 match self.io.peek(b) {
372 Ok(ret) => {
373 unsafe { buf.assume_init(ret) };
374 buf.advance(ret);
375 return Poll::Ready(Ok(ret));
376 }
377 Err(ref e) if e.kind() == io::ErrorKind::WouldBlock => {
378 self.io.registration().clear_readiness(ev);
379 }
380 Err(e) => return Poll::Ready(Err(e)),
381 }
382 }
383 }
384
385 /// Waits for any of the requested ready states.
386 ///
387 /// This function is usually paired with `try_read()` or `try_write()`. It
388 /// can be used to concurrently read / write to the same socket on a single
389 /// task without splitting the socket.
390 ///
391 /// The function may complete without the socket being ready. This is a
392 /// false-positive and attempting an operation will return with
393 /// `io::ErrorKind::WouldBlock`. The function can also return with an empty
394 /// [`Ready`] set, so you should always check the returned value and possibly
395 /// wait again if the requested states are not set.
396 ///
397 /// # Cancel safety
398 ///
399 /// This method is cancel safe. Once a readiness event occurs, the method
400 /// will continue to return immediately until the readiness event is
401 /// consumed by an attempt to read or write that fails with `WouldBlock` or
402 /// `Poll::Pending`.
403 ///
404 /// # Examples
405 ///
406 /// Concurrently read and write to the stream on the same task without
407 /// splitting.
408 ///
409 /// ```no_run
410 /// use tokio::io::Interest;
411 /// use tokio::net::TcpStream;
412 /// use std::error::Error;
413 /// use std::io;
414 ///
415 /// #[tokio::main]
416 /// async fn main() -> Result<(), Box<dyn Error>> {
417 /// let stream = TcpStream::connect("127.0.0.1:8080").await?;
418 ///
419 /// loop {
420 /// let ready = stream.ready(Interest::READABLE | Interest::WRITABLE).await?;
421 ///
422 /// if ready.is_readable() {
423 /// let mut data = vec![0; 1024];
424 /// // Try to read data, this may still fail with `WouldBlock`
425 /// // if the readiness event is a false positive.
426 /// match stream.try_read(&mut data) {
427 /// Ok(n) => {
428 /// println!("read {} bytes", n);
429 /// }
430 /// Err(ref e) if e.kind() == io::ErrorKind::WouldBlock => {
431 /// continue;
432 /// }
433 /// Err(e) => {
434 /// return Err(e.into());
435 /// }
436 /// }
437 ///
438 /// }
439 ///
440 /// if ready.is_writable() {
441 /// // Try to write data, this may still fail with `WouldBlock`
442 /// // if the readiness event is a false positive.
443 /// match stream.try_write(b"hello world") {
444 /// Ok(n) => {
445 /// println!("write {} bytes", n);
446 /// }
447 /// Err(ref e) if e.kind() == io::ErrorKind::WouldBlock => {
448 /// continue
449 /// }
450 /// Err(e) => {
451 /// return Err(e.into());
452 /// }
453 /// }
454 /// }
455 /// }
456 /// }
457 /// ```
458 pub async fn ready(&self, interest: Interest) -> io::Result<Ready> {
459 let event = self.io.registration().readiness(interest).await?;
460 Ok(event.ready)
461 }
462
463 /// Waits for the socket to become readable.
464 ///
465 /// This function is equivalent to `ready(Interest::READABLE)` and is usually
466 /// paired with `try_read()`.
467 ///
468 /// # Cancel safety
469 ///
470 /// This method is cancel safe. Once a readiness event occurs, the method
471 /// will continue to return immediately until the readiness event is
472 /// consumed by an attempt to read that fails with `WouldBlock` or
473 /// `Poll::Pending`.
474 ///
475 /// # Examples
476 ///
477 /// ```no_run
478 /// use tokio::net::TcpStream;
479 /// use std::error::Error;
480 /// use std::io;
481 ///
482 /// #[tokio::main]
483 /// async fn main() -> Result<(), Box<dyn Error>> {
484 /// // Connect to a peer
485 /// let stream = TcpStream::connect("127.0.0.1:8080").await?;
486 ///
487 /// let mut msg = vec![0; 1024];
488 ///
489 /// loop {
490 /// // Wait for the socket to be readable
491 /// stream.readable().await?;
492 ///
493 /// // Try to read data, this may still fail with `WouldBlock`
494 /// // if the readiness event is a false positive.
495 /// match stream.try_read(&mut msg) {
496 /// Ok(n) => {
497 /// msg.truncate(n);
498 /// break;
499 /// }
500 /// Err(ref e) if e.kind() == io::ErrorKind::WouldBlock => {
501 /// continue;
502 /// }
503 /// Err(e) => {
504 /// return Err(e.into());
505 /// }
506 /// }
507 /// }
508 ///
509 /// println!("GOT = {:?}", msg);
510 /// Ok(())
511 /// }
512 /// ```
513 pub async fn readable(&self) -> io::Result<()> {
514 self.ready(Interest::READABLE).await?;
515 Ok(())
516 }
517
518 /// Polls for read readiness.
519 ///
520 /// If the tcp stream is not currently ready for reading, this method will
521 /// store a clone of the `Waker` from the provided `Context`. When the tcp
522 /// stream becomes ready for reading, `Waker::wake` will be called on the
523 /// waker.
524 ///
525 /// Note that on multiple calls to `poll_read_ready`, `poll_read` or
526 /// `poll_peek`, only the `Waker` from the `Context` passed to the most
527 /// recent call is scheduled to receive a wakeup. (However,
528 /// `poll_write_ready` retains a second, independent waker.)
529 ///
530 /// This function is intended for cases where creating and pinning a future
531 /// via [`readable`] is not feasible. Where possible, using [`readable`] is
532 /// preferred, as this supports polling from multiple tasks at once.
533 ///
534 /// # Return value
535 ///
536 /// The function returns:
537 ///
538 /// * `Poll::Pending` if the tcp stream is not ready for reading.
539 /// * `Poll::Ready(Ok(()))` if the tcp stream is ready for reading.
540 /// * `Poll::Ready(Err(e))` if an error is encountered.
541 ///
542 /// # Errors
543 ///
544 /// This function may encounter any standard I/O error except `WouldBlock`.
545 ///
546 /// [`readable`]: method@Self::readable
547 pub fn poll_read_ready(&self, cx: &mut Context<'_>) -> Poll<io::Result<()>> {
548 self.io.registration().poll_read_ready(cx).map_ok(|_| ())
549 }
550
551 /// Tries to read data from the stream into the provided buffer, returning how
552 /// many bytes were read.
553 ///
554 /// Receives any pending data from the socket but does not wait for new data
555 /// to arrive. On success, returns the number of bytes read. Because
556 /// `try_read()` is non-blocking, the buffer does not have to be stored by
557 /// the async task and can exist entirely on the stack.
558 ///
559 /// Usually, [`readable()`] or [`ready()`] is used with this function.
560 ///
561 /// [`readable()`]: TcpStream::readable()
562 /// [`ready()`]: TcpStream::ready()
563 ///
564 /// # Return
565 ///
566 /// If data is successfully read, `Ok(n)` is returned, where `n` is the
567 /// number of bytes read. If `n` is `0`, then it can indicate one of two scenarios:
568 ///
569 /// 1. The stream's read half is closed and will no longer yield data.
570 /// 2. The specified buffer was 0 bytes in length.
571 ///
572 /// If the stream is not ready to read data,
573 /// `Err(io::ErrorKind::WouldBlock)` is returned.
574 ///
575 /// # Examples
576 ///
577 /// ```no_run
578 /// use tokio::net::TcpStream;
579 /// use std::error::Error;
580 /// use std::io;
581 ///
582 /// #[tokio::main]
583 /// async fn main() -> Result<(), Box<dyn Error>> {
584 /// // Connect to a peer
585 /// let stream = TcpStream::connect("127.0.0.1:8080").await?;
586 ///
587 /// loop {
588 /// // Wait for the socket to be readable
589 /// stream.readable().await?;
590 ///
591 /// // Creating the buffer **after** the `await` prevents it from
592 /// // being stored in the async task.
593 /// let mut buf = [0; 4096];
594 ///
595 /// // Try to read data, this may still fail with `WouldBlock`
596 /// // if the readiness event is a false positive.
597 /// match stream.try_read(&mut buf) {
598 /// Ok(0) => break,
599 /// Ok(n) => {
600 /// println!("read {} bytes", n);
601 /// }
602 /// Err(ref e) if e.kind() == io::ErrorKind::WouldBlock => {
603 /// continue;
604 /// }
605 /// Err(e) => {
606 /// return Err(e.into());
607 /// }
608 /// }
609 /// }
610 ///
611 /// Ok(())
612 /// }
613 /// ```
614 pub fn try_read(&self, buf: &mut [u8]) -> io::Result<usize> {
615 use std::io::Read;
616
617 self.io
618 .registration()
619 .try_io(Interest::READABLE, || (&*self.io).read(buf))
620 }
621
622 /// Tries to read data from the stream into the provided buffers, returning
623 /// how many bytes were read.
624 ///
625 /// Data is copied to fill each buffer in order, with the final buffer
626 /// written to possibly being only partially filled. This method behaves
627 /// equivalently to a single call to [`try_read()`] with concatenated
628 /// buffers.
629 ///
630 /// Receives any pending data from the socket but does not wait for new data
631 /// to arrive. On success, returns the number of bytes read. Because
632 /// `try_read_vectored()` is non-blocking, the buffer does not have to be
633 /// stored by the async task and can exist entirely on the stack.
634 ///
635 /// Usually, [`readable()`] or [`ready()`] is used with this function.
636 ///
637 /// [`try_read()`]: TcpStream::try_read()
638 /// [`readable()`]: TcpStream::readable()
639 /// [`ready()`]: TcpStream::ready()
640 ///
641 /// # Return
642 ///
643 /// If data is successfully read, `Ok(n)` is returned, where `n` is the
644 /// number of bytes read. `Ok(0)` indicates the stream's read half is closed
645 /// and will no longer yield data. If the stream is not ready to read data
646 /// `Err(io::ErrorKind::WouldBlock)` is returned.
647 ///
648 /// # Examples
649 ///
650 /// ```no_run
651 /// use tokio::net::TcpStream;
652 /// use std::error::Error;
653 /// use std::io::{self, IoSliceMut};
654 ///
655 /// #[tokio::main]
656 /// async fn main() -> Result<(), Box<dyn Error>> {
657 /// // Connect to a peer
658 /// let stream = TcpStream::connect("127.0.0.1:8080").await?;
659 ///
660 /// loop {
661 /// // Wait for the socket to be readable
662 /// stream.readable().await?;
663 ///
664 /// // Creating the buffer **after** the `await` prevents it from
665 /// // being stored in the async task.
666 /// let mut buf_a = [0; 512];
667 /// let mut buf_b = [0; 1024];
668 /// let mut bufs = [
669 /// IoSliceMut::new(&mut buf_a),
670 /// IoSliceMut::new(&mut buf_b),
671 /// ];
672 ///
673 /// // Try to read data, this may still fail with `WouldBlock`
674 /// // if the readiness event is a false positive.
675 /// match stream.try_read_vectored(&mut bufs) {
676 /// Ok(0) => break,
677 /// Ok(n) => {
678 /// println!("read {} bytes", n);
679 /// }
680 /// Err(ref e) if e.kind() == io::ErrorKind::WouldBlock => {
681 /// continue;
682 /// }
683 /// Err(e) => {
684 /// return Err(e.into());
685 /// }
686 /// }
687 /// }
688 ///
689 /// Ok(())
690 /// }
691 /// ```
692 pub fn try_read_vectored(&self, bufs: &mut [io::IoSliceMut<'_>]) -> io::Result<usize> {
693 use std::io::Read;
694
695 self.io
696 .registration()
697 .try_io(Interest::READABLE, || (&*self.io).read_vectored(bufs))
698 }
699
700 cfg_io_util! {
701 /// Tries to read data from the stream into the provided buffer, advancing the
702 /// buffer's internal cursor, returning how many bytes were read.
703 ///
704 /// Receives any pending data from the socket but does not wait for new data
705 /// to arrive. On success, returns the number of bytes read. Because
706 /// `try_read_buf()` is non-blocking, the buffer does not have to be stored by
707 /// the async task and can exist entirely on the stack.
708 ///
709 /// Usually, [`readable()`] or [`ready()`] is used with this function.
710 ///
711 /// [`readable()`]: TcpStream::readable()
712 /// [`ready()`]: TcpStream::ready()
713 ///
714 /// # Return
715 ///
716 /// If data is successfully read, `Ok(n)` is returned, where `n` is the
717 /// number of bytes read. `Ok(0)` indicates the stream's read half is closed
718 /// and will no longer yield data. If the stream is not ready to read data
719 /// `Err(io::ErrorKind::WouldBlock)` is returned.
720 ///
721 /// # Examples
722 ///
723 /// ```no_run
724 /// use tokio::net::TcpStream;
725 /// use std::error::Error;
726 /// use std::io;
727 ///
728 /// #[tokio::main]
729 /// async fn main() -> Result<(), Box<dyn Error>> {
730 /// // Connect to a peer
731 /// let stream = TcpStream::connect("127.0.0.1:8080").await?;
732 ///
733 /// loop {
734 /// // Wait for the socket to be readable
735 /// stream.readable().await?;
736 ///
737 /// let mut buf = Vec::with_capacity(4096);
738 ///
739 /// // Try to read data, this may still fail with `WouldBlock`
740 /// // if the readiness event is a false positive.
741 /// match stream.try_read_buf(&mut buf) {
742 /// Ok(0) => break,
743 /// Ok(n) => {
744 /// println!("read {} bytes", n);
745 /// }
746 /// Err(ref e) if e.kind() == io::ErrorKind::WouldBlock => {
747 /// continue;
748 /// }
749 /// Err(e) => {
750 /// return Err(e.into());
751 /// }
752 /// }
753 /// }
754 ///
755 /// Ok(())
756 /// }
757 /// ```
758 pub fn try_read_buf<B: BufMut>(&self, buf: &mut B) -> io::Result<usize> {
759 self.io.registration().try_io(Interest::READABLE, || {
760 use std::io::Read;
761
762 let dst = buf.chunk_mut();
763 let dst =
764 unsafe { &mut *(dst as *mut _ as *mut [std::mem::MaybeUninit<u8>] as *mut [u8]) };
765
766 // Safety: We trust `TcpStream::read` to have filled up `n` bytes in the
767 // buffer.
768 let n = (&*self.io).read(dst)?;
769
770 unsafe {
771 buf.advance_mut(n);
772 }
773
774 Ok(n)
775 })
776 }
777 }
778
779 /// Waits for the socket to become writable.
780 ///
781 /// This function is equivalent to `ready(Interest::WRITABLE)` and is usually
782 /// paired with `try_write()`.
783 ///
784 /// # Cancel safety
785 ///
786 /// This method is cancel safe. Once a readiness event occurs, the method
787 /// will continue to return immediately until the readiness event is
788 /// consumed by an attempt to write that fails with `WouldBlock` or
789 /// `Poll::Pending`.
790 ///
791 /// # Examples
792 ///
793 /// ```no_run
794 /// use tokio::net::TcpStream;
795 /// use std::error::Error;
796 /// use std::io;
797 ///
798 /// #[tokio::main]
799 /// async fn main() -> Result<(), Box<dyn Error>> {
800 /// // Connect to a peer
801 /// let stream = TcpStream::connect("127.0.0.1:8080").await?;
802 ///
803 /// loop {
804 /// // Wait for the socket to be writable
805 /// stream.writable().await?;
806 ///
807 /// // Try to write data, this may still fail with `WouldBlock`
808 /// // if the readiness event is a false positive.
809 /// match stream.try_write(b"hello world") {
810 /// Ok(n) => {
811 /// break;
812 /// }
813 /// Err(ref e) if e.kind() == io::ErrorKind::WouldBlock => {
814 /// continue;
815 /// }
816 /// Err(e) => {
817 /// return Err(e.into());
818 /// }
819 /// }
820 /// }
821 ///
822 /// Ok(())
823 /// }
824 /// ```
825 pub async fn writable(&self) -> io::Result<()> {
826 self.ready(Interest::WRITABLE).await?;
827 Ok(())
828 }
829
830 /// Polls for write readiness.
831 ///
832 /// If the tcp stream is not currently ready for writing, this method will
833 /// store a clone of the `Waker` from the provided `Context`. When the tcp
834 /// stream becomes ready for writing, `Waker::wake` will be called on the
835 /// waker.
836 ///
837 /// Note that on multiple calls to `poll_write_ready` or `poll_write`, only
838 /// the `Waker` from the `Context` passed to the most recent call is
839 /// scheduled to receive a wakeup. (However, `poll_read_ready` retains a
840 /// second, independent waker.)
841 ///
842 /// This function is intended for cases where creating and pinning a future
843 /// via [`writable`] is not feasible. Where possible, using [`writable`] is
844 /// preferred, as this supports polling from multiple tasks at once.
845 ///
846 /// # Return value
847 ///
848 /// The function returns:
849 ///
850 /// * `Poll::Pending` if the tcp stream is not ready for writing.
851 /// * `Poll::Ready(Ok(()))` if the tcp stream is ready for writing.
852 /// * `Poll::Ready(Err(e))` if an error is encountered.
853 ///
854 /// # Errors
855 ///
856 /// This function may encounter any standard I/O error except `WouldBlock`.
857 ///
858 /// [`writable`]: method@Self::writable
859 pub fn poll_write_ready(&self, cx: &mut Context<'_>) -> Poll<io::Result<()>> {
860 self.io.registration().poll_write_ready(cx).map_ok(|_| ())
861 }
862
863 /// Try to write a buffer to the stream, returning how many bytes were
864 /// written.
865 ///
866 /// The function will attempt to write the entire contents of `buf`, but
867 /// only part of the buffer may be written.
868 ///
869 /// This function is usually paired with `writable()`.
870 ///
871 /// # Return
872 ///
873 /// If data is successfully written, `Ok(n)` is returned, where `n` is the
874 /// number of bytes written. If the stream is not ready to write data,
875 /// `Err(io::ErrorKind::WouldBlock)` is returned.
876 ///
877 /// # Examples
878 ///
879 /// ```no_run
880 /// use tokio::net::TcpStream;
881 /// use std::error::Error;
882 /// use std::io;
883 ///
884 /// #[tokio::main]
885 /// async fn main() -> Result<(), Box<dyn Error>> {
886 /// // Connect to a peer
887 /// let stream = TcpStream::connect("127.0.0.1:8080").await?;
888 ///
889 /// loop {
890 /// // Wait for the socket to be writable
891 /// stream.writable().await?;
892 ///
893 /// // Try to write data, this may still fail with `WouldBlock`
894 /// // if the readiness event is a false positive.
895 /// match stream.try_write(b"hello world") {
896 /// Ok(n) => {
897 /// break;
898 /// }
899 /// Err(ref e) if e.kind() == io::ErrorKind::WouldBlock => {
900 /// continue;
901 /// }
902 /// Err(e) => {
903 /// return Err(e.into());
904 /// }
905 /// }
906 /// }
907 ///
908 /// Ok(())
909 /// }
910 /// ```
911 pub fn try_write(&self, buf: &[u8]) -> io::Result<usize> {
912 use std::io::Write;
913
914 self.io
915 .registration()
916 .try_io(Interest::WRITABLE, || (&*self.io).write(buf))
917 }
918
919 /// Tries to write several buffers to the stream, returning how many bytes
920 /// were written.
921 ///
922 /// Data is written from each buffer in order, with the final buffer read
923 /// from possibly being only partially consumed. This method behaves
924 /// equivalently to a single call to [`try_write()`] with concatenated
925 /// buffers.
926 ///
927 /// This function is usually paired with `writable()`.
928 ///
929 /// [`try_write()`]: TcpStream::try_write()
930 ///
931 /// # Return
932 ///
933 /// If data is successfully written, `Ok(n)` is returned, where `n` is the
934 /// number of bytes written. If the stream is not ready to write data,
935 /// `Err(io::ErrorKind::WouldBlock)` is returned.
936 ///
937 /// # Examples
938 ///
939 /// ```no_run
940 /// use tokio::net::TcpStream;
941 /// use std::error::Error;
942 /// use std::io;
943 ///
944 /// #[tokio::main]
945 /// async fn main() -> Result<(), Box<dyn Error>> {
946 /// // Connect to a peer
947 /// let stream = TcpStream::connect("127.0.0.1:8080").await?;
948 ///
949 /// let bufs = [io::IoSlice::new(b"hello "), io::IoSlice::new(b"world")];
950 ///
951 /// loop {
952 /// // Wait for the socket to be writable
953 /// stream.writable().await?;
954 ///
955 /// // Try to write data, this may still fail with `WouldBlock`
956 /// // if the readiness event is a false positive.
957 /// match stream.try_write_vectored(&bufs) {
958 /// Ok(n) => {
959 /// break;
960 /// }
961 /// Err(ref e) if e.kind() == io::ErrorKind::WouldBlock => {
962 /// continue;
963 /// }
964 /// Err(e) => {
965 /// return Err(e.into());
966 /// }
967 /// }
968 /// }
969 ///
970 /// Ok(())
971 /// }
972 /// ```
973 pub fn try_write_vectored(&self, bufs: &[io::IoSlice<'_>]) -> io::Result<usize> {
974 use std::io::Write;
975
976 self.io
977 .registration()
978 .try_io(Interest::WRITABLE, || (&*self.io).write_vectored(bufs))
979 }
980
981 /// Tries to read or write from the socket using a user-provided IO operation.
982 ///
983 /// If the socket is ready, the provided closure is called. The closure
984 /// should attempt to perform IO operation on the socket by manually
985 /// calling the appropriate syscall. If the operation fails because the
986 /// socket is not actually ready, then the closure should return a
987 /// `WouldBlock` error and the readiness flag is cleared. The return value
988 /// of the closure is then returned by `try_io`.
989 ///
990 /// If the socket is not ready, then the closure is not called
991 /// and a `WouldBlock` error is returned.
992 ///
993 /// The closure should only return a `WouldBlock` error if it has performed
994 /// an IO operation on the socket that failed due to the socket not being
995 /// ready. Returning a `WouldBlock` error in any other situation will
996 /// incorrectly clear the readiness flag, which can cause the socket to
997 /// behave incorrectly.
998 ///
999 /// The closure should not perform the IO operation using any of the methods
1000 /// defined on the Tokio `TcpStream` type, as this will mess with the
1001 /// readiness flag and can cause the socket to behave incorrectly.
1002 ///
1003 /// This method is not intended to be used with combined interests.
1004 /// The closure should perform only one type of IO operation, so it should not
1005 /// require more than one ready state. This method may panic or sleep forever
1006 /// if it is called with a combined interest.
1007 ///
1008 /// Usually, [`readable()`], [`writable()`] or [`ready()`] is used with this function.
1009 ///
1010 /// [`readable()`]: TcpStream::readable()
1011 /// [`writable()`]: TcpStream::writable()
1012 /// [`ready()`]: TcpStream::ready()
1013 pub fn try_io<R>(
1014 &self,
1015 interest: Interest,
1016 f: impl FnOnce() -> io::Result<R>,
1017 ) -> io::Result<R> {
1018 self.io
1019 .registration()
1020 .try_io(interest, || self.io.try_io(f))
1021 }
1022
1023 /// Reads or writes from the socket using a user-provided IO operation.
1024 ///
1025 /// The readiness of the socket is awaited and when the socket is ready,
1026 /// the provided closure is called. The closure should attempt to perform
1027 /// IO operation on the socket by manually calling the appropriate syscall.
1028 /// If the operation fails because the socket is not actually ready,
1029 /// then the closure should return a `WouldBlock` error. In such case the
1030 /// readiness flag is cleared and the socket readiness is awaited again.
1031 /// This loop is repeated until the closure returns an `Ok` or an error
1032 /// other than `WouldBlock`.
1033 ///
1034 /// The closure should only return a `WouldBlock` error if it has performed
1035 /// an IO operation on the socket that failed due to the socket not being
1036 /// ready. Returning a `WouldBlock` error in any other situation will
1037 /// incorrectly clear the readiness flag, which can cause the socket to
1038 /// behave incorrectly.
1039 ///
1040 /// The closure should not perform the IO operation using any of the methods
1041 /// defined on the Tokio `TcpStream` type, as this will mess with the
1042 /// readiness flag and can cause the socket to behave incorrectly.
1043 ///
1044 /// This method is not intended to be used with combined interests.
1045 /// The closure should perform only one type of IO operation, so it should not
1046 /// require more than one ready state. This method may panic or sleep forever
1047 /// if it is called with a combined interest.
1048 pub async fn async_io<R>(
1049 &self,
1050 interest: Interest,
1051 mut f: impl FnMut() -> io::Result<R>,
1052 ) -> io::Result<R> {
1053 self.io
1054 .registration()
1055 .async_io(interest, || self.io.try_io(&mut f))
1056 .await
1057 }
1058
1059 /// Receives data on the socket from the remote address to which it is
1060 /// connected, without removing that data from the queue. On success,
1061 /// returns the number of bytes peeked.
1062 ///
1063 /// Successive calls return the same data. This is accomplished by passing
1064 /// `MSG_PEEK` as a flag to the underlying `recv` system call.
1065 ///
1066 /// # Examples
1067 ///
1068 /// ```no_run
1069 /// use tokio::net::TcpStream;
1070 /// use tokio::io::AsyncReadExt;
1071 /// use std::error::Error;
1072 ///
1073 /// #[tokio::main]
1074 /// async fn main() -> Result<(), Box<dyn Error>> {
1075 /// // Connect to a peer
1076 /// let mut stream = TcpStream::connect("127.0.0.1:8080").await?;
1077 ///
1078 /// let mut b1 = [0; 10];
1079 /// let mut b2 = [0; 10];
1080 ///
1081 /// // Peek at the data
1082 /// let n = stream.peek(&mut b1).await?;
1083 ///
1084 /// // Read the data
1085 /// assert_eq!(n, stream.read(&mut b2[..n]).await?);
1086 /// assert_eq!(&b1[..n], &b2[..n]);
1087 ///
1088 /// Ok(())
1089 /// }
1090 /// ```
1091 ///
1092 /// The [`read`] method is defined on the [`AsyncReadExt`] trait.
1093 ///
1094 /// [`read`]: fn@crate::io::AsyncReadExt::read
1095 /// [`AsyncReadExt`]: trait@crate::io::AsyncReadExt
1096 pub async fn peek(&self, buf: &mut [u8]) -> io::Result<usize> {
1097 self.io
1098 .registration()
1099 .async_io(Interest::READABLE, || self.io.peek(buf))
1100 .await
1101 }
1102
1103 /// Shuts down the read, write, or both halves of this connection.
1104 ///
1105 /// This function will cause all pending and future I/O on the specified
1106 /// portions to return immediately with an appropriate value (see the
1107 /// documentation of `Shutdown`).
1108 pub(super) fn shutdown_std(&self, how: Shutdown) -> io::Result<()> {
1109 self.io.shutdown(how)
1110 }
1111
1112 /// Gets the value of the `TCP_NODELAY` option on this socket.
1113 ///
1114 /// For more information about this option, see [`set_nodelay`].
1115 ///
1116 /// [`set_nodelay`]: TcpStream::set_nodelay
1117 ///
1118 /// # Examples
1119 ///
1120 /// ```no_run
1121 /// use tokio::net::TcpStream;
1122 ///
1123 /// # async fn dox() -> Result<(), Box<dyn std::error::Error>> {
1124 /// let stream = TcpStream::connect("127.0.0.1:8080").await?;
1125 ///
1126 /// println!("{:?}", stream.nodelay()?);
1127 /// # Ok(())
1128 /// # }
1129 /// ```
1130 pub fn nodelay(&self) -> io::Result<bool> {
1131 self.io.nodelay()
1132 }
1133
1134 /// Sets the value of the `TCP_NODELAY` option on this socket.
1135 ///
1136 /// If set, this option disables the Nagle algorithm. This means that
1137 /// segments are always sent as soon as possible, even if there is only a
1138 /// small amount of data. When not set, data is buffered until there is a
1139 /// sufficient amount to send out, thereby avoiding the frequent sending of
1140 /// small packets.
1141 ///
1142 /// # Examples
1143 ///
1144 /// ```no_run
1145 /// use tokio::net::TcpStream;
1146 ///
1147 /// # async fn dox() -> Result<(), Box<dyn std::error::Error>> {
1148 /// let stream = TcpStream::connect("127.0.0.1:8080").await?;
1149 ///
1150 /// stream.set_nodelay(true)?;
1151 /// # Ok(())
1152 /// # }
1153 /// ```
1154 pub fn set_nodelay(&self, nodelay: bool) -> io::Result<()> {
1155 self.io.set_nodelay(nodelay)
1156 }
1157
1158 cfg_not_wasi! {
1159 /// Reads the linger duration for this socket by getting the `SO_LINGER`
1160 /// option.
1161 ///
1162 /// For more information about this option, see [`set_linger`].
1163 ///
1164 /// [`set_linger`]: TcpStream::set_linger
1165 ///
1166 /// # Examples
1167 ///
1168 /// ```no_run
1169 /// use tokio::net::TcpStream;
1170 ///
1171 /// # async fn dox() -> Result<(), Box<dyn std::error::Error>> {
1172 /// let stream = TcpStream::connect("127.0.0.1:8080").await?;
1173 ///
1174 /// println!("{:?}", stream.linger()?);
1175 /// # Ok(())
1176 /// # }
1177 /// ```
1178 pub fn linger(&self) -> io::Result<Option<Duration>> {
1179 socket2::SockRef::from(self).linger()
1180 }
1181
1182 /// Sets the linger duration of this socket by setting the `SO_LINGER` option.
1183 ///
1184 /// This option controls the action taken when a stream has unsent messages and the stream is
1185 /// closed. If `SO_LINGER` is set, the system shall block the process until it can transmit the
1186 /// data or until the time expires.
1187 ///
1188 /// If `SO_LINGER` is not specified, and the stream is closed, the system handles the call in a
1189 /// way that allows the process to continue as quickly as possible.
1190 ///
1191 /// # Examples
1192 ///
1193 /// ```no_run
1194 /// use tokio::net::TcpStream;
1195 ///
1196 /// # async fn dox() -> Result<(), Box<dyn std::error::Error>> {
1197 /// let stream = TcpStream::connect("127.0.0.1:8080").await?;
1198 ///
1199 /// stream.set_linger(None)?;
1200 /// # Ok(())
1201 /// # }
1202 /// ```
1203 pub fn set_linger(&self, dur: Option<Duration>) -> io::Result<()> {
1204 socket2::SockRef::from(self).set_linger(dur)
1205 }
1206 }
1207
1208 /// Gets the value of the `IP_TTL` option for this socket.
1209 ///
1210 /// For more information about this option, see [`set_ttl`].
1211 ///
1212 /// [`set_ttl`]: TcpStream::set_ttl
1213 ///
1214 /// # Examples
1215 ///
1216 /// ```no_run
1217 /// use tokio::net::TcpStream;
1218 ///
1219 /// # async fn dox() -> Result<(), Box<dyn std::error::Error>> {
1220 /// let stream = TcpStream::connect("127.0.0.1:8080").await?;
1221 ///
1222 /// println!("{:?}", stream.ttl()?);
1223 /// # Ok(())
1224 /// # }
1225 /// ```
1226 pub fn ttl(&self) -> io::Result<u32> {
1227 self.io.ttl()
1228 }
1229
1230 /// Sets the value for the `IP_TTL` option on this socket.
1231 ///
1232 /// This value sets the time-to-live field that is used in every packet sent
1233 /// from this socket.
1234 ///
1235 /// # Examples
1236 ///
1237 /// ```no_run
1238 /// use tokio::net::TcpStream;
1239 ///
1240 /// # async fn dox() -> Result<(), Box<dyn std::error::Error>> {
1241 /// let stream = TcpStream::connect("127.0.0.1:8080").await?;
1242 ///
1243 /// stream.set_ttl(123)?;
1244 /// # Ok(())
1245 /// # }
1246 /// ```
1247 pub fn set_ttl(&self, ttl: u32) -> io::Result<()> {
1248 self.io.set_ttl(ttl)
1249 }
1250
1251 // These lifetime markers also appear in the generated documentation, and make
1252 // it more clear that this is a *borrowed* split.
1253 #[allow(clippy::needless_lifetimes)]
1254 /// Splits a `TcpStream` into a read half and a write half, which can be used
1255 /// to read and write the stream concurrently.
1256 ///
1257 /// This method is more efficient than [`into_split`], but the halves cannot be
1258 /// moved into independently spawned tasks.
1259 ///
1260 /// [`into_split`]: TcpStream::into_split()
1261 pub fn split<'a>(&'a mut self) -> (ReadHalf<'a>, WriteHalf<'a>) {
1262 split(self)
1263 }
1264
1265 /// Splits a `TcpStream` into a read half and a write half, which can be used
1266 /// to read and write the stream concurrently.
1267 ///
1268 /// Unlike [`split`], the owned halves can be moved to separate tasks, however
1269 /// this comes at the cost of a heap allocation.
1270 ///
1271 /// **Note:** Dropping the write half will shut down the write half of the TCP
1272 /// stream. This is equivalent to calling [`shutdown()`] on the `TcpStream`.
1273 ///
1274 /// [`split`]: TcpStream::split()
1275 /// [`shutdown()`]: fn@crate::io::AsyncWriteExt::shutdown
1276 pub fn into_split(self) -> (OwnedReadHalf, OwnedWriteHalf) {
1277 split_owned(self)
1278 }
1279
    // == Poll IO functions that take `&self` ==
1281 //
1282 // To read or write without mutable access to the `TcpStream`, combine the
1283 // `poll_read_ready` or `poll_write_ready` methods with the `try_read` or
1284 // `try_write` methods.
1285
1286 pub(crate) fn poll_read_priv(
1287 &self,
1288 cx: &mut Context<'_>,
1289 buf: &mut ReadBuf<'_>,
1290 ) -> Poll<io::Result<()>> {
1291 // Safety: `TcpStream::read` correctly handles reads into uninitialized memory
1292 unsafe { self.io.poll_read(cx, buf) }
1293 }
1294
1295 pub(super) fn poll_write_priv(
1296 &self,
1297 cx: &mut Context<'_>,
1298 buf: &[u8],
1299 ) -> Poll<io::Result<usize>> {
1300 self.io.poll_write(cx, buf)
1301 }
1302
1303 pub(super) fn poll_write_vectored_priv(
1304 &self,
1305 cx: &mut Context<'_>,
1306 bufs: &[io::IoSlice<'_>],
1307 ) -> Poll<io::Result<usize>> {
1308 self.io.poll_write_vectored(cx, bufs)
1309 }
1310}
1311
1312impl TryFrom<std::net::TcpStream> for TcpStream {
1313 type Error = io::Error;
1314
1315 /// Consumes stream, returning the tokio I/O object.
1316 ///
1317 /// This is equivalent to
1318 /// [`TcpStream::from_std(stream)`](TcpStream::from_std).
1319 fn try_from(stream: std::net::TcpStream) -> Result<Self, Self::Error> {
1320 Self::from_std(stream)
1321 }
1322}
1323
1324// ===== impl Read / Write =====
1325
1326impl AsyncRead for TcpStream {
1327 fn poll_read(
1328 self: Pin<&mut Self>,
1329 cx: &mut Context<'_>,
1330 buf: &mut ReadBuf<'_>,
1331 ) -> Poll<io::Result<()>> {
1332 self.poll_read_priv(cx, buf)
1333 }
1334}
1335
1336impl AsyncWrite for TcpStream {
1337 fn poll_write(
1338 self: Pin<&mut Self>,
1339 cx: &mut Context<'_>,
1340 buf: &[u8],
1341 ) -> Poll<io::Result<usize>> {
1342 self.poll_write_priv(cx, buf)
1343 }
1344
1345 fn poll_write_vectored(
1346 self: Pin<&mut Self>,
1347 cx: &mut Context<'_>,
1348 bufs: &[io::IoSlice<'_>],
1349 ) -> Poll<io::Result<usize>> {
1350 self.poll_write_vectored_priv(cx, bufs)
1351 }
1352
1353 fn is_write_vectored(&self) -> bool {
1354 true
1355 }
1356
1357 #[inline]
1358 fn poll_flush(self: Pin<&mut Self>, _: &mut Context<'_>) -> Poll<io::Result<()>> {
1359 // tcp flush is a no-op
1360 Poll::Ready(Ok(()))
1361 }
1362
1363 fn poll_shutdown(self: Pin<&mut Self>, _: &mut Context<'_>) -> Poll<io::Result<()>> {
1364 self.shutdown_std(std::net::Shutdown::Write)?;
1365 Poll::Ready(Ok(()))
1366 }
1367}
1368
1369impl fmt::Debug for TcpStream {
1370 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
1371 self.io.fmt(f)
1372 }
1373}
1374
1375#[cfg(unix)]
1376mod sys {
1377 use super::TcpStream;
1378 use std::os::unix::prelude::*;
1379
1380 impl AsRawFd for TcpStream {
1381 fn as_raw_fd(&self) -> RawFd {
1382 self.io.as_raw_fd()
1383 }
1384 }
1385
1386 impl AsFd for TcpStream {
1387 fn as_fd(&self) -> BorrowedFd<'_> {
1388 unsafe { BorrowedFd::borrow_raw(self.as_raw_fd()) }
1389 }
1390 }
1391}
1392
1393cfg_windows! {
1394 use crate::os::windows::io::{AsRawSocket, RawSocket, AsSocket, BorrowedSocket};
1395
1396 impl AsRawSocket for TcpStream {
1397 fn as_raw_socket(&self) -> RawSocket {
1398 self.io.as_raw_socket()
1399 }
1400 }
1401
1402 impl AsSocket for TcpStream {
1403 fn as_socket(&self) -> BorrowedSocket<'_> {
1404 unsafe { BorrowedSocket::borrow_raw(self.as_raw_socket()) }
1405 }
1406 }
1407}
1408
1409#[cfg(all(tokio_unstable, target_os = "wasi"))]
1410mod sys {
1411 use super::TcpStream;
1412 use std::os::wasi::prelude::*;
1413
1414 impl AsRawFd for TcpStream {
1415 fn as_raw_fd(&self) -> RawFd {
1416 self.io.as_raw_fd()
1417 }
1418 }
1419
1420 impl AsFd for TcpStream {
1421 fn as_fd(&self) -> BorrowedFd<'_> {
1422 unsafe { BorrowedFd::borrow_raw(self.as_raw_fd()) }
1423 }
1424 }
1425}