// tokio/runtime/scheduler/inject/shared.rs
use super::{Pop, Synced};
2
3use crate::loom::sync::atomic::AtomicUsize;
4use crate::runtime::task;
5
6use std::marker::PhantomData;
7use std::sync::atomic::Ordering::{Acquire, Release};
8
/// Lock-free handle to the inject queue's shared state.
///
/// Only the length counter lives here; the queue's linked-list pointers and
/// closed flag live in the paired `Synced` value (see `Shared::new`), which
/// callers keep behind external synchronization.
pub(crate) struct Shared<T: 'static> {
    /// Number of pending tasks in the queue. Written with `Release` stores in
    /// `push` / `pop_n` and read with an `Acquire` load in `len`, so readers
    /// can check emptiness without touching `Synced`.
    pub(super) len: AtomicUsize,

    /// `Shared` never stores a `T` directly; `PhantomData` ties the task type
    /// parameter to this handle.
    _p: PhantomData<T>,
}
16
// SAFETY: `Shared` itself holds only an `AtomicUsize` plus `PhantomData<T>` —
// no `T` value and no raw pointers — so sharing/sending the handle is sound
// regardless of `T`. The raw task pointers live in `Synced`, which callers
// guard separately. NOTE(review): the unconditional (no `T: Send`) impls rely
// on that split; confirm against `Synced`'s own synchronization.
unsafe impl<T> Send for Shared<T> {}
unsafe impl<T> Sync for Shared<T> {}
19
impl<T: 'static> Shared<T> {
    /// Creates the lock-free handle (`Shared`) together with the externally
    /// synchronized state (`Synced`) backing a new, empty, open queue.
    pub(crate) fn new() -> (Shared<T>, Synced) {
        let inject = Shared {
            len: AtomicUsize::new(0),
            _p: PhantomData,
        };

        let synced = Synced {
            is_closed: false,
            head: None,
            tail: None,
        };

        (inject, synced)
    }

    /// Returns `true` if the queue currently holds no tasks, based on an
    /// `Acquire` load of the length counter (no access to `Synced` needed).
    pub(crate) fn is_empty(&self) -> bool {
        self.len() == 0
    }

    /// Returns `true` if the queue has been closed.
    ///
    /// The flag lives in `Synced`; the caller supplies the instance paired
    /// with this `Shared` (presumably while holding the lock that guards it —
    /// NOTE(review): confirm at the call sites).
    #[cfg(any(tokio_taskdump, feature = "rt-multi-thread"))]
    pub(crate) fn is_closed(&self, synced: &Synced) -> bool {
        synced.is_closed
    }

    /// Closes the queue.
    ///
    /// Returns `true` if this call performed the open -> closed transition,
    /// `false` if the queue was already closed.
    pub(crate) fn close(&self, synced: &mut Synced) -> bool {
        if synced.is_closed {
            return false;
        }

        synced.is_closed = true;
        true
    }

    /// Returns the number of queued tasks (`Acquire` load, pairing with the
    /// `Release` stores performed by `push` and `pop_n`).
    pub(crate) fn len(&self) -> usize {
        self.len.load(Acquire)
    }

    /// Appends `task` to the back of the queue. A no-op if the queue is
    /// closed (the task is simply dropped by the early return).
    ///
    /// # Safety
    ///
    /// `synced` must be the `Synced` instance created by `new` together with
    /// this `Shared`, and the caller must have exclusive access to the queue
    /// state: the `unsync_load` of `len` below is only sound when no
    /// concurrent `push`/`pop_n` can run. NOTE(review): exclusivity
    /// presumably comes from the lock that hands out `&mut Synced`; confirm
    /// against callers.
    pub(crate) unsafe fn push(&self, synced: &mut Synced, task: task::Notified<T>) {
        if synced.is_closed {
            // Returning here drops `task`; a closed queue accepts nothing.
            return;
        }

        // Non-atomic read is fine: this thread has exclusive access to the
        // queue state (see `# Safety`).
        let len = self.len.unsync_load();
        let task = task.into_raw();

        // A task being pushed must not already be linked into some queue.
        debug_assert!(unsafe { task.get_queue_next().is_none() });

        // Link onto the tail of the intrusive list, or become the head when
        // the queue is empty.
        if let Some(tail) = synced.tail {
            unsafe { tail.set_queue_next(Some(task)) };
        } else {
            synced.head = Some(task);
        }

        synced.tail = Some(task);
        // Publish the new length last; pairs with the `Acquire` load in
        // `len` so observers never see a count ahead of the links.
        self.len.store(len + 1, Release);
    }

    /// Pops a single task from the front of the queue, if any.
    ///
    /// # Safety
    ///
    /// Same contract as [`Self::pop_n`].
    pub(crate) unsafe fn pop(&self, synced: &mut Synced) -> Option<task::Notified<T>> {
        self.pop_n(synced, 1).next()
    }

    /// Pops up to `n` tasks (clamped to the current length), returned as a
    /// `Pop` iterator borrowing `synced`.
    ///
    /// # Safety
    ///
    /// `synced` must be the `Synced` paired with this `Shared`, with the same
    /// exclusive-access requirement as `push` — the `unsync_load` below
    /// relies on it.
    pub(crate) unsafe fn pop_n<'a>(&'a self, synced: &'a mut Synced, n: usize) -> Pop<'a, T> {
        use std::cmp;

        debug_assert!(n > 0);

        // Clamp the request to what is actually queued.
        let len = self.len.unsync_load();
        let n = cmp::min(n, len);

        // Decrement the length up front; the returned `Pop` iterator
        // presumably unlinks exactly `n` tasks from `synced` as it is
        // consumed (NOTE(review): confirm in `Pop`'s implementation).
        self.len.store(len - n, Release);

        Pop::new(n, synced)
    }
}