tokio/net/unix/datagram/socket.rs
1use crate::io::{Interest, PollEvented, ReadBuf, Ready};
2use crate::net::unix::SocketAddr;
3
4use std::fmt;
5use std::io;
6use std::net::Shutdown;
7use std::os::unix::io::{AsFd, AsRawFd, BorrowedFd, FromRawFd, IntoRawFd, RawFd};
8use std::os::unix::net;
9use std::path::Path;
10use std::task::{ready, Context, Poll};
11
12cfg_io_util! {
13 use bytes::BufMut;
14}
15
16cfg_net_unix! {
17 /// An I/O object representing a Unix datagram socket.
18 ///
19 /// A socket can be either named (associated with a filesystem path) or
20 /// unnamed.
21 ///
22 /// This type does not provide a `split` method, because this functionality
23 /// can be achieved by wrapping the socket in an [`Arc`]. Note that you do
24 /// not need a `Mutex` to share the `UnixDatagram` — an `Arc<UnixDatagram>`
25 /// is enough. This is because all of the methods take `&self` instead of
26 /// `&mut self`.
27 ///
28 /// **Note:** named sockets are persisted even after the object is dropped
29 /// and the program has exited, and cannot be reconnected. It is advised
30 /// that you either check for and unlink the existing socket if it exists,
31 /// or use a temporary file that is guaranteed to not already exist.
32 ///
33 /// [`Arc`]: std::sync::Arc
34 ///
35 /// # Examples
36 /// Using named sockets, associated with a filesystem path:
37 /// ```
38 /// # if cfg!(miri) { return } // No `socket` in miri.
39 /// # use std::error::Error;
40 /// # #[tokio::main]
41 /// # async fn main() -> Result<(), Box<dyn Error>> {
42 /// use tokio::net::UnixDatagram;
43 /// use tempfile::tempdir;
44 ///
45 /// // We use a temporary directory so that the socket
46 /// // files left by the bound sockets will get cleaned up.
47 /// let tmp = tempdir()?;
48 ///
49 /// // Bind each socket to a filesystem path
50 /// let tx_path = tmp.path().join("tx");
51 /// let tx = UnixDatagram::bind(&tx_path)?;
52 /// let rx_path = tmp.path().join("rx");
53 /// let rx = UnixDatagram::bind(&rx_path)?;
54 ///
55 /// let bytes = b"hello world";
56 /// tx.send_to(bytes, &rx_path).await?;
57 ///
58 /// let mut buf = vec![0u8; 24];
59 /// let (size, addr) = rx.recv_from(&mut buf).await?;
60 ///
61 /// let dgram = &buf[..size];
62 /// assert_eq!(dgram, bytes);
63 /// assert_eq!(addr.as_pathname().unwrap(), &tx_path);
64 ///
65 /// # Ok(())
66 /// # }
67 /// ```
68 ///
69 /// Using unnamed sockets, created as a pair
70 /// ```
71 /// # if cfg!(miri) { return } // No SOCK_DGRAM for `socketpair` in miri.
72 /// # use std::error::Error;
73 /// # #[tokio::main]
74 /// # async fn main() -> Result<(), Box<dyn Error>> {
75 /// use tokio::net::UnixDatagram;
76 ///
77 /// // Create the pair of sockets
78 /// let (sock1, sock2) = UnixDatagram::pair()?;
79 ///
80 /// // Since the sockets are paired, the paired send/recv
81 /// // functions can be used
82 /// let bytes = b"hello world";
83 /// sock1.send(bytes).await?;
84 ///
85 /// let mut buff = vec![0u8; 24];
86 /// let size = sock2.recv(&mut buff).await?;
87 ///
88 /// let dgram = &buff[..size];
89 /// assert_eq!(dgram, bytes);
90 ///
91 /// # Ok(())
92 /// # }
93 /// ```
94 #[cfg_attr(docsrs, doc(alias = "uds"))]
95 pub struct UnixDatagram {
96 io: PollEvented<mio::net::UnixDatagram>,
97 }
98}
99
100impl UnixDatagram {
101 pub(crate) fn from_mio(sys: mio::net::UnixDatagram) -> io::Result<UnixDatagram> {
102 let datagram = UnixDatagram::new(sys)?;
103
104 if let Some(e) = datagram.io.take_error()? {
105 return Err(e);
106 }
107
108 Ok(datagram)
109 }
110
111 /// Waits for any of the requested ready states.
112 ///
113 /// This function is usually paired with `try_recv()` or `try_send()`. It
114 /// can be used to concurrently `recv` / `send` to the same socket on a single
115 /// task without splitting the socket.
116 ///
117 /// The function may complete without the socket being ready. This is a
118 /// false-positive and attempting an operation will return with
119 /// `io::ErrorKind::WouldBlock`. The function can also return with an empty
120 /// [`Ready`] set, so you should always check the returned value and possibly
121 /// wait again if the requested states are not set.
122 ///
123 /// # Cancel safety
124 ///
125 /// This method is cancel safe. Once a readiness event occurs, the method
126 /// will continue to return immediately until the readiness event is
127 /// consumed by an attempt to read or write that fails with `WouldBlock` or
128 /// `Poll::Pending`.
129 ///
130 /// # Examples
131 ///
132 /// Concurrently receive from and send to the socket on the same task
133 /// without splitting.
134 ///
135 /// ```no_run
136 /// use tokio::io::Interest;
137 /// use tokio::net::UnixDatagram;
138 /// use std::io;
139 ///
140 /// #[tokio::main]
141 /// async fn main() -> io::Result<()> {
142 /// let dir = tempfile::tempdir().unwrap();
143 /// let client_path = dir.path().join("client.sock");
144 /// let server_path = dir.path().join("server.sock");
145 /// let socket = UnixDatagram::bind(&client_path)?;
146 /// socket.connect(&server_path)?;
147 ///
148 /// loop {
149 /// let ready = socket.ready(Interest::READABLE | Interest::WRITABLE).await?;
150 ///
151 /// if ready.is_readable() {
152 /// let mut data = [0; 1024];
153 /// match socket.try_recv(&mut data[..]) {
154 /// Ok(n) => {
155 /// println!("received {:?}", &data[..n]);
156 /// }
157 /// // False-positive, continue
158 /// Err(ref e) if e.kind() == io::ErrorKind::WouldBlock => {}
159 /// Err(e) => {
160 /// return Err(e);
161 /// }
162 /// }
163 /// }
164 ///
165 /// if ready.is_writable() {
166 /// // Write some data
167 /// match socket.try_send(b"hello world") {
168 /// Ok(n) => {
169 /// println!("sent {} bytes", n);
170 /// }
171 /// // False-positive, continue
172 /// Err(ref e) if e.kind() == io::ErrorKind::WouldBlock => {}
173 /// Err(e) => {
174 /// return Err(e);
175 /// }
176 /// }
177 /// }
178 /// }
179 /// }
180 /// ```
181 pub async fn ready(&self, interest: Interest) -> io::Result<Ready> {
182 let event = self.io.registration().readiness(interest).await?;
183 Ok(event.ready)
184 }
185
186 /// Waits for the socket to become writable.
187 ///
188 /// This function is equivalent to `ready(Interest::WRITABLE)` and is
189 /// usually paired with `try_send()` or `try_send_to()`.
190 ///
191 /// The function may complete without the socket being writable. This is a
192 /// false-positive and attempting a `try_send()` will return with
193 /// `io::ErrorKind::WouldBlock`.
194 ///
195 /// # Cancel safety
196 ///
197 /// This method is cancel safe. Once a readiness event occurs, the method
198 /// will continue to return immediately until the readiness event is
199 /// consumed by an attempt to write that fails with `WouldBlock` or
200 /// `Poll::Pending`.
201 ///
202 /// # Examples
203 ///
204 /// ```no_run
205 /// use tokio::net::UnixDatagram;
206 /// use std::io;
207 ///
208 /// #[tokio::main]
209 /// async fn main() -> io::Result<()> {
210 /// let dir = tempfile::tempdir().unwrap();
211 /// let client_path = dir.path().join("client.sock");
212 /// let server_path = dir.path().join("server.sock");
213 /// let socket = UnixDatagram::bind(&client_path)?;
214 /// socket.connect(&server_path)?;
215 ///
216 /// loop {
217 /// // Wait for the socket to be writable
218 /// socket.writable().await?;
219 ///
220 /// // Try to send data, this may still fail with `WouldBlock`
221 /// // if the readiness event is a false positive.
222 /// match socket.try_send(b"hello world") {
223 /// Ok(n) => {
224 /// break;
225 /// }
226 /// Err(ref e) if e.kind() == io::ErrorKind::WouldBlock => {
227 /// continue;
228 /// }
229 /// Err(e) => {
230 /// return Err(e);
231 /// }
232 /// }
233 /// }
234 ///
235 /// Ok(())
236 /// }
237 /// ```
238 pub async fn writable(&self) -> io::Result<()> {
239 self.ready(Interest::WRITABLE).await?;
240 Ok(())
241 }
242
243 /// Polls for write/send readiness.
244 ///
245 /// If the socket is not currently ready for sending, this method will
246 /// store a clone of the `Waker` from the provided `Context`. When the socket
247 /// becomes ready for sending, `Waker::wake` will be called on the
248 /// waker.
249 ///
250 /// Note that on multiple calls to `poll_send_ready` or `poll_send`, only
251 /// the `Waker` from the `Context` passed to the most recent call is
252 /// scheduled to receive a wakeup. (However, `poll_recv_ready` retains a
253 /// second, independent waker.)
254 ///
255 /// This function is intended for cases where creating and pinning a future
256 /// via [`writable`] is not feasible. Where possible, using [`writable`] is
257 /// preferred, as this supports polling from multiple tasks at once.
258 ///
259 /// # Return value
260 ///
261 /// The function returns:
262 ///
263 /// * `Poll::Pending` if the socket is not ready for writing.
264 /// * `Poll::Ready(Ok(()))` if the socket is ready for writing.
265 /// * `Poll::Ready(Err(e))` if an error is encountered.
266 ///
267 /// # Errors
268 ///
269 /// This function may encounter any standard I/O error except `WouldBlock`.
270 ///
271 /// [`writable`]: method@Self::writable
272 pub fn poll_send_ready(&self, cx: &mut Context<'_>) -> Poll<io::Result<()>> {
273 self.io.registration().poll_write_ready(cx).map_ok(|_| ())
274 }
275
276 /// Waits for the socket to become readable.
277 ///
278 /// This function is equivalent to `ready(Interest::READABLE)` and is usually
279 /// paired with `try_recv()`.
280 ///
281 /// The function may complete without the socket being readable. This is a
282 /// false-positive and attempting a `try_recv()` will return with
283 /// `io::ErrorKind::WouldBlock`.
284 ///
285 /// # Cancel safety
286 ///
287 /// This method is cancel safe. Once a readiness event occurs, the method
288 /// will continue to return immediately until the readiness event is
289 /// consumed by an attempt to read that fails with `WouldBlock` or
290 /// `Poll::Pending`.
291 ///
292 /// # Examples
293 ///
294 /// ```no_run
295 /// use tokio::net::UnixDatagram;
296 /// use std::io;
297 ///
298 /// #[tokio::main]
299 /// async fn main() -> io::Result<()> {
300 /// // Connect to a peer
301 /// let dir = tempfile::tempdir().unwrap();
302 /// let client_path = dir.path().join("client.sock");
303 /// let server_path = dir.path().join("server.sock");
304 /// let socket = UnixDatagram::bind(&client_path)?;
305 /// socket.connect(&server_path)?;
306 ///
307 /// loop {
308 /// // Wait for the socket to be readable
309 /// socket.readable().await?;
310 ///
311 /// // The buffer is **not** included in the async task and will
312 /// // only exist on the stack.
313 /// let mut buf = [0; 1024];
314 ///
315 /// // Try to recv data, this may still fail with `WouldBlock`
316 /// // if the readiness event is a false positive.
317 /// match socket.try_recv(&mut buf) {
318 /// Ok(n) => {
319 /// println!("GOT {:?}", &buf[..n]);
320 /// break;
321 /// }
322 /// Err(ref e) if e.kind() == io::ErrorKind::WouldBlock => {
323 /// continue;
324 /// }
325 /// Err(e) => {
326 /// return Err(e);
327 /// }
328 /// }
329 /// }
330 ///
331 /// Ok(())
332 /// }
333 /// ```
334 pub async fn readable(&self) -> io::Result<()> {
335 self.ready(Interest::READABLE).await?;
336 Ok(())
337 }
338
339 /// Polls for read/receive readiness.
340 ///
341 /// If the socket is not currently ready for receiving, this method will
342 /// store a clone of the `Waker` from the provided `Context`. When the
343 /// socket becomes ready for reading, `Waker::wake` will be called on the
344 /// waker.
345 ///
346 /// Note that on multiple calls to `poll_recv_ready`, `poll_recv` or
347 /// `poll_peek`, only the `Waker` from the `Context` passed to the most
348 /// recent call is scheduled to receive a wakeup. (However,
349 /// `poll_send_ready` retains a second, independent waker.)
350 ///
351 /// This function is intended for cases where creating and pinning a future
352 /// via [`readable`] is not feasible. Where possible, using [`readable`] is
353 /// preferred, as this supports polling from multiple tasks at once.
354 ///
355 /// # Return value
356 ///
357 /// The function returns:
358 ///
359 /// * `Poll::Pending` if the socket is not ready for reading.
360 /// * `Poll::Ready(Ok(()))` if the socket is ready for reading.
361 /// * `Poll::Ready(Err(e))` if an error is encountered.
362 ///
363 /// # Errors
364 ///
365 /// This function may encounter any standard I/O error except `WouldBlock`.
366 ///
367 /// [`readable`]: method@Self::readable
368 pub fn poll_recv_ready(&self, cx: &mut Context<'_>) -> Poll<io::Result<()>> {
369 self.io.registration().poll_read_ready(cx).map_ok(|_| ())
370 }
371
372 /// Creates a new `UnixDatagram` bound to the specified path.
373 ///
374 /// # Examples
375 /// ```
376 /// # if cfg!(miri) { return } // No `socket` in miri.
377 /// # use std::error::Error;
378 /// # #[tokio::main]
379 /// # async fn main() -> Result<(), Box<dyn Error>> {
380 /// use tokio::net::UnixDatagram;
381 /// use tempfile::tempdir;
382 ///
383 /// // We use a temporary directory so that the socket
384 /// // files left by the bound sockets will get cleaned up.
385 /// let tmp = tempdir()?;
386 ///
387 /// // Bind the socket to a filesystem path
388 /// let socket_path = tmp.path().join("socket");
389 /// let socket = UnixDatagram::bind(&socket_path)?;
390 ///
391 /// # Ok(())
392 /// # }
393 /// ```
394 pub fn bind<P>(path: P) -> io::Result<UnixDatagram>
395 where
396 P: AsRef<Path>,
397 {
398 let socket = mio::net::UnixDatagram::bind(path)?;
399 UnixDatagram::new(socket)
400 }
401
402 /// Creates an unnamed pair of connected sockets.
403 ///
404 /// This function will create a pair of interconnected Unix sockets for
405 /// communicating back and forth between one another.
406 ///
407 /// # Examples
408 /// ```
409 /// # if cfg!(miri) { return } // No SOCK_DGRAM for `socketpair` in miri.
410 /// # use std::error::Error;
411 /// # #[tokio::main]
412 /// # async fn main() -> Result<(), Box<dyn Error>> {
413 /// use tokio::net::UnixDatagram;
414 ///
415 /// // Create the pair of sockets
416 /// let (sock1, sock2) = UnixDatagram::pair()?;
417 ///
418 /// // Since the sockets are paired, the paired send/recv
419 /// // functions can be used
420 /// let bytes = b"hail eris";
421 /// sock1.send(bytes).await?;
422 ///
423 /// let mut buff = vec![0u8; 24];
424 /// let size = sock2.recv(&mut buff).await?;
425 ///
426 /// let dgram = &buff[..size];
427 /// assert_eq!(dgram, bytes);
428 ///
429 /// # Ok(())
430 /// # }
431 /// ```
432 pub fn pair() -> io::Result<(UnixDatagram, UnixDatagram)> {
433 let (a, b) = mio::net::UnixDatagram::pair()?;
434 let a = UnixDatagram::new(a)?;
435 let b = UnixDatagram::new(b)?;
436
437 Ok((a, b))
438 }
439
440 /// Creates new [`UnixDatagram`] from a [`std::os::unix::net::UnixDatagram`].
441 ///
442 /// This function is intended to be used to wrap a `UnixDatagram` from the
443 /// standard library in the Tokio equivalent.
444 ///
445 /// # Notes
446 ///
447 /// The caller is responsible for ensuring that the socket is in
448 /// non-blocking mode. Otherwise all I/O operations on the socket
449 /// will block the thread, which will cause unexpected behavior.
450 /// Non-blocking mode can be set using [`set_nonblocking`].
451 ///
452 /// [`set_nonblocking`]: std::os::unix::net::UnixDatagram::set_nonblocking
453 ///
454 /// # Panics
455 ///
456 /// This function panics if it is not called from within a runtime with
457 /// IO enabled.
458 ///
459 /// The runtime is usually set implicitly when this function is called
460 /// from a future driven by a Tokio runtime, otherwise runtime can be set
461 /// explicitly with [`Runtime::enter`](crate::runtime::Runtime::enter) function.
462 /// # Examples
463 /// ```
464 /// # if cfg!(miri) { return } // No `socket` in miri.
465 /// # use std::error::Error;
466 /// # #[tokio::main]
467 /// # async fn main() -> Result<(), Box<dyn Error>> {
468 /// use tokio::net::UnixDatagram;
469 /// use std::os::unix::net::UnixDatagram as StdUDS;
470 /// use tempfile::tempdir;
471 ///
472 /// // We use a temporary directory so that the socket
473 /// // files left by the bound sockets will get cleaned up.
474 /// let tmp = tempdir()?;
475 ///
476 /// // Bind the socket to a filesystem path
477 /// let socket_path = tmp.path().join("socket");
478 /// let std_socket = StdUDS::bind(&socket_path)?;
479 /// std_socket.set_nonblocking(true)?;
480 /// let tokio_socket = UnixDatagram::from_std(std_socket)?;
481 ///
482 /// # Ok(())
483 /// # }
484 /// ```
485 #[track_caller]
486 pub fn from_std(datagram: net::UnixDatagram) -> io::Result<UnixDatagram> {
487 let socket = mio::net::UnixDatagram::from_std(datagram);
488 let io = PollEvented::new(socket)?;
489 Ok(UnixDatagram { io })
490 }
491
492 /// Turns a [`tokio::net::UnixDatagram`] into a [`std::os::unix::net::UnixDatagram`].
493 ///
494 /// The returned [`std::os::unix::net::UnixDatagram`] will have nonblocking
495 /// mode set as `true`. Use [`set_nonblocking`] to change the blocking mode
496 /// if needed.
497 ///
498 /// # Examples
499 ///
500 /// ```rust,no_run
501 /// # use std::error::Error;
502 /// # async fn dox() -> Result<(), Box<dyn Error>> {
503 /// let tokio_socket = tokio::net::UnixDatagram::bind("/path/to/the/socket")?;
504 /// let std_socket = tokio_socket.into_std()?;
505 /// std_socket.set_nonblocking(false)?;
506 /// # Ok(())
507 /// # }
508 /// ```
509 ///
510 /// [`tokio::net::UnixDatagram`]: UnixDatagram
511 /// [`std::os::unix::net::UnixDatagram`]: std::os::unix::net::UnixDatagram
512 /// [`set_nonblocking`]: fn@std::os::unix::net::UnixDatagram::set_nonblocking
513 pub fn into_std(self) -> io::Result<std::os::unix::net::UnixDatagram> {
514 self.io
515 .into_inner()
516 .map(IntoRawFd::into_raw_fd)
517 .map(|raw_fd| unsafe { std::os::unix::net::UnixDatagram::from_raw_fd(raw_fd) })
518 }
519
520 fn new(socket: mio::net::UnixDatagram) -> io::Result<UnixDatagram> {
521 let io = PollEvented::new(socket)?;
522 Ok(UnixDatagram { io })
523 }
524
525 /// Creates a new `UnixDatagram` which is not bound to any address.
526 ///
527 /// # Examples
528 /// ```
529 /// # if cfg!(miri) { return } // No `socket` in miri.
530 /// # use std::error::Error;
531 /// # #[tokio::main]
532 /// # async fn main() -> Result<(), Box<dyn Error>> {
533 /// use tokio::net::UnixDatagram;
534 /// use tempfile::tempdir;
535 ///
536 /// // Create an unbound socket
537 /// let tx = UnixDatagram::unbound()?;
538 ///
539 /// // Create another, bound socket
540 /// let tmp = tempdir()?;
541 /// let rx_path = tmp.path().join("rx");
542 /// let rx = UnixDatagram::bind(&rx_path)?;
543 ///
544 /// // Send to the bound socket
545 /// let bytes = b"hello world";
546 /// tx.send_to(bytes, &rx_path).await?;
547 ///
548 /// let mut buf = vec![0u8; 24];
549 /// let (size, addr) = rx.recv_from(&mut buf).await?;
550 ///
551 /// let dgram = &buf[..size];
552 /// assert_eq!(dgram, bytes);
553 ///
554 /// # Ok(())
555 /// # }
556 /// ```
557 pub fn unbound() -> io::Result<UnixDatagram> {
558 let socket = mio::net::UnixDatagram::unbound()?;
559 UnixDatagram::new(socket)
560 }
561
562 /// Connects the socket to the specified address.
563 ///
564 /// The `send` method may be used to send data to the specified address.
565 /// `recv` and `recv_from` will only receive data from that address.
566 ///
567 /// # Examples
568 /// ```
569 /// # if cfg!(miri) { return } // No `socket` in miri.
570 /// # use std::error::Error;
571 /// # #[tokio::main]
572 /// # async fn main() -> Result<(), Box<dyn Error>> {
573 /// use tokio::net::UnixDatagram;
574 /// use tempfile::tempdir;
575 ///
576 /// // Create an unbound socket
577 /// let tx = UnixDatagram::unbound()?;
578 ///
579 /// // Create another, bound socket
580 /// let tmp = tempdir()?;
581 /// let rx_path = tmp.path().join("rx");
582 /// let rx = UnixDatagram::bind(&rx_path)?;
583 ///
584 /// // Connect to the bound socket
585 /// tx.connect(&rx_path)?;
586 ///
587 /// // Send to the bound socket
588 /// let bytes = b"hello world";
589 /// tx.send(bytes).await?;
590 ///
591 /// let mut buf = vec![0u8; 24];
592 /// let (size, addr) = rx.recv_from(&mut buf).await?;
593 ///
594 /// let dgram = &buf[..size];
595 /// assert_eq!(dgram, bytes);
596 ///
597 /// # Ok(())
598 /// # }
599 /// ```
600 pub fn connect<P: AsRef<Path>>(&self, path: P) -> io::Result<()> {
601 self.io.connect(path)
602 }
603
604 /// Sends data on the socket to the socket's peer.
605 ///
606 /// # Cancel safety
607 ///
608 /// This method is cancel safe. If `send` is used as the event in a
609 /// [`tokio::select!`](crate::select) statement and some other branch
610 /// completes first, then it is guaranteed that the message was not sent.
611 ///
612 /// # Examples
613 /// ```
614 /// # if cfg!(miri) { return } // No SOCK_DGRAM for `socketpair` in miri.
615 /// # use std::error::Error;
616 /// # #[tokio::main]
617 /// # async fn main() -> Result<(), Box<dyn Error>> {
618 /// use tokio::net::UnixDatagram;
619 ///
620 /// // Create the pair of sockets
621 /// let (sock1, sock2) = UnixDatagram::pair()?;
622 ///
623 /// // Since the sockets are paired, the paired send/recv
624 /// // functions can be used
625 /// let bytes = b"hello world";
626 /// sock1.send(bytes).await?;
627 ///
628 /// let mut buff = vec![0u8; 24];
629 /// let size = sock2.recv(&mut buff).await?;
630 ///
631 /// let dgram = &buff[..size];
632 /// assert_eq!(dgram, bytes);
633 ///
634 /// # Ok(())
635 /// # }
636 /// ```
637 pub async fn send(&self, buf: &[u8]) -> io::Result<usize> {
638 self.io
639 .registration()
640 .async_io(Interest::WRITABLE, || self.io.send(buf))
641 .await
642 }
643
644 /// Tries to send a datagram to the peer without waiting.
645 ///
646 /// # Examples
647 ///
648 /// ```no_run
649 /// use tokio::net::UnixDatagram;
650 /// use std::io;
651 ///
652 /// #[tokio::main]
653 /// async fn main() -> io::Result<()> {
654 /// let dir = tempfile::tempdir().unwrap();
655 /// let client_path = dir.path().join("client.sock");
656 /// let server_path = dir.path().join("server.sock");
657 /// let socket = UnixDatagram::bind(&client_path)?;
658 /// socket.connect(&server_path)?;
659 ///
660 /// loop {
661 /// // Wait for the socket to be writable
662 /// socket.writable().await?;
663 ///
664 /// // Try to send data, this may still fail with `WouldBlock`
665 /// // if the readiness event is a false positive.
666 /// match socket.try_send(b"hello world") {
667 /// Ok(n) => {
668 /// break;
669 /// }
670 /// Err(ref e) if e.kind() == io::ErrorKind::WouldBlock => {
671 /// continue;
672 /// }
673 /// Err(e) => {
674 /// return Err(e);
675 /// }
676 /// }
677 /// }
678 ///
679 /// Ok(())
680 /// }
681 /// ```
682 pub fn try_send(&self, buf: &[u8]) -> io::Result<usize> {
683 self.io
684 .registration()
685 .try_io(Interest::WRITABLE, || self.io.send(buf))
686 }
687
688 /// Tries to send a datagram to the peer without waiting.
689 ///
690 /// # Examples
691 ///
692 /// ```no_run
693 /// use tokio::net::UnixDatagram;
694 /// use std::io;
695 ///
696 /// #[tokio::main]
697 /// async fn main() -> io::Result<()> {
698 /// let dir = tempfile::tempdir().unwrap();
699 /// let client_path = dir.path().join("client.sock");
700 /// let server_path = dir.path().join("server.sock");
701 /// let socket = UnixDatagram::bind(&client_path)?;
702 ///
703 /// loop {
704 /// // Wait for the socket to be writable
705 /// socket.writable().await?;
706 ///
707 /// // Try to send data, this may still fail with `WouldBlock`
708 /// // if the readiness event is a false positive.
709 /// match socket.try_send_to(b"hello world", &server_path) {
710 /// Ok(n) => {
711 /// break;
712 /// }
713 /// Err(ref e) if e.kind() == io::ErrorKind::WouldBlock => {
714 /// continue;
715 /// }
716 /// Err(e) => {
717 /// return Err(e);
718 /// }
719 /// }
720 /// }
721 ///
722 /// Ok(())
723 /// }
724 /// ```
725 pub fn try_send_to<P>(&self, buf: &[u8], target: P) -> io::Result<usize>
726 where
727 P: AsRef<Path>,
728 {
729 self.io
730 .registration()
731 .try_io(Interest::WRITABLE, || self.io.send_to(buf, target))
732 }
733
734 /// Receives data from the socket.
735 ///
736 /// # Cancel safety
737 ///
738 /// This method is cancel safe. If `recv` is used as the event in a
739 /// [`tokio::select!`](crate::select) statement and some other branch
740 /// completes first, it is guaranteed that no messages were received on this
741 /// socket.
742 ///
743 /// # Examples
744 /// ```
745 /// # if cfg!(miri) { return } // No SOCK_DGRAM for `socketpair` in miri.
746 /// # use std::error::Error;
747 /// # #[tokio::main]
748 /// # async fn main() -> Result<(), Box<dyn Error>> {
749 /// use tokio::net::UnixDatagram;
750 ///
751 /// // Create the pair of sockets
752 /// let (sock1, sock2) = UnixDatagram::pair()?;
753 ///
754 /// // Since the sockets are paired, the paired send/recv
755 /// // functions can be used
756 /// let bytes = b"hello world";
757 /// sock1.send(bytes).await?;
758 ///
759 /// let mut buff = vec![0u8; 24];
760 /// let size = sock2.recv(&mut buff).await?;
761 ///
762 /// let dgram = &buff[..size];
763 /// assert_eq!(dgram, bytes);
764 ///
765 /// # Ok(())
766 /// # }
767 /// ```
768 pub async fn recv(&self, buf: &mut [u8]) -> io::Result<usize> {
769 self.io
770 .registration()
771 .async_io(Interest::READABLE, || self.io.recv(buf))
772 .await
773 }
774
775 /// Tries to receive a datagram from the peer without waiting.
776 ///
777 /// # Examples
778 ///
779 /// ```no_run
780 /// use tokio::net::UnixDatagram;
781 /// use std::io;
782 ///
783 /// #[tokio::main]
784 /// async fn main() -> io::Result<()> {
785 /// // Connect to a peer
786 /// let dir = tempfile::tempdir().unwrap();
787 /// let client_path = dir.path().join("client.sock");
788 /// let server_path = dir.path().join("server.sock");
789 /// let socket = UnixDatagram::bind(&client_path)?;
790 /// socket.connect(&server_path)?;
791 ///
792 /// loop {
793 /// // Wait for the socket to be readable
794 /// socket.readable().await?;
795 ///
796 /// // The buffer is **not** included in the async task and will
797 /// // only exist on the stack.
798 /// let mut buf = [0; 1024];
799 ///
800 /// // Try to recv data, this may still fail with `WouldBlock`
801 /// // if the readiness event is a false positive.
802 /// match socket.try_recv(&mut buf) {
803 /// Ok(n) => {
804 /// println!("GOT {:?}", &buf[..n]);
805 /// break;
806 /// }
807 /// Err(ref e) if e.kind() == io::ErrorKind::WouldBlock => {
808 /// continue;
809 /// }
810 /// Err(e) => {
811 /// return Err(e);
812 /// }
813 /// }
814 /// }
815 ///
816 /// Ok(())
817 /// }
818 /// ```
819 pub fn try_recv(&self, buf: &mut [u8]) -> io::Result<usize> {
820 self.io
821 .registration()
822 .try_io(Interest::READABLE, || self.io.recv(buf))
823 }
824
825 cfg_io_util! {
826 /// Tries to receive data from the socket without waiting.
827 ///
828 /// This method can be used even if `buf` is uninitialized.
829 ///
830 /// # Examples
831 ///
832 /// ```no_run
833 /// use tokio::net::UnixDatagram;
834 /// use std::io;
835 ///
836 /// #[tokio::main]
837 /// async fn main() -> io::Result<()> {
838 /// // Connect to a peer
839 /// let dir = tempfile::tempdir().unwrap();
840 /// let client_path = dir.path().join("client.sock");
841 /// let server_path = dir.path().join("server.sock");
842 /// let socket = UnixDatagram::bind(&client_path)?;
843 ///
844 /// loop {
845 /// // Wait for the socket to be readable
846 /// socket.readable().await?;
847 ///
848 /// let mut buf = Vec::with_capacity(1024);
849 ///
850 /// // Try to recv data, this may still fail with `WouldBlock`
851 /// // if the readiness event is a false positive.
852 /// match socket.try_recv_buf_from(&mut buf) {
853 /// Ok((n, _addr)) => {
854 /// println!("GOT {:?}", &buf[..n]);
855 /// break;
856 /// }
857 /// Err(ref e) if e.kind() == io::ErrorKind::WouldBlock => {
858 /// continue;
859 /// }
860 /// Err(e) => {
861 /// return Err(e);
862 /// }
863 /// }
864 /// }
865 ///
866 /// Ok(())
867 /// }
868 /// ```
869 pub fn try_recv_buf_from<B: BufMut>(&self, buf: &mut B) -> io::Result<(usize, SocketAddr)> {
870 let (n, addr) = self.io.registration().try_io(Interest::READABLE, || {
871 let dst = buf.chunk_mut();
872 let dst =
873 unsafe { &mut *(dst as *mut _ as *mut [std::mem::MaybeUninit<u8>] as *mut [u8]) };
874
875 // Safety: We trust `UnixDatagram::recv_from` to have filled up `n` bytes in the
876 // buffer.
877 let (n, addr) = (*self.io).recv_from(dst)?;
878
879 unsafe {
880 buf.advance_mut(n);
881 }
882
883 Ok((n, addr))
884 })?;
885
886 Ok((n, SocketAddr(addr)))
887 }
888
889 /// Receives from the socket, advances the
890 /// buffer's internal cursor and returns how many bytes were read and the origin.
891 ///
892 /// This method can be used even if `buf` is uninitialized.
893 ///
894 /// # Examples
895 /// ```
896 /// # if cfg!(miri) { return } // No `socket` in miri.
897 /// # use std::error::Error;
898 /// # #[tokio::main]
899 /// # async fn main() -> Result<(), Box<dyn Error>> {
900 /// use tokio::net::UnixDatagram;
901 /// use tempfile::tempdir;
902 ///
903 /// // We use a temporary directory so that the socket
904 /// // files left by the bound sockets will get cleaned up.
905 /// let tmp = tempdir()?;
906 ///
907 /// // Bind each socket to a filesystem path
908 /// let tx_path = tmp.path().join("tx");
909 /// let tx = UnixDatagram::bind(&tx_path)?;
910 /// let rx_path = tmp.path().join("rx");
911 /// let rx = UnixDatagram::bind(&rx_path)?;
912 ///
913 /// let bytes = b"hello world";
914 /// tx.send_to(bytes, &rx_path).await?;
915 ///
916 /// let mut buf = Vec::with_capacity(24);
917 /// let (size, addr) = rx.recv_buf_from(&mut buf).await?;
918 ///
919 /// let dgram = &buf[..size];
920 /// assert_eq!(dgram, bytes);
921 /// assert_eq!(addr.as_pathname().unwrap(), &tx_path);
922 ///
923 /// # Ok(())
924 /// # }
925 /// ```
926 pub async fn recv_buf_from<B: BufMut>(&self, buf: &mut B) -> io::Result<(usize, SocketAddr)> {
927 self.io.registration().async_io(Interest::READABLE, || {
928 let dst = buf.chunk_mut();
929 let dst =
930 unsafe { &mut *(dst as *mut _ as *mut [std::mem::MaybeUninit<u8>] as *mut [u8]) };
931
932 // Safety: We trust `UnixDatagram::recv_from` to have filled up `n` bytes in the
933 // buffer.
934 let (n, addr) = (*self.io).recv_from(dst)?;
935
936 unsafe {
937 buf.advance_mut(n);
938 }
939 Ok((n,SocketAddr(addr)))
940 }).await
941 }
942
943 /// Tries to read data from the stream into the provided buffer, advancing the
944 /// buffer's internal cursor, returning how many bytes were read.
945 ///
946 /// This method can be used even if `buf` is uninitialized.
947 ///
948 /// # Examples
949 ///
950 /// ```no_run
951 /// use tokio::net::UnixDatagram;
952 /// use std::io;
953 ///
954 /// #[tokio::main]
955 /// async fn main() -> io::Result<()> {
956 /// // Connect to a peer
957 /// let dir = tempfile::tempdir().unwrap();
958 /// let client_path = dir.path().join("client.sock");
959 /// let server_path = dir.path().join("server.sock");
960 /// let socket = UnixDatagram::bind(&client_path)?;
961 /// socket.connect(&server_path)?;
962 ///
963 /// loop {
964 /// // Wait for the socket to be readable
965 /// socket.readable().await?;
966 ///
967 /// let mut buf = Vec::with_capacity(1024);
968 ///
969 /// // Try to recv data, this may still fail with `WouldBlock`
970 /// // if the readiness event is a false positive.
971 /// match socket.try_recv_buf(&mut buf) {
972 /// Ok(n) => {
973 /// println!("GOT {:?}", &buf[..n]);
974 /// break;
975 /// }
976 /// Err(ref e) if e.kind() == io::ErrorKind::WouldBlock => {
977 /// continue;
978 /// }
979 /// Err(e) => {
980 /// return Err(e);
981 /// }
982 /// }
983 /// }
984 ///
985 /// Ok(())
986 /// }
987 /// ```
988 pub fn try_recv_buf<B: BufMut>(&self, buf: &mut B) -> io::Result<usize> {
989 self.io.registration().try_io(Interest::READABLE, || {
990 let dst = buf.chunk_mut();
991 let dst =
992 unsafe { &mut *(dst as *mut _ as *mut [std::mem::MaybeUninit<u8>] as *mut [u8]) };
993
994 // Safety: We trust `UnixDatagram::recv` to have filled up `n` bytes in the
995 // buffer.
996 let n = (*self.io).recv(dst)?;
997
998 unsafe {
999 buf.advance_mut(n);
1000 }
1001
1002 Ok(n)
1003 })
1004 }
1005
1006 /// Receives data from the socket from the address to which it is connected,
1007 /// advancing the buffer's internal cursor, returning how many bytes were read.
1008 ///
1009 /// This method can be used even if `buf` is uninitialized.
1010 ///
1011 /// # Examples
1012 /// ```
1013 /// # if cfg!(miri) { return } // No SOCK_DGRAM for `socketpair` in miri.
1014 /// # use std::error::Error;
1015 /// # #[tokio::main]
1016 /// # async fn main() -> Result<(), Box<dyn Error>> {
1017 /// use tokio::net::UnixDatagram;
1018 ///
1019 /// // Create the pair of sockets
1020 /// let (sock1, sock2) = UnixDatagram::pair()?;
1021 ///
1022 /// // Since the sockets are paired, the paired send/recv
1023 /// // functions can be used
1024 /// let bytes = b"hello world";
1025 /// sock1.send(bytes).await?;
1026 ///
1027 /// let mut buff = Vec::with_capacity(24);
1028 /// let size = sock2.recv_buf(&mut buff).await?;
1029 ///
1030 /// let dgram = &buff[..size];
1031 /// assert_eq!(dgram, bytes);
1032 ///
1033 /// # Ok(())
1034 /// # }
1035 /// ```
1036 pub async fn recv_buf<B: BufMut>(&self, buf: &mut B) -> io::Result<usize> {
1037 self.io.registration().async_io(Interest::READABLE, || {
1038 let dst = buf.chunk_mut();
1039 let dst =
1040 unsafe { &mut *(dst as *mut _ as *mut [std::mem::MaybeUninit<u8>] as *mut [u8]) };
1041
1042 // Safety: We trust `UnixDatagram::recv_from` to have filled up `n` bytes in the
1043 // buffer.
1044 let n = (*self.io).recv(dst)?;
1045
1046 unsafe {
1047 buf.advance_mut(n);
1048 }
1049 Ok(n)
1050 }).await
1051 }
1052 }
1053
1054 /// Sends data on the socket to the specified address.
1055 ///
1056 /// # Cancel safety
1057 ///
1058 /// This method is cancel safe. If `send_to` is used as the event in a
1059 /// [`tokio::select!`](crate::select) statement and some other branch
1060 /// completes first, then it is guaranteed that the message was not sent.
1061 ///
1062 /// # Examples
1063 /// ```
1064 /// # if cfg!(miri) { return } // No `socket` in miri.
1065 /// # use std::error::Error;
1066 /// # #[tokio::main]
1067 /// # async fn main() -> Result<(), Box<dyn Error>> {
1068 /// use tokio::net::UnixDatagram;
1069 /// use tempfile::tempdir;
1070 ///
1071 /// // We use a temporary directory so that the socket
1072 /// // files left by the bound sockets will get cleaned up.
1073 /// let tmp = tempdir()?;
1074 ///
1075 /// // Bind each socket to a filesystem path
1076 /// let tx_path = tmp.path().join("tx");
1077 /// let tx = UnixDatagram::bind(&tx_path)?;
1078 /// let rx_path = tmp.path().join("rx");
1079 /// let rx = UnixDatagram::bind(&rx_path)?;
1080 ///
1081 /// let bytes = b"hello world";
1082 /// tx.send_to(bytes, &rx_path).await?;
1083 ///
1084 /// let mut buf = vec![0u8; 24];
1085 /// let (size, addr) = rx.recv_from(&mut buf).await?;
1086 ///
1087 /// let dgram = &buf[..size];
1088 /// assert_eq!(dgram, bytes);
1089 /// assert_eq!(addr.as_pathname().unwrap(), &tx_path);
1090 ///
1091 /// # Ok(())
1092 /// # }
1093 /// ```
1094 pub async fn send_to<P>(&self, buf: &[u8], target: P) -> io::Result<usize>
1095 where
1096 P: AsRef<Path>,
1097 {
1098 self.io
1099 .registration()
1100 .async_io(Interest::WRITABLE, || self.io.send_to(buf, target.as_ref()))
1101 .await
1102 }
1103
1104 /// Receives data from the socket.
1105 ///
1106 /// # Cancel safety
1107 ///
1108 /// This method is cancel safe. If `recv_from` is used as the event in a
1109 /// [`tokio::select!`](crate::select) statement and some other branch
1110 /// completes first, it is guaranteed that no messages were received on this
1111 /// socket.
1112 ///
1113 /// # Examples
1114 /// ```
1115 /// # if cfg!(miri) { return } // No `socket` in miri.
1116 /// # use std::error::Error;
1117 /// # #[tokio::main]
1118 /// # async fn main() -> Result<(), Box<dyn Error>> {
1119 /// use tokio::net::UnixDatagram;
1120 /// use tempfile::tempdir;
1121 ///
1122 /// // We use a temporary directory so that the socket
1123 /// // files left by the bound sockets will get cleaned up.
1124 /// let tmp = tempdir()?;
1125 ///
1126 /// // Bind each socket to a filesystem path
1127 /// let tx_path = tmp.path().join("tx");
1128 /// let tx = UnixDatagram::bind(&tx_path)?;
1129 /// let rx_path = tmp.path().join("rx");
1130 /// let rx = UnixDatagram::bind(&rx_path)?;
1131 ///
1132 /// let bytes = b"hello world";
1133 /// tx.send_to(bytes, &rx_path).await?;
1134 ///
1135 /// let mut buf = vec![0u8; 24];
1136 /// let (size, addr) = rx.recv_from(&mut buf).await?;
1137 ///
1138 /// let dgram = &buf[..size];
1139 /// assert_eq!(dgram, bytes);
1140 /// assert_eq!(addr.as_pathname().unwrap(), &tx_path);
1141 ///
1142 /// # Ok(())
1143 /// # }
1144 /// ```
1145 pub async fn recv_from(&self, buf: &mut [u8]) -> io::Result<(usize, SocketAddr)> {
1146 let (n, addr) = self
1147 .io
1148 .registration()
1149 .async_io(Interest::READABLE, || self.io.recv_from(buf))
1150 .await?;
1151
1152 Ok((n, SocketAddr(addr)))
1153 }
1154
1155 /// Attempts to receive a single datagram on the specified address.
1156 ///
1157 /// Note that on multiple calls to a `poll_*` method in the `recv` direction, only the
1158 /// `Waker` from the `Context` passed to the most recent call will be scheduled to
1159 /// receive a wakeup.
1160 ///
1161 /// # Return value
1162 ///
1163 /// The function returns:
1164 ///
1165 /// * `Poll::Pending` if the socket is not ready to read
1166 /// * `Poll::Ready(Ok(addr))` reads data from `addr` into `ReadBuf` if the socket is ready
1167 /// * `Poll::Ready(Err(e))` if an error is encountered.
1168 ///
1169 /// # Errors
1170 ///
1171 /// This function may encounter any standard I/O error except `WouldBlock`.
1172 pub fn poll_recv_from(
1173 &self,
1174 cx: &mut Context<'_>,
1175 buf: &mut ReadBuf<'_>,
1176 ) -> Poll<io::Result<SocketAddr>> {
1177 #[allow(clippy::blocks_in_conditions)]
1178 let (n, addr) = ready!(self.io.registration().poll_read_io(cx, || {
1179 // Safety: will not read the maybe uninitialized bytes.
1180 let b = unsafe {
1181 &mut *(buf.unfilled_mut() as *mut [std::mem::MaybeUninit<u8>] as *mut [u8])
1182 };
1183
1184 self.io.recv_from(b)
1185 }))?;
1186
1187 // Safety: We trust `recv` to have filled up `n` bytes in the buffer.
1188 unsafe {
1189 buf.assume_init(n);
1190 }
1191 buf.advance(n);
1192 Poll::Ready(Ok(SocketAddr(addr)))
1193 }
1194
1195 /// Attempts to send data to the specified address.
1196 ///
1197 /// Note that on multiple calls to a `poll_*` method in the send direction, only the
1198 /// `Waker` from the `Context` passed to the most recent call will be scheduled to
1199 /// receive a wakeup.
1200 ///
1201 /// # Return value
1202 ///
1203 /// The function returns:
1204 ///
1205 /// * `Poll::Pending` if the socket is not ready to write
1206 /// * `Poll::Ready(Ok(n))` `n` is the number of bytes sent.
1207 /// * `Poll::Ready(Err(e))` if an error is encountered.
1208 ///
1209 /// # Errors
1210 ///
1211 /// This function may encounter any standard I/O error except `WouldBlock`.
1212 pub fn poll_send_to<P>(
1213 &self,
1214 cx: &mut Context<'_>,
1215 buf: &[u8],
1216 target: P,
1217 ) -> Poll<io::Result<usize>>
1218 where
1219 P: AsRef<Path>,
1220 {
1221 self.io
1222 .registration()
1223 .poll_write_io(cx, || self.io.send_to(buf, target.as_ref()))
1224 }
1225
1226 /// Attempts to send data on the socket to the remote address to which it
1227 /// was previously `connect`ed.
1228 ///
1229 /// The [`connect`] method will connect this socket to a remote address.
1230 /// This method will fail if the socket is not connected.
1231 ///
1232 /// Note that on multiple calls to a `poll_*` method in the send direction,
1233 /// only the `Waker` from the `Context` passed to the most recent call will
1234 /// be scheduled to receive a wakeup.
1235 ///
1236 /// # Return value
1237 ///
1238 /// The function returns:
1239 ///
1240 /// * `Poll::Pending` if the socket is not available to write
1241 /// * `Poll::Ready(Ok(n))` `n` is the number of bytes sent
1242 /// * `Poll::Ready(Err(e))` if an error is encountered.
1243 ///
1244 /// # Errors
1245 ///
1246 /// This function may encounter any standard I/O error except `WouldBlock`.
1247 ///
1248 /// [`connect`]: method@Self::connect
1249 pub fn poll_send(&self, cx: &mut Context<'_>, buf: &[u8]) -> Poll<io::Result<usize>> {
1250 self.io
1251 .registration()
1252 .poll_write_io(cx, || self.io.send(buf))
1253 }
1254
1255 /// Attempts to receive a single datagram message on the socket from the remote
1256 /// address to which it is `connect`ed.
1257 ///
1258 /// The [`connect`] method will connect this socket to a remote address. This method
1259 /// resolves to an error if the socket is not connected.
1260 ///
1261 /// Note that on multiple calls to a `poll_*` method in the `recv` direction, only the
1262 /// `Waker` from the `Context` passed to the most recent call will be scheduled to
1263 /// receive a wakeup.
1264 ///
1265 /// # Return value
1266 ///
1267 /// The function returns:
1268 ///
1269 /// * `Poll::Pending` if the socket is not ready to read
1270 /// * `Poll::Ready(Ok(()))` reads data `ReadBuf` if the socket is ready
1271 /// * `Poll::Ready(Err(e))` if an error is encountered.
1272 ///
1273 /// # Errors
1274 ///
1275 /// This function may encounter any standard I/O error except `WouldBlock`.
1276 ///
1277 /// [`connect`]: method@Self::connect
1278 pub fn poll_recv(&self, cx: &mut Context<'_>, buf: &mut ReadBuf<'_>) -> Poll<io::Result<()>> {
1279 #[allow(clippy::blocks_in_conditions)]
1280 let n = ready!(self.io.registration().poll_read_io(cx, || {
1281 // Safety: will not read the maybe uninitialized bytes.
1282 let b = unsafe {
1283 &mut *(buf.unfilled_mut() as *mut [std::mem::MaybeUninit<u8>] as *mut [u8])
1284 };
1285
1286 self.io.recv(b)
1287 }))?;
1288
1289 // Safety: We trust `recv` to have filled up `n` bytes in the buffer.
1290 unsafe {
1291 buf.assume_init(n);
1292 }
1293 buf.advance(n);
1294 Poll::Ready(Ok(()))
1295 }
1296
1297 /// Tries to receive data from the socket without waiting.
1298 ///
1299 /// # Examples
1300 ///
1301 /// ```no_run
1302 /// use tokio::net::UnixDatagram;
1303 /// use std::io;
1304 ///
1305 /// #[tokio::main]
1306 /// async fn main() -> io::Result<()> {
1307 /// // Connect to a peer
1308 /// let dir = tempfile::tempdir().unwrap();
1309 /// let client_path = dir.path().join("client.sock");
1310 /// let server_path = dir.path().join("server.sock");
1311 /// let socket = UnixDatagram::bind(&client_path)?;
1312 ///
1313 /// loop {
1314 /// // Wait for the socket to be readable
1315 /// socket.readable().await?;
1316 ///
1317 /// // The buffer is **not** included in the async task and will
1318 /// // only exist on the stack.
1319 /// let mut buf = [0; 1024];
1320 ///
1321 /// // Try to recv data, this may still fail with `WouldBlock`
1322 /// // if the readiness event is a false positive.
1323 /// match socket.try_recv_from(&mut buf) {
1324 /// Ok((n, _addr)) => {
1325 /// println!("GOT {:?}", &buf[..n]);
1326 /// break;
1327 /// }
1328 /// Err(ref e) if e.kind() == io::ErrorKind::WouldBlock => {
1329 /// continue;
1330 /// }
1331 /// Err(e) => {
1332 /// return Err(e);
1333 /// }
1334 /// }
1335 /// }
1336 ///
1337 /// Ok(())
1338 /// }
1339 /// ```
1340 pub fn try_recv_from(&self, buf: &mut [u8]) -> io::Result<(usize, SocketAddr)> {
1341 let (n, addr) = self
1342 .io
1343 .registration()
1344 .try_io(Interest::READABLE, || self.io.recv_from(buf))?;
1345
1346 Ok((n, SocketAddr(addr)))
1347 }
1348
1349 /// Tries to read or write from the socket using a user-provided IO operation.
1350 ///
1351 /// If the socket is ready, the provided closure is called. The closure
1352 /// should attempt to perform IO operation on the socket by manually
1353 /// calling the appropriate syscall. If the operation fails because the
1354 /// socket is not actually ready, then the closure should return a
1355 /// `WouldBlock` error and the readiness flag is cleared. The return value
1356 /// of the closure is then returned by `try_io`.
1357 ///
1358 /// If the socket is not ready, then the closure is not called
1359 /// and a `WouldBlock` error is returned.
1360 ///
1361 /// The closure should only return a `WouldBlock` error if it has performed
1362 /// an IO operation on the socket that failed due to the socket not being
1363 /// ready. Returning a `WouldBlock` error in any other situation will
1364 /// incorrectly clear the readiness flag, which can cause the socket to
1365 /// behave incorrectly.
1366 ///
1367 /// The closure should not perform the IO operation using any of the methods
1368 /// defined on the Tokio `UnixDatagram` type, as this will mess with the
1369 /// readiness flag and can cause the socket to behave incorrectly.
1370 ///
1371 /// This method is not intended to be used with combined interests.
1372 /// The closure should perform only one type of IO operation, so it should not
1373 /// require more than one ready state. This method may panic or sleep forever
1374 /// if it is called with a combined interest.
1375 ///
1376 /// Usually, [`readable()`], [`writable()`] or [`ready()`] is used with this function.
1377 ///
1378 /// [`readable()`]: UnixDatagram::readable()
1379 /// [`writable()`]: UnixDatagram::writable()
1380 /// [`ready()`]: UnixDatagram::ready()
1381 pub fn try_io<R>(
1382 &self,
1383 interest: Interest,
1384 f: impl FnOnce() -> io::Result<R>,
1385 ) -> io::Result<R> {
1386 self.io
1387 .registration()
1388 .try_io(interest, || self.io.try_io(f))
1389 }
1390
1391 /// Reads or writes from the socket using a user-provided IO operation.
1392 ///
1393 /// The readiness of the socket is awaited and when the socket is ready,
1394 /// the provided closure is called. The closure should attempt to perform
1395 /// IO operation on the socket by manually calling the appropriate syscall.
1396 /// If the operation fails because the socket is not actually ready,
1397 /// then the closure should return a `WouldBlock` error. In such case the
1398 /// readiness flag is cleared and the socket readiness is awaited again.
1399 /// This loop is repeated until the closure returns an `Ok` or an error
1400 /// other than `WouldBlock`.
1401 ///
1402 /// The closure should only return a `WouldBlock` error if it has performed
1403 /// an IO operation on the socket that failed due to the socket not being
1404 /// ready. Returning a `WouldBlock` error in any other situation will
1405 /// incorrectly clear the readiness flag, which can cause the socket to
1406 /// behave incorrectly.
1407 ///
1408 /// The closure should not perform the IO operation using any of the methods
1409 /// defined on the Tokio `UnixDatagram` type, as this will mess with the
1410 /// readiness flag and can cause the socket to behave incorrectly.
1411 ///
1412 /// This method is not intended to be used with combined interests.
1413 /// The closure should perform only one type of IO operation, so it should not
1414 /// require more than one ready state. This method may panic or sleep forever
1415 /// if it is called with a combined interest.
1416 pub async fn async_io<R>(
1417 &self,
1418 interest: Interest,
1419 mut f: impl FnMut() -> io::Result<R>,
1420 ) -> io::Result<R> {
1421 self.io
1422 .registration()
1423 .async_io(interest, || self.io.try_io(&mut f))
1424 .await
1425 }
1426
1427 /// Returns the local address that this socket is bound to.
1428 ///
1429 /// # Examples
1430 /// For a socket bound to a local path
1431 /// ```
1432 /// # if cfg!(miri) { return } // No `socket` in miri.
1433 /// # use std::error::Error;
1434 /// # #[tokio::main]
1435 /// # async fn main() -> Result<(), Box<dyn Error>> {
1436 /// use tokio::net::UnixDatagram;
1437 /// use tempfile::tempdir;
1438 ///
1439 /// // We use a temporary directory so that the socket
1440 /// // files left by the bound sockets will get cleaned up.
1441 /// let tmp = tempdir()?;
1442 ///
1443 /// // Bind socket to a filesystem path
1444 /// let socket_path = tmp.path().join("socket");
1445 /// let socket = UnixDatagram::bind(&socket_path)?;
1446 ///
1447 /// assert_eq!(socket.local_addr()?.as_pathname().unwrap(), &socket_path);
1448 ///
1449 /// # Ok(())
1450 /// # }
1451 /// ```
1452 ///
1453 /// For an unbound socket
1454 /// ```
1455 /// # if cfg!(miri) { return } // No `socket` in miri.
1456 /// # use std::error::Error;
1457 /// # #[tokio::main]
1458 /// # async fn main() -> Result<(), Box<dyn Error>> {
1459 /// use tokio::net::UnixDatagram;
1460 ///
1461 /// // Create an unbound socket
1462 /// let socket = UnixDatagram::unbound()?;
1463 ///
1464 /// assert!(socket.local_addr()?.is_unnamed());
1465 ///
1466 /// # Ok(())
1467 /// # }
1468 /// ```
1469 pub fn local_addr(&self) -> io::Result<SocketAddr> {
1470 self.io.local_addr().map(SocketAddr)
1471 }
1472
1473 /// Returns the address of this socket's peer.
1474 ///
1475 /// The `connect` method will connect the socket to a peer.
1476 ///
1477 /// # Examples
1478 /// For a peer with a local path
1479 /// ```
1480 /// # if cfg!(miri) { return } // No `socket` in miri.
1481 /// # use std::error::Error;
1482 /// # #[tokio::main]
1483 /// # async fn main() -> Result<(), Box<dyn Error>> {
1484 /// use tokio::net::UnixDatagram;
1485 /// use tempfile::tempdir;
1486 ///
1487 /// // Create an unbound socket
1488 /// let tx = UnixDatagram::unbound()?;
1489 ///
1490 /// // Create another, bound socket
1491 /// let tmp = tempdir()?;
1492 /// let rx_path = tmp.path().join("rx");
1493 /// let rx = UnixDatagram::bind(&rx_path)?;
1494 ///
1495 /// // Connect to the bound socket
1496 /// tx.connect(&rx_path)?;
1497 ///
1498 /// assert_eq!(tx.peer_addr()?.as_pathname().unwrap(), &rx_path);
1499 ///
1500 /// # Ok(())
1501 /// # }
1502 /// ```
1503 ///
1504 /// For an unbound peer
1505 /// ```
1506 /// # if cfg!(miri) { return } // No SOCK_DGRAM for `socketpair` in miri.
1507 /// # use std::error::Error;
1508 /// # #[tokio::main]
1509 /// # async fn main() -> Result<(), Box<dyn Error>> {
1510 /// use tokio::net::UnixDatagram;
1511 ///
1512 /// // Create the pair of sockets
1513 /// let (sock1, sock2) = UnixDatagram::pair()?;
1514 ///
1515 /// assert!(sock1.peer_addr()?.is_unnamed());
1516 ///
1517 /// # Ok(())
1518 /// # }
1519 /// ```
1520 pub fn peer_addr(&self) -> io::Result<SocketAddr> {
1521 self.io.peer_addr().map(SocketAddr)
1522 }
1523
1524 /// Returns the value of the `SO_ERROR` option.
1525 ///
1526 /// # Examples
1527 /// ```
1528 /// # if cfg!(miri) { return } // No `socket` in miri.
1529 /// # use std::error::Error;
1530 /// # #[tokio::main]
1531 /// # async fn main() -> Result<(), Box<dyn Error>> {
1532 /// use tokio::net::UnixDatagram;
1533 ///
1534 /// // Create an unbound socket
1535 /// let socket = UnixDatagram::unbound()?;
1536 ///
1537 /// if let Ok(Some(err)) = socket.take_error() {
1538 /// println!("Got error: {:?}", err);
1539 /// }
1540 ///
1541 /// # Ok(())
1542 /// # }
1543 /// ```
1544 pub fn take_error(&self) -> io::Result<Option<io::Error>> {
1545 self.io.take_error()
1546 }
1547
1548 /// Shuts down the read, write, or both halves of this connection.
1549 ///
1550 /// This function will cause all pending and future I/O calls on the
1551 /// specified portions to immediately return with an appropriate value
1552 /// (see the documentation of `Shutdown`).
1553 ///
1554 /// # Examples
1555 /// ```
1556 /// # if cfg!(miri) { return } // No SOCK_DGRAM for `socketpair` in miri.
1557 /// # use std::error::Error;
1558 /// # #[tokio::main]
1559 /// # async fn main() -> Result<(), Box<dyn Error>> {
1560 /// use tokio::net::UnixDatagram;
1561 /// use std::net::Shutdown;
1562 ///
1563 /// // Create an unbound socket
1564 /// let (socket, other) = UnixDatagram::pair()?;
1565 ///
1566 /// socket.shutdown(Shutdown::Both)?;
1567 ///
1568 /// // NOTE: the following commented out code does NOT work as expected.
1569 /// // Due to an underlying issue, the recv call will block indefinitely.
1570 /// // See: https://github.com/tokio-rs/tokio/issues/1679
1571 /// //let mut buff = vec![0u8; 24];
1572 /// //let size = socket.recv(&mut buff).await?;
1573 /// //assert_eq!(size, 0);
1574 ///
1575 /// let send_result = socket.send(b"hello world").await;
1576 /// assert!(send_result.is_err());
1577 ///
1578 /// # Ok(())
1579 /// # }
1580 /// ```
1581 pub fn shutdown(&self, how: Shutdown) -> io::Result<()> {
1582 self.io.shutdown(how)
1583 }
1584}
1585
1586impl TryFrom<std::os::unix::net::UnixDatagram> for UnixDatagram {
1587 type Error = io::Error;
1588
1589 /// Consumes stream, returning the Tokio I/O object.
1590 ///
1591 /// This is equivalent to
1592 /// [`UnixDatagram::from_std(stream)`](UnixDatagram::from_std).
1593 fn try_from(stream: std::os::unix::net::UnixDatagram) -> Result<Self, Self::Error> {
1594 Self::from_std(stream)
1595 }
1596}
1597
1598impl fmt::Debug for UnixDatagram {
1599 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
1600 self.io.fmt(f)
1601 }
1602}
1603
1604impl AsRawFd for UnixDatagram {
1605 fn as_raw_fd(&self) -> RawFd {
1606 self.io.as_raw_fd()
1607 }
1608}
1609
1610impl AsFd for UnixDatagram {
1611 fn as_fd(&self) -> BorrowedFd<'_> {
1612 unsafe { BorrowedFd::borrow_raw(self.as_raw_fd()) }
1613 }
1614}