tokio/runtime/scheduler/multi_thread_alt/idle.rs

#![allow(dead_code)]

use crate::loom::sync::atomic::{AtomicBool, AtomicUsize};
use crate::loom::sync::MutexGuard;
use crate::runtime::scheduler::multi_thread_alt::{worker, Core, Handle, Shared};

use std::sync::atomic::Ordering::{AcqRel, Acquire, Release};
pub(super) struct Idle {
    /// Number of workers currently searching for work.
    num_searching: AtomicUsize,

    /// Number of idle cores.
    num_idle: AtomicUsize,

    /// Bitmap tracking which cores are idle.
    idle_map: IdleMap,

    /// Set when a notification could not hand a core to a sleeping worker,
    /// so the next unparking worker can catch the missed wake-up.
    needs_searching: AtomicBool,

    /// Total number of cores.
    num_cores: usize,
}

/// A bitmap with one bit per core, set while the core is idle.
pub(super) struct IdleMap {
    chunks: Vec<AtomicUsize>,
}

/// A point-in-time copy of `IdleMap`.
pub(super) struct Snapshot {
    chunks: Vec<usize>,
}

/// Idle state synchronized by the scheduler mutex.
pub(super) struct Synced {
    /// Worker IDs of sleeping workers.
    sleepers: Vec<usize>,

    /// Cores available to be handed out to workers.
    available_cores: Vec<Box<Core>>,
}

impl Idle {
    pub(super) fn new(cores: Vec<Box<Core>>, num_workers: usize) -> (Idle, Synced) {
        let idle = Idle {
            num_searching: AtomicUsize::new(0),
            num_idle: AtomicUsize::new(cores.len()),
            idle_map: IdleMap::new(&cores),
            needs_searching: AtomicBool::new(false),
            num_cores: cores.len(),
        };

        let synced = Synced {
            sleepers: Vec::with_capacity(num_workers),
            available_cores: cores,
        };

        (idle, synced)
    }

    pub(super) fn needs_searching(&self) -> bool {
        self.needs_searching.load(Acquire)
    }

    pub(super) fn num_idle(&self, synced: &Synced) -> usize {
        #[cfg(not(loom))]
        debug_assert_eq!(synced.available_cores.len(), self.num_idle.load(Acquire));
        synced.available_cores.len()
    }

    pub(super) fn num_searching(&self) -> usize {
        self.num_searching.load(Acquire)
    }

    pub(super) fn snapshot(&self, snapshot: &mut Snapshot) {
        snapshot.update(&self.idle_map)
    }

    /// Tries to acquire an available core, updating the idle bookkeeping.
    pub(super) fn try_acquire_available_core(&self, synced: &mut Synced) -> Option<Box<Core>> {
        let ret = synced.available_cores.pop();

        if let Some(core) = &ret {
            // Decrement the number of idle cores.
            let num_idle = self.num_idle.load(Acquire) - 1;
            debug_assert_eq!(num_idle, synced.available_cores.len());
            self.num_idle.store(num_idle, Release);

            self.idle_map.unset(core.index);
            debug_assert!(self.idle_map.matches(&synced.available_cores));
        }

        ret
    }

    /// Notifies a single worker, called from within the runtime.
    pub(super) fn notify_local(&self, shared: &Shared) {
        if self.num_searching.load(Acquire) != 0 {
            // There already is a searching worker, so no new worker needs to
            // be unparked. This could be a false positive, but because this
            // method is called *from* a worker, at least one worker is
            // currently awake and the scheduler cannot deadlock here.
            return;
        }

        if self.num_idle.load(Acquire) == 0 {
            // No idle cores to hand out; flag the missed notification so an
            // unparking worker transitions to searching.
            self.needs_searching.store(true, Release);
            return;
        }

        // There are no searching workers; try to become the thread that
        // initializes one.
        if self
            .num_searching
            .compare_exchange(0, 1, AcqRel, Acquire)
            .is_err()
        {
            // A concurrent call won the race and is unparking a worker.
            return;
        }

        super::counters::inc_num_unparks_local();

        // Acquire the lock and wake a sleeping worker.
        let synced = shared.synced.lock();
        self.notify_synced(synced, shared);
    }

    /// Notifies a single worker while already holding the scheduler lock.
    pub(super) fn notify_remote(&self, synced: MutexGuard<'_, worker::Synced>, shared: &Shared) {
        if synced.idle.sleepers.is_empty() {
            self.needs_searching.store(true, Release);
            return;
        }

        // Reserve the searching slot for the worker about to be unparked;
        // `notify_synced` releases it again if no core can be assigned.
        self.num_searching.fetch_add(1, AcqRel);

        self.notify_synced(synced, shared);
    }

    fn notify_synced(&self, mut synced: MutexGuard<'_, worker::Synced>, shared: &Shared) {
        // Find a sleeping worker.
        if let Some(worker) = synced.idle.sleepers.pop() {
            // Find an available core.
            if let Some(mut core) = self.try_acquire_available_core(&mut synced.idle) {
                debug_assert!(!core.is_searching);
                core.is_searching = true;

                // Assign the core to the worker.
                synced.assigned_cores[worker] = Some(core);

                // Drop the lock before waking the condvar.
                drop(synced);

                super::counters::inc_num_unparks_remote();

                // Notify the worker.
                shared.condvars[worker].notify_one();
                return;
            } else {
                // No available core; put the worker back in the sleeper list.
                synced.idle.sleepers.push(worker);
            }
        }

        super::counters::inc_notify_no_core();

        // Set the `needs_searching` flag and release the reserved searching
        // slot. Both updates must happen inside the critical section.
        self.needs_searching.store(true, Release);
        self.num_searching.fetch_sub(1, Release);

        // Explicit drop to emphasize that holding the guard up to this point
        // is significant.
        drop(synced);
    }

    /// Pops up to `num` sleeping workers, assigns each an available core, and
    /// records their indices in `workers` so the caller can wake them.
    pub(super) fn notify_mult(
        &self,
        synced: &mut worker::Synced,
        workers: &mut Vec<usize>,
        num: usize,
    ) {
        debug_assert!(workers.is_empty());

        for _ in 0..num {
            if let Some(worker) = synced.idle.sleepers.pop() {
                if let Some(core) = synced.idle.available_cores.pop() {
                    debug_assert!(!core.is_searching);

                    self.idle_map.unset(core.index);

                    synced.assigned_cores[worker] = Some(core);

                    workers.push(worker);

                    continue;
                } else {
                    // No core for this worker; put it back to sleep.
                    synced.idle.sleepers.push(worker);
                }
            }

            break;
        }

        if !workers.is_empty() {
            // The loop popped cores directly, so fix up `num_idle` once here.
            debug_assert!(self.idle_map.matches(&synced.idle.available_cores));
            let num_idle = synced.idle.available_cores.len();
            self.num_idle.store(num_idle, Release);
        } else {
            #[cfg(not(loom))]
            debug_assert_eq!(
                synced.idle.available_cores.len(),
                self.num_idle.load(Acquire)
            );
            self.needs_searching.store(true, Release);
        }
    }

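    // Hypothetical caller pattern (illustrative only; the real call sites
    // live elsewhere in the scheduler): collect the worker IDs, then wake
    // each condvar after the cores have been assigned.
    //
    //     let mut workers = vec![];
    //     idle.notify_mult(&mut synced, &mut workers, 2);
    //     for worker in workers.drain(..) {
    //         shared.condvars[worker].notify_one();
    //     }
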
    pub(super) fn shutdown(&self, synced: &mut worker::Synced, shared: &Shared) {
        // Wake every sleeping worker and assign a core to each. There may not
        // be enough sleeping workers to claim all cores; other workers will
        // eventually find the remaining cores and shut them down.
        while !synced.idle.sleepers.is_empty() && !synced.idle.available_cores.is_empty() {
            let worker = synced.idle.sleepers.pop().unwrap();
            let core = self.try_acquire_available_core(&mut synced.idle).unwrap();

            synced.assigned_cores[worker] = Some(core);
            shared.condvars[worker].notify_one();
        }

        debug_assert!(self.idle_map.matches(&synced.idle.available_cores));

        // Wake any remaining sleeping workers; they get no core.
        while let Some(index) = synced.idle.sleepers.pop() {
            shared.condvars[index].notify_one();
        }
    }

    pub(super) fn shutdown_unassigned_cores(&self, handle: &Handle, shared: &Shared) {
        // Shut down any cores that were never assigned to a worker. The lock
        // is re-acquired on every iteration so it is not held across
        // `shutdown_core`, avoiding lock re-entry.
        while let Some(core) = {
            let mut synced = shared.synced.lock();
            self.try_acquire_available_core(&mut synced.idle)
        } {
            shared.shutdown_core(handle, core);
        }
    }

    /// The worker releases the given core, making it available to other
    /// workers that are waiting.
    pub(super) fn release_core(&self, synced: &mut worker::Synced, core: Box<Core>) {
        // The core must not be in the searching state when it is released.
        debug_assert!(!core.is_searching);

        // The global inject queue must be empty when a core goes idle.
        debug_assert!(synced.inject.is_empty());

        let num_idle = synced.idle.available_cores.len();
        #[cfg(not(loom))]
        debug_assert_eq!(num_idle, self.num_idle.load(Acquire));

        // Mark the core as idle in the bitmap.
        self.idle_map.set(core.index);

        // Store the core in the list of available cores.
        synced.idle.available_cores.push(core);

        debug_assert!(self.idle_map.matches(&synced.idle.available_cores));

        // Publish the updated idle count.
        self.num_idle.store(num_idle + 1, Release);
    }

    pub(super) fn transition_worker_to_parked(&self, synced: &mut worker::Synced, index: usize) {
        // Store the worker index in the list of sleepers.
        synced.idle.sleepers.push(index);

        // The worker's assigned core slot must be empty.
        debug_assert!(synced.assigned_cores[index].is_none());
    }

    pub(super) fn try_transition_worker_to_searching(&self, core: &mut Core) {
        debug_assert!(!core.is_searching);

        let num_searching = self.num_searching.load(Acquire);
        let num_idle = self.num_idle.load(Acquire);

        // Throttle searching: once at least half of the active (non-idle)
        // cores are already searching, skip the transition.
        if 2 * num_searching >= self.num_cores - num_idle {
            return;
        }

        self.transition_worker_to_searching(core);
    }
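
    // Illustrative numbers for the throttle above (hypothetical, not from
    // the source): with `num_cores = 8` and `num_idle = 2` there are 6
    // active cores. The transition succeeds while `num_searching` is 0, 1,
    // or 2; once it reaches 3, `2 * 3 >= 6` holds and further transitions
    // are skipped, capping searchers at roughly half the active cores.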

    /// Takes `_synced` to force the caller to hold the scheduler lock,
    /// avoiding races on `needs_searching`.
    pub(super) fn transition_worker_to_searching_if_needed(
        &self,
        _synced: &mut Synced,
        core: &mut Core,
    ) -> bool {
        if self.needs_searching.load(Acquire) {
            // A notification was missed earlier; this worker picks it up.
            self.transition_worker_to_searching(core);
            true
        } else {
            false
        }
    }

    pub(super) fn transition_worker_to_searching(&self, core: &mut Core) {
        core.is_searching = true;
        self.num_searching.fetch_add(1, AcqRel);
        self.needs_searching.store(false, Release);
    }

    /// Transitions a worker out of the searching state.
    ///
    /// Returns `true` if this was the final searching worker; the caller is
    /// then responsible for notifying a new worker.
    pub(super) fn transition_worker_from_searching(&self) -> bool {
        let prev = self.num_searching.fetch_sub(1, AcqRel);
        debug_assert!(prev > 0);

        prev == 1
    }
}

const BITS: usize = usize::BITS as usize;
const BIT_MASK: usize = (usize::BITS - 1) as usize;

impl IdleMap {
    fn new(cores: &[Box<Core>]) -> IdleMap {
        let ret = IdleMap::new_n(num_chunks(cores.len()));
        ret.set_all(cores);

        ret
    }

    fn new_n(n: usize) -> IdleMap {
        let chunks = (0..n).map(|_| AtomicUsize::new(0)).collect();
        IdleMap { chunks }
    }

    fn set(&self, index: usize) {
        // Mutations happen while the caller has exclusive access to `Synced`
        // (i.e. holds the scheduler lock), so a plain load/store pair rather
        // than an atomic RMW such as `fetch_or` is sufficient; `Release`
        // publishes the update to snapshot readers.
        let (chunk, mask) = index_to_mask(index);
        let prev = self.chunks[chunk].load(Acquire);
        let next = prev | mask;
        self.chunks[chunk].store(next, Release);
    }

    fn set_all(&self, cores: &[Box<Core>]) {
        for core in cores {
            self.set(core.index);
        }
    }

    fn unset(&self, index: usize) {
        let (chunk, mask) = index_to_mask(index);
        let prev = self.chunks[chunk].load(Acquire);
        let next = prev & !mask;
        self.chunks[chunk].store(next, Release);
    }

    /// Debug helper: returns `true` if the bitmap exactly matches the given
    /// set of idle cores.
    fn matches(&self, idle_cores: &[Box<Core>]) -> bool {
        let expect = IdleMap::new_n(self.chunks.len());
        expect.set_all(idle_cores);

        for (i, chunk) in expect.chunks.iter().enumerate() {
            if chunk.load(Acquire) != self.chunks[i].load(Acquire) {
                return false;
            }
        }

        true
    }
}

impl Snapshot {
    pub(crate) fn new(idle: &Idle) -> Snapshot {
        let chunks = vec![0; idle.idle_map.chunks.len()];
        let mut ret = Snapshot { chunks };
        ret.update(&idle.idle_map);
        ret
    }

    fn update(&mut self, idle_map: &IdleMap) {
        for i in 0..self.chunks.len() {
            self.chunks[i] = idle_map.chunks[i].load(Acquire);
        }
    }

    /// Returns `true` if the core at `index` was idle when the snapshot was
    /// taken.
    pub(super) fn is_idle(&self, index: usize) -> bool {
        let (chunk, mask) = index_to_mask(index);
        debug_assert!(
            chunk < self.chunks.len(),
            "index={}; chunks={}",
            index,
            self.chunks.len()
        );
        self.chunks[chunk] & mask == mask
    }
}

fn num_chunks(max_cores: usize) -> usize {
    (max_cores / BITS) + 1
}

fn index_to_mask(index: usize) -> (usize, usize) {
    let mask = 1 << (index & BIT_MASK);
    let chunk = index / BITS;

    (chunk, mask)
}
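
// A worked example of the helpers above, assuming a 64-bit target:
// `index_to_mask(70)` returns chunk `70 / 64 == 1` and mask
// `1 << (70 & 63) == 1 << 6`, i.e. bit 6 of the second chunk.
// `num_chunks(64)` returns `64 / 64 + 1 == 2`: the `+ 1` rounds up so a
// partially-filled final chunk always fits, at the cost of one spare chunk
// when `max_cores` is an exact multiple of `BITS`.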

/// Number of cores currently assigned to workers: the capacity of
/// `available_cores` (assumed to equal the total number of cores) minus the
/// cores currently parked in it.
fn num_active_workers(synced: &Synced) -> usize {
    synced.available_cores.capacity() - synced.available_cores.len()
}
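
// A minimal test sketch (not part of the original file) exercising the
// `IdleMap` bitmap and `Snapshot` round-trip. It constructs `Snapshot`
// directly rather than through `Snapshot::new`, which would require a full
// `Idle`, and is gated off under loom.
#[cfg(all(test, not(loom)))]
mod tests {
    use super::*;

    #[test]
    fn idle_map_set_unset_roundtrip() {
        // Two chunks cover indices 0..(2 * BITS).
        let map = IdleMap::new_n(2);

        map.set(3);
        map.set(BITS + 1); // lands in the second chunk

        let mut snapshot = Snapshot { chunks: vec![0; 2] };
        snapshot.update(&map);

        assert!(snapshot.is_idle(3));
        assert!(snapshot.is_idle(BITS + 1));
        assert!(!snapshot.is_idle(4));

        // Unsetting clears the bit; a refreshed snapshot observes it.
        map.unset(3);
        snapshot.update(&map);
        assert!(!snapshot.is_idle(3));
    }
}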