tokio/net/unix/datagram/
socket.rs

1use crate::io::{Interest, PollEvented, ReadBuf, Ready};
2use crate::net::unix::SocketAddr;
3
4use std::fmt;
5use std::io;
6use std::net::Shutdown;
7use std::os::unix::io::{AsFd, AsRawFd, BorrowedFd, FromRawFd, IntoRawFd, RawFd};
8use std::os::unix::net;
9use std::path::Path;
10use std::task::{ready, Context, Poll};
11
12cfg_io_util! {
13    use bytes::BufMut;
14}
15
16cfg_net_unix! {
17    /// An I/O object representing a Unix datagram socket.
18    ///
19    /// A socket can be either named (associated with a filesystem path) or
20    /// unnamed.
21    ///
22    /// This type does not provide a `split` method, because this functionality
23    /// can be achieved by wrapping the socket in an [`Arc`]. Note that you do
24    /// not need a `Mutex` to share the `UnixDatagram` — an `Arc<UnixDatagram>`
25    /// is enough. This is because all of the methods take `&self` instead of
26    /// `&mut self`.
27    ///
28    /// **Note:** named sockets are persisted even after the object is dropped
29    /// and the program has exited, and cannot be reconnected. It is advised
30    /// that you either check for and unlink the existing socket if it exists,
31    /// or use a temporary file that is guaranteed to not already exist.
32    ///
33    /// [`Arc`]: std::sync::Arc
34    ///
35    /// # Examples
36    /// Using named sockets, associated with a filesystem path:
37    /// ```
38    /// # if cfg!(miri) { return } // No `socket` in miri.
39    /// # use std::error::Error;
40    /// # #[tokio::main]
41    /// # async fn main() -> Result<(), Box<dyn Error>> {
42    /// use tokio::net::UnixDatagram;
43    /// use tempfile::tempdir;
44    ///
45    /// // We use a temporary directory so that the socket
46    /// // files left by the bound sockets will get cleaned up.
47    /// let tmp = tempdir()?;
48    ///
49    /// // Bind each socket to a filesystem path
50    /// let tx_path = tmp.path().join("tx");
51    /// let tx = UnixDatagram::bind(&tx_path)?;
52    /// let rx_path = tmp.path().join("rx");
53    /// let rx = UnixDatagram::bind(&rx_path)?;
54    ///
55    /// let bytes = b"hello world";
56    /// tx.send_to(bytes, &rx_path).await?;
57    ///
58    /// let mut buf = vec![0u8; 24];
59    /// let (size, addr) = rx.recv_from(&mut buf).await?;
60    ///
61    /// let dgram = &buf[..size];
62    /// assert_eq!(dgram, bytes);
63    /// assert_eq!(addr.as_pathname().unwrap(), &tx_path);
64    ///
65    /// # Ok(())
66    /// # }
67    /// ```
68    ///
69    /// Using unnamed sockets, created as a pair
70    /// ```
71    /// # if cfg!(miri) { return } // No SOCK_DGRAM for `socketpair` in miri.
72    /// # use std::error::Error;
73    /// # #[tokio::main]
74    /// # async fn main() -> Result<(), Box<dyn Error>> {
75    /// use tokio::net::UnixDatagram;
76    ///
77    /// // Create the pair of sockets
78    /// let (sock1, sock2) = UnixDatagram::pair()?;
79    ///
80    /// // Since the sockets are paired, the paired send/recv
81    /// // functions can be used
82    /// let bytes = b"hello world";
83    /// sock1.send(bytes).await?;
84    ///
85    /// let mut buff = vec![0u8; 24];
86    /// let size = sock2.recv(&mut buff).await?;
87    ///
88    /// let dgram = &buff[..size];
89    /// assert_eq!(dgram, bytes);
90    ///
91    /// # Ok(())
92    /// # }
93    /// ```
94    #[cfg_attr(docsrs, doc(alias = "uds"))]
95    pub struct UnixDatagram {
96        io: PollEvented<mio::net::UnixDatagram>,
97    }
98}
99
100impl UnixDatagram {
101    pub(crate) fn from_mio(sys: mio::net::UnixDatagram) -> io::Result<UnixDatagram> {
102        let datagram = UnixDatagram::new(sys)?;
103
104        if let Some(e) = datagram.io.take_error()? {
105            return Err(e);
106        }
107
108        Ok(datagram)
109    }
110
111    /// Waits for any of the requested ready states.
112    ///
113    /// This function is usually paired with `try_recv()` or `try_send()`. It
114    /// can be used to concurrently `recv` / `send` to the same socket on a single
115    /// task without splitting the socket.
116    ///
117    /// The function may complete without the socket being ready. This is a
118    /// false-positive and attempting an operation will return with
119    /// `io::ErrorKind::WouldBlock`. The function can also return with an empty
120    /// [`Ready`] set, so you should always check the returned value and possibly
121    /// wait again if the requested states are not set.
122    ///
123    /// # Cancel safety
124    ///
125    /// This method is cancel safe. Once a readiness event occurs, the method
126    /// will continue to return immediately until the readiness event is
127    /// consumed by an attempt to read or write that fails with `WouldBlock` or
128    /// `Poll::Pending`.
129    ///
130    /// # Examples
131    ///
132    /// Concurrently receive from and send to the socket on the same task
133    /// without splitting.
134    ///
135    /// ```no_run
136    /// use tokio::io::Interest;
137    /// use tokio::net::UnixDatagram;
138    /// use std::io;
139    ///
140    /// #[tokio::main]
141    /// async fn main() -> io::Result<()> {
142    ///     let dir = tempfile::tempdir().unwrap();
143    ///     let client_path = dir.path().join("client.sock");
144    ///     let server_path = dir.path().join("server.sock");
145    ///     let socket = UnixDatagram::bind(&client_path)?;
146    ///     socket.connect(&server_path)?;
147    ///
148    ///     loop {
149    ///         let ready = socket.ready(Interest::READABLE | Interest::WRITABLE).await?;
150    ///
151    ///         if ready.is_readable() {
152    ///             let mut data = [0; 1024];
153    ///             match socket.try_recv(&mut data[..]) {
154    ///                 Ok(n) => {
155    ///                     println!("received {:?}", &data[..n]);
156    ///                 }
157    ///                 // False-positive, continue
158    ///                 Err(ref e) if e.kind() == io::ErrorKind::WouldBlock => {}
159    ///                 Err(e) => {
160    ///                     return Err(e);
161    ///                 }
162    ///             }
163    ///         }
164    ///
165    ///         if ready.is_writable() {
166    ///             // Write some data
167    ///             match socket.try_send(b"hello world") {
168    ///                 Ok(n) => {
169    ///                     println!("sent {} bytes", n);
170    ///                 }
171    ///                 // False-positive, continue
172    ///                 Err(ref e) if e.kind() == io::ErrorKind::WouldBlock => {}
173    ///                 Err(e) => {
174    ///                     return Err(e);
175    ///                 }
176    ///             }
177    ///         }
178    ///     }
179    /// }
180    /// ```
181    pub async fn ready(&self, interest: Interest) -> io::Result<Ready> {
182        let event = self.io.registration().readiness(interest).await?;
183        Ok(event.ready)
184    }
185
186    /// Waits for the socket to become writable.
187    ///
188    /// This function is equivalent to `ready(Interest::WRITABLE)` and is
189    /// usually paired with `try_send()` or `try_send_to()`.
190    ///
191    /// The function may complete without the socket being writable. This is a
192    /// false-positive and attempting a `try_send()` will return with
193    /// `io::ErrorKind::WouldBlock`.
194    ///
195    /// # Cancel safety
196    ///
197    /// This method is cancel safe. Once a readiness event occurs, the method
198    /// will continue to return immediately until the readiness event is
199    /// consumed by an attempt to write that fails with `WouldBlock` or
200    /// `Poll::Pending`.
201    ///
202    /// # Examples
203    ///
204    /// ```no_run
205    /// use tokio::net::UnixDatagram;
206    /// use std::io;
207    ///
208    /// #[tokio::main]
209    /// async fn main() -> io::Result<()> {
210    ///     let dir = tempfile::tempdir().unwrap();
211    ///     let client_path = dir.path().join("client.sock");
212    ///     let server_path = dir.path().join("server.sock");
213    ///     let socket = UnixDatagram::bind(&client_path)?;
214    ///     socket.connect(&server_path)?;
215    ///
216    ///     loop {
217    ///         // Wait for the socket to be writable
218    ///         socket.writable().await?;
219    ///
220    ///         // Try to send data, this may still fail with `WouldBlock`
221    ///         // if the readiness event is a false positive.
222    ///         match socket.try_send(b"hello world") {
223    ///             Ok(n) => {
224    ///                 break;
225    ///             }
226    ///             Err(ref e) if e.kind() == io::ErrorKind::WouldBlock => {
227    ///                 continue;
228    ///             }
229    ///             Err(e) => {
230    ///                 return Err(e);
231    ///             }
232    ///         }
233    ///     }
234    ///
235    ///     Ok(())
236    /// }
237    /// ```
238    pub async fn writable(&self) -> io::Result<()> {
239        self.ready(Interest::WRITABLE).await?;
240        Ok(())
241    }
242
243    /// Polls for write/send readiness.
244    ///
245    /// If the socket is not currently ready for sending, this method will
246    /// store a clone of the `Waker` from the provided `Context`. When the socket
247    /// becomes ready for sending, `Waker::wake` will be called on the
248    /// waker.
249    ///
250    /// Note that on multiple calls to `poll_send_ready` or `poll_send`, only
251    /// the `Waker` from the `Context` passed to the most recent call is
252    /// scheduled to receive a wakeup. (However, `poll_recv_ready` retains a
253    /// second, independent waker.)
254    ///
255    /// This function is intended for cases where creating and pinning a future
256    /// via [`writable`] is not feasible. Where possible, using [`writable`] is
257    /// preferred, as this supports polling from multiple tasks at once.
258    ///
259    /// # Return value
260    ///
261    /// The function returns:
262    ///
263    /// * `Poll::Pending` if the socket is not ready for writing.
264    /// * `Poll::Ready(Ok(()))` if the socket is ready for writing.
265    /// * `Poll::Ready(Err(e))` if an error is encountered.
266    ///
267    /// # Errors
268    ///
269    /// This function may encounter any standard I/O error except `WouldBlock`.
270    ///
271    /// [`writable`]: method@Self::writable
272    pub fn poll_send_ready(&self, cx: &mut Context<'_>) -> Poll<io::Result<()>> {
273        self.io.registration().poll_write_ready(cx).map_ok(|_| ())
274    }
275
276    /// Waits for the socket to become readable.
277    ///
278    /// This function is equivalent to `ready(Interest::READABLE)` and is usually
279    /// paired with `try_recv()`.
280    ///
281    /// The function may complete without the socket being readable. This is a
282    /// false-positive and attempting a `try_recv()` will return with
283    /// `io::ErrorKind::WouldBlock`.
284    ///
285    /// # Cancel safety
286    ///
287    /// This method is cancel safe. Once a readiness event occurs, the method
288    /// will continue to return immediately until the readiness event is
289    /// consumed by an attempt to read that fails with `WouldBlock` or
290    /// `Poll::Pending`.
291    ///
292    /// # Examples
293    ///
294    /// ```no_run
295    /// use tokio::net::UnixDatagram;
296    /// use std::io;
297    ///
298    /// #[tokio::main]
299    /// async fn main() -> io::Result<()> {
300    ///     // Connect to a peer
301    ///     let dir = tempfile::tempdir().unwrap();
302    ///     let client_path = dir.path().join("client.sock");
303    ///     let server_path = dir.path().join("server.sock");
304    ///     let socket = UnixDatagram::bind(&client_path)?;
305    ///     socket.connect(&server_path)?;
306    ///
307    ///     loop {
308    ///         // Wait for the socket to be readable
309    ///         socket.readable().await?;
310    ///
311    ///         // The buffer is **not** included in the async task and will
312    ///         // only exist on the stack.
313    ///         let mut buf = [0; 1024];
314    ///
315    ///         // Try to recv data, this may still fail with `WouldBlock`
316    ///         // if the readiness event is a false positive.
317    ///         match socket.try_recv(&mut buf) {
318    ///             Ok(n) => {
319    ///                 println!("GOT {:?}", &buf[..n]);
320    ///                 break;
321    ///             }
322    ///             Err(ref e) if e.kind() == io::ErrorKind::WouldBlock => {
323    ///                 continue;
324    ///             }
325    ///             Err(e) => {
326    ///                 return Err(e);
327    ///             }
328    ///         }
329    ///     }
330    ///
331    ///     Ok(())
332    /// }
333    /// ```
334    pub async fn readable(&self) -> io::Result<()> {
335        self.ready(Interest::READABLE).await?;
336        Ok(())
337    }
338
339    /// Polls for read/receive readiness.
340    ///
341    /// If the socket is not currently ready for receiving, this method will
342    /// store a clone of the `Waker` from the provided `Context`. When the
343    /// socket becomes ready for reading, `Waker::wake` will be called on the
344    /// waker.
345    ///
346    /// Note that on multiple calls to `poll_recv_ready`, `poll_recv` or
347    /// `poll_peek`, only the `Waker` from the `Context` passed to the most
348    /// recent call is scheduled to receive a wakeup. (However,
349    /// `poll_send_ready` retains a second, independent waker.)
350    ///
351    /// This function is intended for cases where creating and pinning a future
352    /// via [`readable`] is not feasible. Where possible, using [`readable`] is
353    /// preferred, as this supports polling from multiple tasks at once.
354    ///
355    /// # Return value
356    ///
357    /// The function returns:
358    ///
359    /// * `Poll::Pending` if the socket is not ready for reading.
360    /// * `Poll::Ready(Ok(()))` if the socket is ready for reading.
361    /// * `Poll::Ready(Err(e))` if an error is encountered.
362    ///
363    /// # Errors
364    ///
365    /// This function may encounter any standard I/O error except `WouldBlock`.
366    ///
367    /// [`readable`]: method@Self::readable
368    pub fn poll_recv_ready(&self, cx: &mut Context<'_>) -> Poll<io::Result<()>> {
369        self.io.registration().poll_read_ready(cx).map_ok(|_| ())
370    }
371
372    /// Creates a new `UnixDatagram` bound to the specified path.
373    ///
374    /// # Examples
375    /// ```
376    /// # if cfg!(miri) { return } // No `socket` in miri.
377    /// # use std::error::Error;
378    /// # #[tokio::main]
379    /// # async fn main() -> Result<(), Box<dyn Error>> {
380    /// use tokio::net::UnixDatagram;
381    /// use tempfile::tempdir;
382    ///
383    /// // We use a temporary directory so that the socket
384    /// // files left by the bound sockets will get cleaned up.
385    /// let tmp = tempdir()?;
386    ///
387    /// // Bind the socket to a filesystem path
388    /// let socket_path = tmp.path().join("socket");
389    /// let socket = UnixDatagram::bind(&socket_path)?;
390    ///
391    /// # Ok(())
392    /// # }
393    /// ```
394    pub fn bind<P>(path: P) -> io::Result<UnixDatagram>
395    where
396        P: AsRef<Path>,
397    {
398        let socket = mio::net::UnixDatagram::bind(path)?;
399        UnixDatagram::new(socket)
400    }
401
402    /// Creates an unnamed pair of connected sockets.
403    ///
404    /// This function will create a pair of interconnected Unix sockets for
405    /// communicating back and forth between one another.
406    ///
407    /// # Examples
408    /// ```
409    /// # if cfg!(miri) { return } // No SOCK_DGRAM for `socketpair` in miri.
410    /// # use std::error::Error;
411    /// # #[tokio::main]
412    /// # async fn main() -> Result<(), Box<dyn Error>> {
413    /// use tokio::net::UnixDatagram;
414    ///
415    /// // Create the pair of sockets
416    /// let (sock1, sock2) = UnixDatagram::pair()?;
417    ///
418    /// // Since the sockets are paired, the paired send/recv
419    /// // functions can be used
420    /// let bytes = b"hail eris";
421    /// sock1.send(bytes).await?;
422    ///
423    /// let mut buff = vec![0u8; 24];
424    /// let size = sock2.recv(&mut buff).await?;
425    ///
426    /// let dgram = &buff[..size];
427    /// assert_eq!(dgram, bytes);
428    ///
429    /// # Ok(())
430    /// # }
431    /// ```
432    pub fn pair() -> io::Result<(UnixDatagram, UnixDatagram)> {
433        let (a, b) = mio::net::UnixDatagram::pair()?;
434        let a = UnixDatagram::new(a)?;
435        let b = UnixDatagram::new(b)?;
436
437        Ok((a, b))
438    }
439
440    /// Creates new [`UnixDatagram`] from a [`std::os::unix::net::UnixDatagram`].
441    ///
442    /// This function is intended to be used to wrap a `UnixDatagram` from the
443    /// standard library in the Tokio equivalent.
444    ///
445    /// # Notes
446    ///
447    /// The caller is responsible for ensuring that the socket is in
448    /// non-blocking mode. Otherwise all I/O operations on the socket
449    /// will block the thread, which will cause unexpected behavior.
450    /// Non-blocking mode can be set using [`set_nonblocking`].
451    ///
452    /// [`set_nonblocking`]: std::os::unix::net::UnixDatagram::set_nonblocking
453    ///
454    /// # Panics
455    ///
456    /// This function panics if it is not called from within a runtime with
457    /// IO enabled.
458    ///
459    /// The runtime is usually set implicitly when this function is called
460    /// from a future driven by a Tokio runtime, otherwise runtime can be set
461    /// explicitly with [`Runtime::enter`](crate::runtime::Runtime::enter) function.
462    /// # Examples
463    /// ```
464    /// # if cfg!(miri) { return } // No `socket` in miri.
465    /// # use std::error::Error;
466    /// # #[tokio::main]
467    /// # async fn main() -> Result<(), Box<dyn Error>> {
468    /// use tokio::net::UnixDatagram;
469    /// use std::os::unix::net::UnixDatagram as StdUDS;
470    /// use tempfile::tempdir;
471    ///
472    /// // We use a temporary directory so that the socket
473    /// // files left by the bound sockets will get cleaned up.
474    /// let tmp = tempdir()?;
475    ///
476    /// // Bind the socket to a filesystem path
477    /// let socket_path = tmp.path().join("socket");
478    /// let std_socket = StdUDS::bind(&socket_path)?;
479    /// std_socket.set_nonblocking(true)?;
480    /// let tokio_socket = UnixDatagram::from_std(std_socket)?;
481    ///
482    /// # Ok(())
483    /// # }
484    /// ```
485    #[track_caller]
486    pub fn from_std(datagram: net::UnixDatagram) -> io::Result<UnixDatagram> {
487        let socket = mio::net::UnixDatagram::from_std(datagram);
488        let io = PollEvented::new(socket)?;
489        Ok(UnixDatagram { io })
490    }
491
492    /// Turns a [`tokio::net::UnixDatagram`] into a [`std::os::unix::net::UnixDatagram`].
493    ///
494    /// The returned [`std::os::unix::net::UnixDatagram`] will have nonblocking
495    /// mode set as `true`. Use [`set_nonblocking`] to change the blocking mode
496    /// if needed.
497    ///
498    /// # Examples
499    ///
500    /// ```rust,no_run
501    /// # use std::error::Error;
502    /// # async fn dox() -> Result<(), Box<dyn Error>> {
503    /// let tokio_socket = tokio::net::UnixDatagram::bind("/path/to/the/socket")?;
504    /// let std_socket = tokio_socket.into_std()?;
505    /// std_socket.set_nonblocking(false)?;
506    /// # Ok(())
507    /// # }
508    /// ```
509    ///
510    /// [`tokio::net::UnixDatagram`]: UnixDatagram
511    /// [`std::os::unix::net::UnixDatagram`]: std::os::unix::net::UnixDatagram
512    /// [`set_nonblocking`]: fn@std::os::unix::net::UnixDatagram::set_nonblocking
513    pub fn into_std(self) -> io::Result<std::os::unix::net::UnixDatagram> {
514        self.io
515            .into_inner()
516            .map(IntoRawFd::into_raw_fd)
517            .map(|raw_fd| unsafe { std::os::unix::net::UnixDatagram::from_raw_fd(raw_fd) })
518    }
519
520    fn new(socket: mio::net::UnixDatagram) -> io::Result<UnixDatagram> {
521        let io = PollEvented::new(socket)?;
522        Ok(UnixDatagram { io })
523    }
524
525    /// Creates a new `UnixDatagram` which is not bound to any address.
526    ///
527    /// # Examples
528    /// ```
529    /// # if cfg!(miri) { return } // No `socket` in miri.
530    /// # use std::error::Error;
531    /// # #[tokio::main]
532    /// # async fn main() -> Result<(), Box<dyn Error>> {
533    /// use tokio::net::UnixDatagram;
534    /// use tempfile::tempdir;
535    ///
536    /// // Create an unbound socket
537    /// let tx = UnixDatagram::unbound()?;
538    ///
539    /// // Create another, bound socket
540    /// let tmp = tempdir()?;
541    /// let rx_path = tmp.path().join("rx");
542    /// let rx = UnixDatagram::bind(&rx_path)?;
543    ///
544    /// // Send to the bound socket
545    /// let bytes = b"hello world";
546    /// tx.send_to(bytes, &rx_path).await?;
547    ///
548    /// let mut buf = vec![0u8; 24];
549    /// let (size, addr) = rx.recv_from(&mut buf).await?;
550    ///
551    /// let dgram = &buf[..size];
552    /// assert_eq!(dgram, bytes);
553    ///
554    /// # Ok(())
555    /// # }
556    /// ```
557    pub fn unbound() -> io::Result<UnixDatagram> {
558        let socket = mio::net::UnixDatagram::unbound()?;
559        UnixDatagram::new(socket)
560    }
561
562    /// Connects the socket to the specified address.
563    ///
564    /// The `send` method may be used to send data to the specified address.
565    /// `recv` and `recv_from` will only receive data from that address.
566    ///
567    /// # Examples
568    /// ```
569    /// # if cfg!(miri) { return } // No `socket` in miri.
570    /// # use std::error::Error;
571    /// # #[tokio::main]
572    /// # async fn main() -> Result<(), Box<dyn Error>> {
573    /// use tokio::net::UnixDatagram;
574    /// use tempfile::tempdir;
575    ///
576    /// // Create an unbound socket
577    /// let tx = UnixDatagram::unbound()?;
578    ///
579    /// // Create another, bound socket
580    /// let tmp = tempdir()?;
581    /// let rx_path = tmp.path().join("rx");
582    /// let rx = UnixDatagram::bind(&rx_path)?;
583    ///
584    /// // Connect to the bound socket
585    /// tx.connect(&rx_path)?;
586    ///
587    /// // Send to the bound socket
588    /// let bytes = b"hello world";
589    /// tx.send(bytes).await?;
590    ///
591    /// let mut buf = vec![0u8; 24];
592    /// let (size, addr) = rx.recv_from(&mut buf).await?;
593    ///
594    /// let dgram = &buf[..size];
595    /// assert_eq!(dgram, bytes);
596    ///
597    /// # Ok(())
598    /// # }
599    /// ```
600    pub fn connect<P: AsRef<Path>>(&self, path: P) -> io::Result<()> {
601        self.io.connect(path)
602    }
603
604    /// Sends data on the socket to the socket's peer.
605    ///
606    /// # Cancel safety
607    ///
608    /// This method is cancel safe. If `send` is used as the event in a
609    /// [`tokio::select!`](crate::select) statement and some other branch
610    /// completes first, then it is guaranteed that the message was not sent.
611    ///
612    /// # Examples
613    /// ```
614    /// # if cfg!(miri) { return } // No SOCK_DGRAM for `socketpair` in miri.
615    /// # use std::error::Error;
616    /// # #[tokio::main]
617    /// # async fn main() -> Result<(), Box<dyn Error>> {
618    /// use tokio::net::UnixDatagram;
619    ///
620    /// // Create the pair of sockets
621    /// let (sock1, sock2) = UnixDatagram::pair()?;
622    ///
623    /// // Since the sockets are paired, the paired send/recv
624    /// // functions can be used
625    /// let bytes = b"hello world";
626    /// sock1.send(bytes).await?;
627    ///
628    /// let mut buff = vec![0u8; 24];
629    /// let size = sock2.recv(&mut buff).await?;
630    ///
631    /// let dgram = &buff[..size];
632    /// assert_eq!(dgram, bytes);
633    ///
634    /// # Ok(())
635    /// # }
636    /// ```
637    pub async fn send(&self, buf: &[u8]) -> io::Result<usize> {
638        self.io
639            .registration()
640            .async_io(Interest::WRITABLE, || self.io.send(buf))
641            .await
642    }
643
644    /// Tries to send a datagram to the peer without waiting.
645    ///
646    /// # Examples
647    ///
648    /// ```no_run
649    /// use tokio::net::UnixDatagram;
650    /// use std::io;
651    ///
652    /// #[tokio::main]
653    /// async fn main() -> io::Result<()> {
654    ///     let dir = tempfile::tempdir().unwrap();
655    ///     let client_path = dir.path().join("client.sock");
656    ///     let server_path = dir.path().join("server.sock");
657    ///     let socket = UnixDatagram::bind(&client_path)?;
658    ///     socket.connect(&server_path)?;
659    ///
660    ///     loop {
661    ///         // Wait for the socket to be writable
662    ///         socket.writable().await?;
663    ///
664    ///         // Try to send data, this may still fail with `WouldBlock`
665    ///         // if the readiness event is a false positive.
666    ///         match socket.try_send(b"hello world") {
667    ///             Ok(n) => {
668    ///                 break;
669    ///             }
670    ///             Err(ref e) if e.kind() == io::ErrorKind::WouldBlock => {
671    ///                 continue;
672    ///             }
673    ///             Err(e) => {
674    ///                 return Err(e);
675    ///             }
676    ///         }
677    ///     }
678    ///
679    ///     Ok(())
680    /// }
681    /// ```
682    pub fn try_send(&self, buf: &[u8]) -> io::Result<usize> {
683        self.io
684            .registration()
685            .try_io(Interest::WRITABLE, || self.io.send(buf))
686    }
687
688    /// Tries to send a datagram to the peer without waiting.
689    ///
690    /// # Examples
691    ///
692    /// ```no_run
693    /// use tokio::net::UnixDatagram;
694    /// use std::io;
695    ///
696    /// #[tokio::main]
697    /// async fn main() -> io::Result<()> {
698    ///     let dir = tempfile::tempdir().unwrap();
699    ///     let client_path = dir.path().join("client.sock");
700    ///     let server_path = dir.path().join("server.sock");
701    ///     let socket = UnixDatagram::bind(&client_path)?;
702    ///
703    ///     loop {
704    ///         // Wait for the socket to be writable
705    ///         socket.writable().await?;
706    ///
707    ///         // Try to send data, this may still fail with `WouldBlock`
708    ///         // if the readiness event is a false positive.
709    ///         match socket.try_send_to(b"hello world", &server_path) {
710    ///             Ok(n) => {
711    ///                 break;
712    ///             }
713    ///             Err(ref e) if e.kind() == io::ErrorKind::WouldBlock => {
714    ///                 continue;
715    ///             }
716    ///             Err(e) => {
717    ///                 return Err(e);
718    ///             }
719    ///         }
720    ///     }
721    ///
722    ///     Ok(())
723    /// }
724    /// ```
725    pub fn try_send_to<P>(&self, buf: &[u8], target: P) -> io::Result<usize>
726    where
727        P: AsRef<Path>,
728    {
729        self.io
730            .registration()
731            .try_io(Interest::WRITABLE, || self.io.send_to(buf, target))
732    }
733
734    /// Receives data from the socket.
735    ///
736    /// # Cancel safety
737    ///
738    /// This method is cancel safe. If `recv` is used as the event in a
739    /// [`tokio::select!`](crate::select) statement and some other branch
740    /// completes first, it is guaranteed that no messages were received on this
741    /// socket.
742    ///
743    /// # Examples
744    /// ```
745    /// # if cfg!(miri) { return } // No SOCK_DGRAM for `socketpair` in miri.
746    /// # use std::error::Error;
747    /// # #[tokio::main]
748    /// # async fn main() -> Result<(), Box<dyn Error>> {
749    /// use tokio::net::UnixDatagram;
750    ///
751    /// // Create the pair of sockets
752    /// let (sock1, sock2) = UnixDatagram::pair()?;
753    ///
754    /// // Since the sockets are paired, the paired send/recv
755    /// // functions can be used
756    /// let bytes = b"hello world";
757    /// sock1.send(bytes).await?;
758    ///
759    /// let mut buff = vec![0u8; 24];
760    /// let size = sock2.recv(&mut buff).await?;
761    ///
762    /// let dgram = &buff[..size];
763    /// assert_eq!(dgram, bytes);
764    ///
765    /// # Ok(())
766    /// # }
767    /// ```
768    pub async fn recv(&self, buf: &mut [u8]) -> io::Result<usize> {
769        self.io
770            .registration()
771            .async_io(Interest::READABLE, || self.io.recv(buf))
772            .await
773    }
774
775    /// Tries to receive a datagram from the peer without waiting.
776    ///
777    /// # Examples
778    ///
779    /// ```no_run
780    /// use tokio::net::UnixDatagram;
781    /// use std::io;
782    ///
783    /// #[tokio::main]
784    /// async fn main() -> io::Result<()> {
785    ///     // Connect to a peer
786    ///     let dir = tempfile::tempdir().unwrap();
787    ///     let client_path = dir.path().join("client.sock");
788    ///     let server_path = dir.path().join("server.sock");
789    ///     let socket = UnixDatagram::bind(&client_path)?;
790    ///     socket.connect(&server_path)?;
791    ///
792    ///     loop {
793    ///         // Wait for the socket to be readable
794    ///         socket.readable().await?;
795    ///
796    ///         // The buffer is **not** included in the async task and will
797    ///         // only exist on the stack.
798    ///         let mut buf = [0; 1024];
799    ///
800    ///         // Try to recv data, this may still fail with `WouldBlock`
801    ///         // if the readiness event is a false positive.
802    ///         match socket.try_recv(&mut buf) {
803    ///             Ok(n) => {
804    ///                 println!("GOT {:?}", &buf[..n]);
805    ///                 break;
806    ///             }
807    ///             Err(ref e) if e.kind() == io::ErrorKind::WouldBlock => {
808    ///                 continue;
809    ///             }
810    ///             Err(e) => {
811    ///                 return Err(e);
812    ///             }
813    ///         }
814    ///     }
815    ///
816    ///     Ok(())
817    /// }
818    /// ```
819    pub fn try_recv(&self, buf: &mut [u8]) -> io::Result<usize> {
820        self.io
821            .registration()
822            .try_io(Interest::READABLE, || self.io.recv(buf))
823    }
824
825    cfg_io_util! {
826        /// Tries to receive data from the socket without waiting.
827        ///
828        /// This method can be used even if `buf` is uninitialized.
829        ///
830        /// # Examples
831        ///
832        /// ```no_run
833        /// use tokio::net::UnixDatagram;
834        /// use std::io;
835        ///
836        /// #[tokio::main]
837        /// async fn main() -> io::Result<()> {
838        ///     // Connect to a peer
839        ///     let dir = tempfile::tempdir().unwrap();
840        ///     let client_path = dir.path().join("client.sock");
841        ///     let server_path = dir.path().join("server.sock");
842        ///     let socket = UnixDatagram::bind(&client_path)?;
843        ///
844        ///     loop {
845        ///         // Wait for the socket to be readable
846        ///         socket.readable().await?;
847        ///
848        ///         let mut buf = Vec::with_capacity(1024);
849        ///
850        ///         // Try to recv data, this may still fail with `WouldBlock`
851        ///         // if the readiness event is a false positive.
852        ///         match socket.try_recv_buf_from(&mut buf) {
853        ///             Ok((n, _addr)) => {
854        ///                 println!("GOT {:?}", &buf[..n]);
855        ///                 break;
856        ///             }
857        ///             Err(ref e) if e.kind() == io::ErrorKind::WouldBlock => {
858        ///                 continue;
859        ///             }
860        ///             Err(e) => {
861        ///                 return Err(e);
862        ///             }
863        ///         }
864        ///     }
865        ///
866        ///     Ok(())
867        /// }
868        /// ```
869        pub fn try_recv_buf_from<B: BufMut>(&self, buf: &mut B) -> io::Result<(usize, SocketAddr)> {
870            let (n, addr) = self.io.registration().try_io(Interest::READABLE, || {
871                let dst = buf.chunk_mut();
872                let dst =
873                    unsafe { &mut *(dst as *mut _ as *mut [std::mem::MaybeUninit<u8>] as *mut [u8]) };
874
875                // Safety: We trust `UnixDatagram::recv_from` to have filled up `n` bytes in the
876                // buffer.
877                let (n, addr) = (*self.io).recv_from(dst)?;
878
879                unsafe {
880                    buf.advance_mut(n);
881                }
882
883                Ok((n, addr))
884            })?;
885
886            Ok((n, SocketAddr(addr)))
887        }
888
889        /// Receives from the socket, advances the
890        /// buffer's internal cursor and returns how many bytes were read and the origin.
891        ///
892        /// This method can be used even if `buf` is uninitialized.
893        ///
894        /// # Examples
895        /// ```
896        /// # if cfg!(miri) { return } // No `socket` in miri.
897        /// # use std::error::Error;
898        /// # #[tokio::main]
899        /// # async fn main() -> Result<(), Box<dyn Error>> {
900        /// use tokio::net::UnixDatagram;
901        /// use tempfile::tempdir;
902        ///
903        /// // We use a temporary directory so that the socket
904        /// // files left by the bound sockets will get cleaned up.
905        /// let tmp = tempdir()?;
906        ///
907        /// // Bind each socket to a filesystem path
908        /// let tx_path = tmp.path().join("tx");
909        /// let tx = UnixDatagram::bind(&tx_path)?;
910        /// let rx_path = tmp.path().join("rx");
911        /// let rx = UnixDatagram::bind(&rx_path)?;
912        ///
913        /// let bytes = b"hello world";
914        /// tx.send_to(bytes, &rx_path).await?;
915        ///
916        /// let mut buf = Vec::with_capacity(24);
917        /// let (size, addr) = rx.recv_buf_from(&mut buf).await?;
918        ///
919        /// let dgram = &buf[..size];
920        /// assert_eq!(dgram, bytes);
921        /// assert_eq!(addr.as_pathname().unwrap(), &tx_path);
922        ///
923        /// # Ok(())
924        /// # }
925        /// ```
926        pub async fn recv_buf_from<B: BufMut>(&self, buf: &mut B) -> io::Result<(usize, SocketAddr)> {
927            self.io.registration().async_io(Interest::READABLE, || {
928                let dst = buf.chunk_mut();
929                let dst =
930                    unsafe { &mut *(dst as *mut _ as *mut [std::mem::MaybeUninit<u8>] as *mut [u8]) };
931
932                // Safety: We trust `UnixDatagram::recv_from` to have filled up `n` bytes in the
933                // buffer.
934                let (n, addr) = (*self.io).recv_from(dst)?;
935
936                unsafe {
937                    buf.advance_mut(n);
938                }
939                Ok((n,SocketAddr(addr)))
940            }).await
941        }
942
943        /// Tries to read data from the stream into the provided buffer, advancing the
944        /// buffer's internal cursor, returning how many bytes were read.
945        ///
946        /// This method can be used even if `buf` is uninitialized.
947        ///
948        /// # Examples
949        ///
950        /// ```no_run
951        /// use tokio::net::UnixDatagram;
952        /// use std::io;
953        ///
954        /// #[tokio::main]
955        /// async fn main() -> io::Result<()> {
956        ///     // Connect to a peer
957        ///     let dir = tempfile::tempdir().unwrap();
958        ///     let client_path = dir.path().join("client.sock");
959        ///     let server_path = dir.path().join("server.sock");
960        ///     let socket = UnixDatagram::bind(&client_path)?;
961        ///     socket.connect(&server_path)?;
962        ///
963        ///     loop {
964        ///         // Wait for the socket to be readable
965        ///         socket.readable().await?;
966        ///
967        ///         let mut buf = Vec::with_capacity(1024);
968        ///
969        ///         // Try to recv data, this may still fail with `WouldBlock`
970        ///         // if the readiness event is a false positive.
971        ///         match socket.try_recv_buf(&mut buf) {
972        ///             Ok(n) => {
973        ///                 println!("GOT {:?}", &buf[..n]);
974        ///                 break;
975        ///             }
976        ///             Err(ref e) if e.kind() == io::ErrorKind::WouldBlock => {
977        ///                 continue;
978        ///             }
979        ///             Err(e) => {
980        ///                 return Err(e);
981        ///             }
982        ///         }
983        ///     }
984        ///
985        ///     Ok(())
986        /// }
987        /// ```
988        pub fn try_recv_buf<B: BufMut>(&self, buf: &mut B) -> io::Result<usize> {
989            self.io.registration().try_io(Interest::READABLE, || {
990                let dst = buf.chunk_mut();
991                let dst =
992                    unsafe { &mut *(dst as *mut _ as *mut [std::mem::MaybeUninit<u8>] as *mut [u8]) };
993
994                // Safety: We trust `UnixDatagram::recv` to have filled up `n` bytes in the
995                // buffer.
996                let n = (*self.io).recv(dst)?;
997
998                unsafe {
999                    buf.advance_mut(n);
1000                }
1001
1002                Ok(n)
1003            })
1004        }
1005
1006        /// Receives data from the socket from the address to which it is connected,
1007        /// advancing the buffer's internal cursor, returning how many bytes were read.
1008        ///
1009        /// This method can be used even if `buf` is uninitialized.
1010        ///
1011        /// # Examples
1012        /// ```
1013        /// # if cfg!(miri) { return } // No SOCK_DGRAM for `socketpair` in miri.
1014        /// # use std::error::Error;
1015        /// # #[tokio::main]
1016        /// # async fn main() -> Result<(), Box<dyn Error>> {
1017        /// use tokio::net::UnixDatagram;
1018        ///
1019        /// // Create the pair of sockets
1020        /// let (sock1, sock2) = UnixDatagram::pair()?;
1021        ///
1022        /// // Since the sockets are paired, the paired send/recv
1023        /// // functions can be used
1024        /// let bytes = b"hello world";
1025        /// sock1.send(bytes).await?;
1026        ///
1027        /// let mut buff = Vec::with_capacity(24);
1028        /// let size = sock2.recv_buf(&mut buff).await?;
1029        ///
1030        /// let dgram = &buff[..size];
1031        /// assert_eq!(dgram, bytes);
1032        ///
1033        /// # Ok(())
1034        /// # }
1035        /// ```
1036        pub async fn recv_buf<B: BufMut>(&self, buf: &mut B) -> io::Result<usize> {
1037            self.io.registration().async_io(Interest::READABLE, || {
1038                let dst = buf.chunk_mut();
1039                let dst =
1040                    unsafe { &mut *(dst as *mut _ as *mut [std::mem::MaybeUninit<u8>] as *mut [u8]) };
1041
1042                // Safety: We trust `UnixDatagram::recv_from` to have filled up `n` bytes in the
1043                // buffer.
1044                let n = (*self.io).recv(dst)?;
1045
1046                unsafe {
1047                    buf.advance_mut(n);
1048                }
1049                Ok(n)
1050            }).await
1051        }
1052    }
1053
1054    /// Sends data on the socket to the specified address.
1055    ///
1056    /// # Cancel safety
1057    ///
1058    /// This method is cancel safe. If `send_to` is used as the event in a
1059    /// [`tokio::select!`](crate::select) statement and some other branch
1060    /// completes first, then it is guaranteed that the message was not sent.
1061    ///
1062    /// # Examples
1063    /// ```
1064    /// # if cfg!(miri) { return } // No `socket` in miri.
1065    /// # use std::error::Error;
1066    /// # #[tokio::main]
1067    /// # async fn main() -> Result<(), Box<dyn Error>> {
1068    /// use tokio::net::UnixDatagram;
1069    /// use tempfile::tempdir;
1070    ///
1071    /// // We use a temporary directory so that the socket
1072    /// // files left by the bound sockets will get cleaned up.
1073    /// let tmp = tempdir()?;
1074    ///
1075    /// // Bind each socket to a filesystem path
1076    /// let tx_path = tmp.path().join("tx");
1077    /// let tx = UnixDatagram::bind(&tx_path)?;
1078    /// let rx_path = tmp.path().join("rx");
1079    /// let rx = UnixDatagram::bind(&rx_path)?;
1080    ///
1081    /// let bytes = b"hello world";
1082    /// tx.send_to(bytes, &rx_path).await?;
1083    ///
1084    /// let mut buf = vec![0u8; 24];
1085    /// let (size, addr) = rx.recv_from(&mut buf).await?;
1086    ///
1087    /// let dgram = &buf[..size];
1088    /// assert_eq!(dgram, bytes);
1089    /// assert_eq!(addr.as_pathname().unwrap(), &tx_path);
1090    ///
1091    /// # Ok(())
1092    /// # }
1093    /// ```
1094    pub async fn send_to<P>(&self, buf: &[u8], target: P) -> io::Result<usize>
1095    where
1096        P: AsRef<Path>,
1097    {
1098        self.io
1099            .registration()
1100            .async_io(Interest::WRITABLE, || self.io.send_to(buf, target.as_ref()))
1101            .await
1102    }
1103
1104    /// Receives data from the socket.
1105    ///
1106    /// # Cancel safety
1107    ///
1108    /// This method is cancel safe. If `recv_from` is used as the event in a
1109    /// [`tokio::select!`](crate::select) statement and some other branch
1110    /// completes first, it is guaranteed that no messages were received on this
1111    /// socket.
1112    ///
1113    /// # Examples
1114    /// ```
1115    /// # if cfg!(miri) { return } // No `socket` in miri.
1116    /// # use std::error::Error;
1117    /// # #[tokio::main]
1118    /// # async fn main() -> Result<(), Box<dyn Error>> {
1119    /// use tokio::net::UnixDatagram;
1120    /// use tempfile::tempdir;
1121    ///
1122    /// // We use a temporary directory so that the socket
1123    /// // files left by the bound sockets will get cleaned up.
1124    /// let tmp = tempdir()?;
1125    ///
1126    /// // Bind each socket to a filesystem path
1127    /// let tx_path = tmp.path().join("tx");
1128    /// let tx = UnixDatagram::bind(&tx_path)?;
1129    /// let rx_path = tmp.path().join("rx");
1130    /// let rx = UnixDatagram::bind(&rx_path)?;
1131    ///
1132    /// let bytes = b"hello world";
1133    /// tx.send_to(bytes, &rx_path).await?;
1134    ///
1135    /// let mut buf = vec![0u8; 24];
1136    /// let (size, addr) = rx.recv_from(&mut buf).await?;
1137    ///
1138    /// let dgram = &buf[..size];
1139    /// assert_eq!(dgram, bytes);
1140    /// assert_eq!(addr.as_pathname().unwrap(), &tx_path);
1141    ///
1142    /// # Ok(())
1143    /// # }
1144    /// ```
1145    pub async fn recv_from(&self, buf: &mut [u8]) -> io::Result<(usize, SocketAddr)> {
1146        let (n, addr) = self
1147            .io
1148            .registration()
1149            .async_io(Interest::READABLE, || self.io.recv_from(buf))
1150            .await?;
1151
1152        Ok((n, SocketAddr(addr)))
1153    }
1154
1155    /// Attempts to receive a single datagram on the specified address.
1156    ///
1157    /// Note that on multiple calls to a `poll_*` method in the `recv` direction, only the
1158    /// `Waker` from the `Context` passed to the most recent call will be scheduled to
1159    /// receive a wakeup.
1160    ///
1161    /// # Return value
1162    ///
1163    /// The function returns:
1164    ///
1165    /// * `Poll::Pending` if the socket is not ready to read
1166    /// * `Poll::Ready(Ok(addr))` reads data from `addr` into `ReadBuf` if the socket is ready
1167    /// * `Poll::Ready(Err(e))` if an error is encountered.
1168    ///
1169    /// # Errors
1170    ///
1171    /// This function may encounter any standard I/O error except `WouldBlock`.
1172    pub fn poll_recv_from(
1173        &self,
1174        cx: &mut Context<'_>,
1175        buf: &mut ReadBuf<'_>,
1176    ) -> Poll<io::Result<SocketAddr>> {
1177        #[allow(clippy::blocks_in_conditions)]
1178        let (n, addr) = ready!(self.io.registration().poll_read_io(cx, || {
1179            // Safety: will not read the maybe uninitialized bytes.
1180            let b = unsafe {
1181                &mut *(buf.unfilled_mut() as *mut [std::mem::MaybeUninit<u8>] as *mut [u8])
1182            };
1183
1184            self.io.recv_from(b)
1185        }))?;
1186
1187        // Safety: We trust `recv` to have filled up `n` bytes in the buffer.
1188        unsafe {
1189            buf.assume_init(n);
1190        }
1191        buf.advance(n);
1192        Poll::Ready(Ok(SocketAddr(addr)))
1193    }
1194
1195    /// Attempts to send data to the specified address.
1196    ///
1197    /// Note that on multiple calls to a `poll_*` method in the send direction, only the
1198    /// `Waker` from the `Context` passed to the most recent call will be scheduled to
1199    /// receive a wakeup.
1200    ///
1201    /// # Return value
1202    ///
1203    /// The function returns:
1204    ///
1205    /// * `Poll::Pending` if the socket is not ready to write
1206    /// * `Poll::Ready(Ok(n))` `n` is the number of bytes sent.
1207    /// * `Poll::Ready(Err(e))` if an error is encountered.
1208    ///
1209    /// # Errors
1210    ///
1211    /// This function may encounter any standard I/O error except `WouldBlock`.
1212    pub fn poll_send_to<P>(
1213        &self,
1214        cx: &mut Context<'_>,
1215        buf: &[u8],
1216        target: P,
1217    ) -> Poll<io::Result<usize>>
1218    where
1219        P: AsRef<Path>,
1220    {
1221        self.io
1222            .registration()
1223            .poll_write_io(cx, || self.io.send_to(buf, target.as_ref()))
1224    }
1225
1226    /// Attempts to send data on the socket to the remote address to which it
1227    /// was previously `connect`ed.
1228    ///
1229    /// The [`connect`] method will connect this socket to a remote address.
1230    /// This method will fail if the socket is not connected.
1231    ///
1232    /// Note that on multiple calls to a `poll_*` method in the send direction,
1233    /// only the `Waker` from the `Context` passed to the most recent call will
1234    /// be scheduled to receive a wakeup.
1235    ///
1236    /// # Return value
1237    ///
1238    /// The function returns:
1239    ///
1240    /// * `Poll::Pending` if the socket is not available to write
1241    /// * `Poll::Ready(Ok(n))` `n` is the number of bytes sent
1242    /// * `Poll::Ready(Err(e))` if an error is encountered.
1243    ///
1244    /// # Errors
1245    ///
1246    /// This function may encounter any standard I/O error except `WouldBlock`.
1247    ///
1248    /// [`connect`]: method@Self::connect
1249    pub fn poll_send(&self, cx: &mut Context<'_>, buf: &[u8]) -> Poll<io::Result<usize>> {
1250        self.io
1251            .registration()
1252            .poll_write_io(cx, || self.io.send(buf))
1253    }
1254
1255    /// Attempts to receive a single datagram message on the socket from the remote
1256    /// address to which it is `connect`ed.
1257    ///
1258    /// The [`connect`] method will connect this socket to a remote address. This method
1259    /// resolves to an error if the socket is not connected.
1260    ///
1261    /// Note that on multiple calls to a `poll_*` method in the `recv` direction, only the
1262    /// `Waker` from the `Context` passed to the most recent call will be scheduled to
1263    /// receive a wakeup.
1264    ///
1265    /// # Return value
1266    ///
1267    /// The function returns:
1268    ///
1269    /// * `Poll::Pending` if the socket is not ready to read
1270    /// * `Poll::Ready(Ok(()))` reads data `ReadBuf` if the socket is ready
1271    /// * `Poll::Ready(Err(e))` if an error is encountered.
1272    ///
1273    /// # Errors
1274    ///
1275    /// This function may encounter any standard I/O error except `WouldBlock`.
1276    ///
1277    /// [`connect`]: method@Self::connect
1278    pub fn poll_recv(&self, cx: &mut Context<'_>, buf: &mut ReadBuf<'_>) -> Poll<io::Result<()>> {
1279        #[allow(clippy::blocks_in_conditions)]
1280        let n = ready!(self.io.registration().poll_read_io(cx, || {
1281            // Safety: will not read the maybe uninitialized bytes.
1282            let b = unsafe {
1283                &mut *(buf.unfilled_mut() as *mut [std::mem::MaybeUninit<u8>] as *mut [u8])
1284            };
1285
1286            self.io.recv(b)
1287        }))?;
1288
1289        // Safety: We trust `recv` to have filled up `n` bytes in the buffer.
1290        unsafe {
1291            buf.assume_init(n);
1292        }
1293        buf.advance(n);
1294        Poll::Ready(Ok(()))
1295    }
1296
1297    /// Tries to receive data from the socket without waiting.
1298    ///
1299    /// # Examples
1300    ///
1301    /// ```no_run
1302    /// use tokio::net::UnixDatagram;
1303    /// use std::io;
1304    ///
1305    /// #[tokio::main]
1306    /// async fn main() -> io::Result<()> {
1307    ///     // Connect to a peer
1308    ///     let dir = tempfile::tempdir().unwrap();
1309    ///     let client_path = dir.path().join("client.sock");
1310    ///     let server_path = dir.path().join("server.sock");
1311    ///     let socket = UnixDatagram::bind(&client_path)?;
1312    ///
1313    ///     loop {
1314    ///         // Wait for the socket to be readable
1315    ///         socket.readable().await?;
1316    ///
1317    ///         // The buffer is **not** included in the async task and will
1318    ///         // only exist on the stack.
1319    ///         let mut buf = [0; 1024];
1320    ///
1321    ///         // Try to recv data, this may still fail with `WouldBlock`
1322    ///         // if the readiness event is a false positive.
1323    ///         match socket.try_recv_from(&mut buf) {
1324    ///             Ok((n, _addr)) => {
1325    ///                 println!("GOT {:?}", &buf[..n]);
1326    ///                 break;
1327    ///             }
1328    ///             Err(ref e) if e.kind() == io::ErrorKind::WouldBlock => {
1329    ///                 continue;
1330    ///             }
1331    ///             Err(e) => {
1332    ///                 return Err(e);
1333    ///             }
1334    ///         }
1335    ///     }
1336    ///
1337    ///     Ok(())
1338    /// }
1339    /// ```
1340    pub fn try_recv_from(&self, buf: &mut [u8]) -> io::Result<(usize, SocketAddr)> {
1341        let (n, addr) = self
1342            .io
1343            .registration()
1344            .try_io(Interest::READABLE, || self.io.recv_from(buf))?;
1345
1346        Ok((n, SocketAddr(addr)))
1347    }
1348
1349    /// Tries to read or write from the socket using a user-provided IO operation.
1350    ///
1351    /// If the socket is ready, the provided closure is called. The closure
1352    /// should attempt to perform IO operation on the socket by manually
1353    /// calling the appropriate syscall. If the operation fails because the
1354    /// socket is not actually ready, then the closure should return a
1355    /// `WouldBlock` error and the readiness flag is cleared. The return value
1356    /// of the closure is then returned by `try_io`.
1357    ///
1358    /// If the socket is not ready, then the closure is not called
1359    /// and a `WouldBlock` error is returned.
1360    ///
1361    /// The closure should only return a `WouldBlock` error if it has performed
1362    /// an IO operation on the socket that failed due to the socket not being
1363    /// ready. Returning a `WouldBlock` error in any other situation will
1364    /// incorrectly clear the readiness flag, which can cause the socket to
1365    /// behave incorrectly.
1366    ///
1367    /// The closure should not perform the IO operation using any of the methods
1368    /// defined on the Tokio `UnixDatagram` type, as this will mess with the
1369    /// readiness flag and can cause the socket to behave incorrectly.
1370    ///
1371    /// This method is not intended to be used with combined interests.
1372    /// The closure should perform only one type of IO operation, so it should not
1373    /// require more than one ready state. This method may panic or sleep forever
1374    /// if it is called with a combined interest.
1375    ///
1376    /// Usually, [`readable()`], [`writable()`] or [`ready()`] is used with this function.
1377    ///
1378    /// [`readable()`]: UnixDatagram::readable()
1379    /// [`writable()`]: UnixDatagram::writable()
1380    /// [`ready()`]: UnixDatagram::ready()
1381    pub fn try_io<R>(
1382        &self,
1383        interest: Interest,
1384        f: impl FnOnce() -> io::Result<R>,
1385    ) -> io::Result<R> {
1386        self.io
1387            .registration()
1388            .try_io(interest, || self.io.try_io(f))
1389    }
1390
1391    /// Reads or writes from the socket using a user-provided IO operation.
1392    ///
1393    /// The readiness of the socket is awaited and when the socket is ready,
1394    /// the provided closure is called. The closure should attempt to perform
1395    /// IO operation on the socket by manually calling the appropriate syscall.
1396    /// If the operation fails because the socket is not actually ready,
1397    /// then the closure should return a `WouldBlock` error. In such case the
1398    /// readiness flag is cleared and the socket readiness is awaited again.
1399    /// This loop is repeated until the closure returns an `Ok` or an error
1400    /// other than `WouldBlock`.
1401    ///
1402    /// The closure should only return a `WouldBlock` error if it has performed
1403    /// an IO operation on the socket that failed due to the socket not being
1404    /// ready. Returning a `WouldBlock` error in any other situation will
1405    /// incorrectly clear the readiness flag, which can cause the socket to
1406    /// behave incorrectly.
1407    ///
1408    /// The closure should not perform the IO operation using any of the methods
1409    /// defined on the Tokio `UnixDatagram` type, as this will mess with the
1410    /// readiness flag and can cause the socket to behave incorrectly.
1411    ///
1412    /// This method is not intended to be used with combined interests.
1413    /// The closure should perform only one type of IO operation, so it should not
1414    /// require more than one ready state. This method may panic or sleep forever
1415    /// if it is called with a combined interest.
1416    pub async fn async_io<R>(
1417        &self,
1418        interest: Interest,
1419        mut f: impl FnMut() -> io::Result<R>,
1420    ) -> io::Result<R> {
1421        self.io
1422            .registration()
1423            .async_io(interest, || self.io.try_io(&mut f))
1424            .await
1425    }
1426
1427    /// Returns the local address that this socket is bound to.
1428    ///
1429    /// # Examples
1430    /// For a socket bound to a local path
1431    /// ```
1432    /// # if cfg!(miri) { return } // No `socket` in miri.
1433    /// # use std::error::Error;
1434    /// # #[tokio::main]
1435    /// # async fn main() -> Result<(), Box<dyn Error>> {
1436    /// use tokio::net::UnixDatagram;
1437    /// use tempfile::tempdir;
1438    ///
1439    /// // We use a temporary directory so that the socket
1440    /// // files left by the bound sockets will get cleaned up.
1441    /// let tmp = tempdir()?;
1442    ///
1443    /// // Bind socket to a filesystem path
1444    /// let socket_path = tmp.path().join("socket");
1445    /// let socket = UnixDatagram::bind(&socket_path)?;
1446    ///
1447    /// assert_eq!(socket.local_addr()?.as_pathname().unwrap(), &socket_path);
1448    ///
1449    /// # Ok(())
1450    /// # }
1451    /// ```
1452    ///
1453    /// For an unbound socket
1454    /// ```
1455    /// # if cfg!(miri) { return } // No `socket` in miri.
1456    /// # use std::error::Error;
1457    /// # #[tokio::main]
1458    /// # async fn main() -> Result<(), Box<dyn Error>> {
1459    /// use tokio::net::UnixDatagram;
1460    ///
1461    /// // Create an unbound socket
1462    /// let socket = UnixDatagram::unbound()?;
1463    ///
1464    /// assert!(socket.local_addr()?.is_unnamed());
1465    ///
1466    /// # Ok(())
1467    /// # }
1468    /// ```
1469    pub fn local_addr(&self) -> io::Result<SocketAddr> {
1470        self.io.local_addr().map(SocketAddr)
1471    }
1472
1473    /// Returns the address of this socket's peer.
1474    ///
1475    /// The `connect` method will connect the socket to a peer.
1476    ///
1477    /// # Examples
1478    /// For a peer with a local path
1479    /// ```
1480    /// # if cfg!(miri) { return } // No `socket` in miri.
1481    /// # use std::error::Error;
1482    /// # #[tokio::main]
1483    /// # async fn main() -> Result<(), Box<dyn Error>> {
1484    /// use tokio::net::UnixDatagram;
1485    /// use tempfile::tempdir;
1486    ///
1487    /// // Create an unbound socket
1488    /// let tx = UnixDatagram::unbound()?;
1489    ///
1490    /// // Create another, bound socket
1491    /// let tmp = tempdir()?;
1492    /// let rx_path = tmp.path().join("rx");
1493    /// let rx = UnixDatagram::bind(&rx_path)?;
1494    ///
1495    /// // Connect to the bound socket
1496    /// tx.connect(&rx_path)?;
1497    ///
1498    /// assert_eq!(tx.peer_addr()?.as_pathname().unwrap(), &rx_path);
1499    ///
1500    /// # Ok(())
1501    /// # }
1502    /// ```
1503    ///
1504    /// For an unbound peer
1505    /// ```
1506    /// # if cfg!(miri) { return } // No SOCK_DGRAM for `socketpair` in miri.
1507    /// # use std::error::Error;
1508    /// # #[tokio::main]
1509    /// # async fn main() -> Result<(), Box<dyn Error>> {
1510    /// use tokio::net::UnixDatagram;
1511    ///
1512    /// // Create the pair of sockets
1513    /// let (sock1, sock2) = UnixDatagram::pair()?;
1514    ///
1515    /// assert!(sock1.peer_addr()?.is_unnamed());
1516    ///
1517    /// # Ok(())
1518    /// # }
1519    /// ```
1520    pub fn peer_addr(&self) -> io::Result<SocketAddr> {
1521        self.io.peer_addr().map(SocketAddr)
1522    }
1523
1524    /// Returns the value of the `SO_ERROR` option.
1525    ///
1526    /// # Examples
1527    /// ```
1528    /// # if cfg!(miri) { return } // No `socket` in miri.
1529    /// # use std::error::Error;
1530    /// # #[tokio::main]
1531    /// # async fn main() -> Result<(), Box<dyn Error>> {
1532    /// use tokio::net::UnixDatagram;
1533    ///
1534    /// // Create an unbound socket
1535    /// let socket = UnixDatagram::unbound()?;
1536    ///
1537    /// if let Ok(Some(err)) = socket.take_error() {
1538    ///     println!("Got error: {:?}", err);
1539    /// }
1540    ///
1541    /// # Ok(())
1542    /// # }
1543    /// ```
1544    pub fn take_error(&self) -> io::Result<Option<io::Error>> {
1545        self.io.take_error()
1546    }
1547
1548    /// Shuts down the read, write, or both halves of this connection.
1549    ///
1550    /// This function will cause all pending and future I/O calls on the
1551    /// specified portions to immediately return with an appropriate value
1552    /// (see the documentation of `Shutdown`).
1553    ///
1554    /// # Examples
1555    /// ```
1556    /// # if cfg!(miri) { return } // No SOCK_DGRAM for `socketpair` in miri.
1557    /// # use std::error::Error;
1558    /// # #[tokio::main]
1559    /// # async fn main() -> Result<(), Box<dyn Error>> {
1560    /// use tokio::net::UnixDatagram;
1561    /// use std::net::Shutdown;
1562    ///
1563    /// // Create an unbound socket
1564    /// let (socket, other) = UnixDatagram::pair()?;
1565    ///
1566    /// socket.shutdown(Shutdown::Both)?;
1567    ///
1568    /// // NOTE: the following commented out code does NOT work as expected.
1569    /// // Due to an underlying issue, the recv call will block indefinitely.
1570    /// // See: https://github.com/tokio-rs/tokio/issues/1679
1571    /// //let mut buff = vec![0u8; 24];
1572    /// //let size = socket.recv(&mut buff).await?;
1573    /// //assert_eq!(size, 0);
1574    ///
1575    /// let send_result = socket.send(b"hello world").await;
1576    /// assert!(send_result.is_err());
1577    ///
1578    /// # Ok(())
1579    /// # }
1580    /// ```
1581    pub fn shutdown(&self, how: Shutdown) -> io::Result<()> {
1582        self.io.shutdown(how)
1583    }
1584}
1585
1586impl TryFrom<std::os::unix::net::UnixDatagram> for UnixDatagram {
1587    type Error = io::Error;
1588
1589    /// Consumes stream, returning the Tokio I/O object.
1590    ///
1591    /// This is equivalent to
1592    /// [`UnixDatagram::from_std(stream)`](UnixDatagram::from_std).
1593    fn try_from(stream: std::os::unix::net::UnixDatagram) -> Result<Self, Self::Error> {
1594        Self::from_std(stream)
1595    }
1596}
1597
1598impl fmt::Debug for UnixDatagram {
1599    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
1600        self.io.fmt(f)
1601    }
1602}
1603
1604impl AsRawFd for UnixDatagram {
1605    fn as_raw_fd(&self) -> RawFd {
1606        self.io.as_raw_fd()
1607    }
1608}
1609
1610impl AsFd for UnixDatagram {
1611    fn as_fd(&self) -> BorrowedFd<'_> {
1612        unsafe { BorrowedFd::borrow_raw(self.as_raw_fd()) }
1613    }
1614}