tokio/runtime/scheduler/multi_thread_alt/idle.rs

//! Coordinates idling workers

#![allow(dead_code)]

use crate::loom::sync::atomic::{AtomicBool, AtomicUsize};
use crate::loom::sync::MutexGuard;
use crate::runtime::scheduler::multi_thread_alt::{worker, Core, Handle, Shared};

use std::sync::atomic::Ordering::{AcqRel, Acquire, Release};

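/// Shared idle-coordination state.
///
/// Invariant (checked by the `debug_assert`s below): `num_idle` mirrors
/// `synced.available_cores.len()` and `idle_map` mirrors the set of core
/// indices in `available_cores`; both are only written while holding the
/// scheduler lock.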
pub(super) struct Idle {
    /// Number of searching cores
    num_searching: AtomicUsize,

    /// Number of idle cores
    num_idle: AtomicUsize,

    /// Map of idle cores
    idle_map: IdleMap,

    /// Used to catch false-negatives when waking workers
    needs_searching: AtomicBool,

    /// Total number of cores
    num_cores: usize,
}

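/// Bitmap of idle cores: one bit per core index, packed into `usize` chunks.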
pub(super) struct IdleMap {
    chunks: Vec<AtomicUsize>,
}

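/// A non-atomic copy of the `IdleMap`, refreshed on demand so a worker can
/// scan it without holding the scheduler lock.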
pub(super) struct Snapshot {
    chunks: Vec<usize>,
}

/// Data synchronized by the scheduler mutex
pub(super) struct Synced {
    /// Worker IDs that are currently sleeping
    sleepers: Vec<usize>,

    /// Cores available for workers
    available_cores: Vec<Box<Core>>,
}

impl Idle {
    pub(super) fn new(cores: Vec<Box<Core>>, num_workers: usize) -> (Idle, Synced) {
        let idle = Idle {
            num_searching: AtomicUsize::new(0),
            num_idle: AtomicUsize::new(cores.len()),
            idle_map: IdleMap::new(&cores),
            needs_searching: AtomicBool::new(false),
            num_cores: cores.len(),
        };

        let synced = Synced {
            sleepers: Vec::with_capacity(num_workers),
            available_cores: cores,
        };

        (idle, synced)
    }

    pub(super) fn needs_searching(&self) -> bool {
        self.needs_searching.load(Acquire)
    }

    pub(super) fn num_idle(&self, synced: &Synced) -> usize {
        #[cfg(not(loom))]
        debug_assert_eq!(synced.available_cores.len(), self.num_idle.load(Acquire));
        synced.available_cores.len()
    }

    pub(super) fn num_searching(&self) -> usize {
        self.num_searching.load(Acquire)
    }

    pub(super) fn snapshot(&self, snapshot: &mut Snapshot) {
        snapshot.update(&self.idle_map)
    }

    /// Try to acquire an available core
    pub(super) fn try_acquire_available_core(&self, synced: &mut Synced) -> Option<Box<Core>> {
        let ret = synced.available_cores.pop();

        if let Some(core) = &ret {
            // Decrement the number of idle cores
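            // A plain load/store pair (rather than `fetch_sub`) is enough
            // here: `num_idle` is only written while the scheduler lock is
            // held, which the `&mut Synced` argument guarantees.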
            let num_idle = self.num_idle.load(Acquire) - 1;
            debug_assert_eq!(num_idle, synced.available_cores.len());
            self.num_idle.store(num_idle, Release);

            self.idle_map.unset(core.index);
            debug_assert!(self.idle_map.matches(&synced.available_cores));
        }

        ret
    }

    /// We need at least one searching worker
    pub(super) fn notify_local(&self, shared: &Shared) {
        if self.num_searching.load(Acquire) != 0 {
            // There already is a searching worker. Note that this could be a
            // false positive. However, because this method is called **from** a
            // worker, we know that there is at least one worker currently
            // awake, so the scheduler won't deadlock.
            return;
        }

        if self.num_idle.load(Acquire) == 0 {
            self.needs_searching.store(true, Release);
            return;
        }

        // There aren't any searching workers. Try to initialize one.
        if self
            .num_searching
            .compare_exchange(0, 1, AcqRel, Acquire)
            .is_err()
        {
            // Failing the compare_exchange means another thread concurrently
            // launched a searching worker.
            return;
        }

        super::counters::inc_num_unparks_local();

        // Acquire the lock
        let synced = shared.synced.lock();
        self.notify_synced(synced, shared);
    }

    /// Notifies a single worker
    pub(super) fn notify_remote(&self, synced: MutexGuard<'_, worker::Synced>, shared: &Shared) {
        if synced.idle.sleepers.is_empty() {
            self.needs_searching.store(true, Release);
            return;
        }

        // We need to establish a stronger barrier than with `notify_local`
        self.num_searching.fetch_add(1, AcqRel);

        self.notify_synced(synced, shared);
    }

    /// Notify a worker while synced
    fn notify_synced(&self, mut synced: MutexGuard<'_, worker::Synced>, shared: &Shared) {
        // Find a sleeping worker
        if let Some(worker) = synced.idle.sleepers.pop() {
            // Find an available core
            if let Some(mut core) = self.try_acquire_available_core(&mut synced.idle) {
                debug_assert!(!core.is_searching);
                core.is_searching = true;

                // Assign the core to the worker
                synced.assigned_cores[worker] = Some(core);

                // Drop the lock before notifying the condvar.
                drop(synced);

                super::counters::inc_num_unparks_remote();

                // Notify the worker
                shared.condvars[worker].notify_one();
                return;
            } else {
                synced.idle.sleepers.push(worker);
            }
        }

        super::counters::inc_notify_no_core();

        // Set the `needs_searching` flag; this happens *while* the lock is held.
        self.needs_searching.store(true, Release);
        self.num_searching.fetch_sub(1, Release);

        // Explicit mutex guard drop to show that holding the guard to this
        // point is significant. `needs_searching` and `num_searching` must be
        // updated in the critical section.
        drop(synced);
    }

    pub(super) fn notify_mult(
        &self,
        synced: &mut worker::Synced,
        workers: &mut Vec<usize>,
        num: usize,
    ) {
        debug_assert!(workers.is_empty());

        for _ in 0..num {
            if let Some(worker) = synced.idle.sleepers.pop() {
                // TODO: can this be switched to use next_available_core?
                if let Some(core) = synced.idle.available_cores.pop() {
                    debug_assert!(!core.is_searching);

                    self.idle_map.unset(core.index);

                    synced.assigned_cores[worker] = Some(core);

                    workers.push(worker);

                    continue;
                } else {
                    synced.idle.sleepers.push(worker);
                }
            }

            break;
        }

        if !workers.is_empty() {
            debug_assert!(self.idle_map.matches(&synced.idle.available_cores));
            let num_idle = synced.idle.available_cores.len();
            self.num_idle.store(num_idle, Release);
        } else {
            #[cfg(not(loom))]
            debug_assert_eq!(
                synced.idle.available_cores.len(),
                self.num_idle.load(Acquire)
            );
            self.needs_searching.store(true, Release);
        }
    }

    pub(super) fn shutdown(&self, synced: &mut worker::Synced, shared: &Shared) {
        // Wake every sleeping worker and assign a core to it. There may not be
        // enough sleeping workers for all cores, but other workers will
        // eventually find the cores and shut them down.
        while !synced.idle.sleepers.is_empty() && !synced.idle.available_cores.is_empty() {
            let worker = synced.idle.sleepers.pop().unwrap();
            let core = self.try_acquire_available_core(&mut synced.idle).unwrap();

            synced.assigned_cores[worker] = Some(core);
            shared.condvars[worker].notify_one();
        }

        debug_assert!(self.idle_map.matches(&synced.idle.available_cores));

        // Wake up any other workers
        while let Some(index) = synced.idle.sleepers.pop() {
            shared.condvars[index].notify_one();
        }
    }

    pub(super) fn shutdown_unassigned_cores(&self, handle: &Handle, shared: &Shared) {
        // If there are any remaining cores, shut them down here.
        //
        // This code is a bit convoluted to avoid lock-reentry.
        while let Some(core) = {
            let mut synced = shared.synced.lock();
            self.try_acquire_available_core(&mut synced.idle)
        } {
            shared.shutdown_core(handle, core);
        }
    }

    /// The worker releases the given core, making it available to other workers
    /// that are waiting.
    pub(super) fn release_core(&self, synced: &mut worker::Synced, core: Box<Core>) {
        // The core should not be searching at this point
        debug_assert!(!core.is_searching);

        // Check that there are no pending tasks in the global queue
        debug_assert!(synced.inject.is_empty());

        let num_idle = synced.idle.available_cores.len();
        #[cfg(not(loom))]
        debug_assert_eq!(num_idle, self.num_idle.load(Acquire));

        self.idle_map.set(core.index);

        // Store the core in the list of available cores
        synced.idle.available_cores.push(core);

        debug_assert!(self.idle_map.matches(&synced.idle.available_cores));

        // Update `num_idle`
        self.num_idle.store(num_idle + 1, Release);
    }

    pub(super) fn transition_worker_to_parked(&self, synced: &mut worker::Synced, index: usize) {
        // Store the worker index in the list of sleepers
        synced.idle.sleepers.push(index);

        // The worker's assigned core slot should be empty
        debug_assert!(synced.assigned_cores[index].is_none());
    }

    pub(super) fn try_transition_worker_to_searching(&self, core: &mut Core) {
        debug_assert!(!core.is_searching);

        let num_searching = self.num_searching.load(Acquire);
        let num_idle = self.num_idle.load(Acquire);

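        // Throttle searching: if at least half of the currently active
        // (non-idle) cores are already searching, skip the transition. This
        // caps searchers at roughly half the active cores.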
        if 2 * num_searching >= self.num_cores - num_idle {
            return;
        }

        self.transition_worker_to_searching(core);
    }

    /// Needs to happen while synchronized in order to avoid races
    pub(super) fn transition_worker_to_searching_if_needed(
        &self,
        _synced: &mut Synced,
        core: &mut Core,
    ) -> bool {
        if self.needs_searching.load(Acquire) {
            // Needs to be called while holding the lock
            self.transition_worker_to_searching(core);
            true
        } else {
            false
        }
    }

    pub(super) fn transition_worker_to_searching(&self, core: &mut Core) {
        core.is_searching = true;
        self.num_searching.fetch_add(1, AcqRel);
        self.needs_searching.store(false, Release);
    }

    /// A lightweight transition from searching -> running.
    ///
    /// Returns `true` if this is the final searching worker. The caller
    /// **must** notify a new worker.
    pub(super) fn transition_worker_from_searching(&self) -> bool {
        let prev = self.num_searching.fetch_sub(1, AcqRel);
        debug_assert!(prev > 0);

        prev == 1
    }
}

const BITS: usize = usize::BITS as usize;
const BIT_MASK: usize = (usize::BITS - 1) as usize;

impl IdleMap {
    fn new(cores: &[Box<Core>]) -> IdleMap {
        let ret = IdleMap::new_n(num_chunks(cores.len()));
        ret.set_all(cores);

        ret
    }

    fn new_n(n: usize) -> IdleMap {
        let chunks = (0..n).map(|_| AtomicUsize::new(0)).collect();
        IdleMap { chunks }
    }

    fn set(&self, index: usize) {
        let (chunk, mask) = index_to_mask(index);
        let prev = self.chunks[chunk].load(Acquire);
        let next = prev | mask;
        self.chunks[chunk].store(next, Release);
    }

    fn set_all(&self, cores: &[Box<Core>]) {
        for core in cores {
            self.set(core.index);
        }
    }

    fn unset(&self, index: usize) {
        let (chunk, mask) = index_to_mask(index);
        let prev = self.chunks[chunk].load(Acquire);
        let next = prev & !mask;
        self.chunks[chunk].store(next, Release);
    }

    fn matches(&self, idle_cores: &[Box<Core>]) -> bool {
        let expect = IdleMap::new_n(self.chunks.len());
        expect.set_all(idle_cores);

        for (i, chunk) in expect.chunks.iter().enumerate() {
            if chunk.load(Acquire) != self.chunks[i].load(Acquire) {
                return false;
            }
        }

        true
    }
}

impl Snapshot {
    pub(crate) fn new(idle: &Idle) -> Snapshot {
        let chunks = vec![0; idle.idle_map.chunks.len()];
        let mut ret = Snapshot { chunks };
        ret.update(&idle.idle_map);
        ret
    }

    fn update(&mut self, idle_map: &IdleMap) {
        for i in 0..self.chunks.len() {
            self.chunks[i] = idle_map.chunks[i].load(Acquire);
        }
    }

    pub(super) fn is_idle(&self, index: usize) -> bool {
        let (chunk, mask) = index_to_mask(index);
        debug_assert!(
            chunk < self.chunks.len(),
            "index={}; chunks={}",
            index,
            self.chunks.len()
        );
        self.chunks[chunk] & mask == mask
    }
}

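/// Number of `usize` chunks needed to hold one bit per core. Rounds up, and
/// always allocates at least one chunk (one extra when `max_cores` is an
/// exact multiple of `BITS`).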
fn num_chunks(max_cores: usize) -> usize {
    (max_cores / BITS) + 1
}

fn index_to_mask(index: usize) -> (usize, usize) {
    let mask = 1 << (index & BIT_MASK);
    let chunk = index / BITS;

    (chunk, mask)
}

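/// Cores currently checked out by workers: the capacity of `available_cores`
/// (sized to the total core count) minus the cores parked in it.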
fn num_active_workers(synced: &Synced) -> usize {
    synced.available_cores.capacity() - synced.available_cores.len()
}
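
// A minimal sanity-check sketch of the bitmap arithmetic above. This test
// module is illustrative, not part of the original file, and assumes a
// 64-bit target (`usize::BITS == 64`).
#[cfg(all(test, target_pointer_width = "64"))]
mod sketch_tests {
    use super::*;

    #[test]
    fn chunk_and_mask_arithmetic() {
        // Core 5 maps into the first chunk, bit 5.
        assert_eq!(index_to_mask(5), (0, 1 << 5));
        // Core 70 spills into the second chunk, bit 6 (70 % 64).
        assert_eq!(index_to_mask(70), (1, 1 << 6));
        // One chunk covers cores 0..=63; an extra chunk is always allocated.
        assert_eq!(num_chunks(1), 1);
        assert_eq!(num_chunks(64), 2);
    }

    #[test]
    fn snapshot_reads_bits() {
        // Build a snapshot by hand: only core 2 is marked idle.
        let snapshot = Snapshot { chunks: vec![0b100] };
        assert!(snapshot.is_idle(2));
        assert!(!snapshot.is_idle(0));
    }
}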