// tokio/runtime/io/registration_set.rs

use crate::loom::sync::atomic::AtomicUsize;
use crate::runtime::io::ScheduledIo;
use crate::util::linked_list::{self, LinkedList};

use std::io;
use std::ptr::NonNull;
use std::sync::atomic::Ordering::{Acquire, Release};
use std::sync::Arc;

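// Kind of arbitrary, but buffering 16 `ScheduledIo`s doesn't seem like much.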
const NOTIFY_AFTER: usize = 16;

pub(super) struct RegistrationSet {
    num_pending_release: AtomicUsize,
}

pub(super) struct Synced {
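    // Set to `true` once the I/O driver begins shutting down. From that
    // point on, `allocate` refuses to create new registrations.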
    is_shutdown: bool,

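    // List of all registrations tracked by the set.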
    registrations: LinkedList<Arc<ScheduledIo>, ScheduledIo>,

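    // Registrations that are pending drop. When a registration is
    // deregistered, its `ScheduledIo` is pushed here. The driver is
    // responsible for dropping these entries, which ensures a `ScheduledIo`
    // is not freed while the driver may still reference it.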
    pending_release: Vec<Arc<ScheduledIo>>,
}

impl RegistrationSet {
    pub(super) fn new() -> (RegistrationSet, Synced) {
        let set = RegistrationSet {
            num_pending_release: AtomicUsize::new(0),
        };

        let synced = Synced {
            is_shutdown: false,
            registrations: LinkedList::new(),
            pending_release: Vec::with_capacity(NOTIFY_AFTER),
        };

        (set, synced)
    }

    pub(super) fn is_shutdown(&self, synced: &Synced) -> bool {
        synced.is_shutdown
    }

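    // Returns `true` if there are registrations that need to be released.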
    pub(super) fn needs_release(&self) -> bool {
        self.num_pending_release.load(Acquire) != 0
    }

    pub(super) fn allocate(&self, synced: &mut Synced) -> io::Result<Arc<ScheduledIo>> {
        if synced.is_shutdown {
            return Err(io::Error::new(
                io::ErrorKind::Other,
                crate::util::error::RUNTIME_SHUTTING_DOWN_ERROR,
            ));
        }

        let ret = Arc::new(ScheduledIo::default());

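        // Push a ref into the list of all resources.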
        synced.registrations.push_front(ret.clone());

        Ok(ret)
    }

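    // Returns `true` if the caller should unblock the I/O driver to purge
    // registrations pending release.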
    pub(super) fn deregister(&self, synced: &mut Synced, registration: &Arc<ScheduledIo>) -> bool {
        synced.pending_release.push(registration.clone());

        let len = synced.pending_release.len();
        self.num_pending_release.store(len, Release);

        len == NOTIFY_AFTER
    }

    pub(super) fn shutdown(&self, synced: &mut Synced) -> Vec<Arc<ScheduledIo>> {
        if synced.is_shutdown {
            return vec![];
        }

        synced.is_shutdown = true;
        synced.pending_release.clear();

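        // Building a vec of all outstanding I/O handles could be expensive,
        // but this is the shutdown operation. In theory, shutdowns should be
        // "clean" with no outstanding I/O resources. Even if it is slow, we
        // aren't optimizing for shutdown.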
        let mut ret = vec![];

        while let Some(io) = synced.registrations.pop_back() {
            ret.push(io);
        }

        ret
    }

    pub(super) fn release(&self, synced: &mut Synced) {
        let pending = std::mem::take(&mut synced.pending_release);

        for io in pending {
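            // safety: the registration is part of our list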
            unsafe { self.remove(synced, &io) }
        }

        self.num_pending_release.store(0, Release);
    }

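    // This function is marked `unsafe` because the caller must ensure that
    // `io` is part of this registration set.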
    pub(super) unsafe fn remove(&self, synced: &mut Synced, io: &Arc<ScheduledIo>) {
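        // SAFETY: a pointer obtained from an `Arc` is never null.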
        let io = unsafe { NonNull::new_unchecked(Arc::as_ptr(io).cast_mut()) };

        super::EXPOSE_IO.unexpose_provenance(io.as_ptr());
        let _ = synced.registrations.remove(io);
    }
}

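// Safety: entries are heap-allocated behind an `Arc`, so a `ScheduledIo`'s
// address is stable while it is linked into the list.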
unsafe impl linked_list::Link for Arc<ScheduledIo> {
    type Handle = Arc<ScheduledIo>;
    type Target = ScheduledIo;

    fn as_raw(handle: &Self::Handle) -> NonNull<ScheduledIo> {
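        // safety: `Arc::as_ptr` never returns a null pointer.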
        unsafe { NonNull::new_unchecked(Arc::as_ptr(handle) as *mut _) }
    }

    unsafe fn from_raw(ptr: NonNull<Self::Target>) -> Arc<ScheduledIo> {
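        // safety: the list holds one strong count per entry; reclaim it here.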
        unsafe { Arc::from_raw(ptr.as_ptr() as *const _) }
    }

    unsafe fn pointers(
        target: NonNull<Self::Target>,
    ) -> NonNull<linked_list::Pointers<ScheduledIo>> {
        NonNull::new_unchecked(target.as_ref().linked_list_pointers.get())
    }
}