tokio/runtime/scheduler/multi_thread_alt/worker.rs

//! A scheduler is initialized with a fixed number of workers. Each worker is
//! driven by a thread. Each worker has a "core" which contains data such as the
//! run queue and other state. When `block_in_place` is called, the worker's
//! "core" is handed off to a new thread allowing the scheduler to continue to
//! make progress while the originating thread blocks.
//!
//! # Shutdown
//!
//! Shutting down the runtime involves the following steps:
//!
//!  1. The `Shared::close` method is called. This closes the inject queue and
//!     the `OwnedTasks` instance and wakes up all worker threads.
//!
//!  2. Each worker thread observes the close signal the next time it runs
//!     maintenance, by checking whether the inject queue is closed. The
//!     `Worker::is_shutdown` flag is set to true.
//!
//!  3. The worker threads call `pre_shutdown` in parallel. Here, each worker
//!     keeps removing tasks from `OwnedTasks` until it is empty. No new tasks
//!     can be pushed to the `OwnedTasks` during or after this step as it was
//!     closed in step 1.
//!
//!  4. The workers call `Shared::shutdown` to enter the single-threaded phase
//!     of shutdown. These calls push their core to `Shared::shutdown_cores`,
//!     and the last thread to push its core finishes the shutdown procedure.
//!
//!  5. The local run queue of each core is emptied, then the inject queue is
//!     emptied.
//!
//! At this point, shutdown has completed. It is not possible for any of the
//! collections to contain any tasks at this point, as each collection was
//! closed first, then emptied afterwards.
//!
//! ## Spawns during shutdown
//!
//! When spawning tasks during shutdown, there are two cases:
//!
//!  * The spawner observes the `OwnedTasks` being open and the inject queue
//!    being closed.
//!  * The spawner observes the `OwnedTasks` being closed and doesn't check the
//!    inject queue.
//!
//! The first case can only happen if the `OwnedTasks::bind` call happens before
//! or during step 1 of shutdown. In this case, the runtime will clean up the
//! task in step 3 of shutdown.
//!
//! In the latter case, the task was not spawned and is immediately cancelled by
//! the spawner.
//!
//! The correctness of shutdown requires both the inject queue and the
//! `OwnedTasks` collection to have a closed bit. With a close bit on only the
//! inject queue, spawning could run into a situation where a task is
//! successfully bound long after the runtime has shut down. With a close bit on
//! only the `OwnedTasks`, the first spawning situation could result in the
//! notification being pushed to the inject queue after step 5 of shutdown,
//! which would leave a task in the inject queue indefinitely. This would be a
//! ref-count cycle and a memory leak.
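//!
//! A rough sketch of the spawn-time decision described above, using
//! hypothetical flags rather than the real `OwnedTasks::bind` internals:
//!
//! ```text
//! if owned_tasks_is_closed {
//!     // Second case: the task is never bound; the spawner cancels it
//!     // immediately.
//! } else if inject_is_closed {
//!     // First case: the task was bound before or during step 1, so the
//!     // runtime removes it from `OwnedTasks` during step 3.
//! }
//! ```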
58
59use crate::loom::sync::{Arc, Condvar, Mutex, MutexGuard};
60use crate::runtime;
61use crate::runtime::driver::Driver;
62use crate::runtime::scheduler::multi_thread_alt::{
63    idle, queue, stats, Counters, Handle, Idle, Overflow, Stats, TraceStatus,
64};
65use crate::runtime::scheduler::{self, inject, Lock};
66use crate::runtime::task::{OwnedTasks, TaskHarnessScheduleHooks};
67use crate::runtime::{blocking, driver, task, Config, SchedulerMetrics, WorkerMetrics};
68use crate::runtime::{context, TaskHooks};
69use crate::task::coop;
70use crate::util::atomic_cell::AtomicCell;
71use crate::util::rand::{FastRand, RngSeedGenerator};
72
73use std::cell::{Cell, RefCell};
74use std::task::Waker;
75use std::time::Duration;
76use std::{cmp, thread};
77
78cfg_unstable_metrics! {
79    mod metrics;
80}
81
82mod taskdump_mock;
83
84/// A scheduler worker
85///
86/// Data is stack-allocated and never migrates threads
87pub(super) struct Worker {
88    /// Used to schedule bookkeeping tasks every so often.
89    tick: u32,
90
    /// True if the scheduler is being shut down
92    pub(super) is_shutdown: bool,
93
94    /// True if the scheduler is being traced
95    is_traced: bool,
96
97    /// Counter used to track when to poll from the local queue vs. the
98    /// injection queue
99    num_seq_local_queue_polls: u32,
100
101    /// How often to check the global queue
102    global_queue_interval: u32,
103
104    /// Used to collect a list of workers to notify
105    workers_to_notify: Vec<usize>,
106
    /// Snapshot of the idle core list. This helps speed up stealing
108    idle_snapshot: idle::Snapshot,
109
110    stats: stats::Ephemeral,
111}
112
113/// Core data
114///
115/// Data is heap-allocated and migrates threads.
116#[repr(align(128))]
117pub(super) struct Core {
118    /// Index holding this core's remote/shared state.
119    pub(super) index: usize,
120
121    lifo_slot: Option<Notified>,
122
123    /// The worker-local run queue.
124    run_queue: queue::Local<Arc<Handle>>,
125
126    /// True if the worker is currently searching for more work. Searching
127    /// involves attempting to steal from other workers.
128    pub(super) is_searching: bool,
129
130    /// Per-worker runtime stats
131    stats: Stats,
132
133    /// Fast random number generator.
134    rand: FastRand,
135}
136
137/// State shared across all workers
138pub(crate) struct Shared {
139    /// Per-core remote state.
140    remotes: Box<[Remote]>,
141
142    /// Global task queue used for:
    ///  1. Submitting work to the scheduler while **not** currently on a worker thread.
    ///  2. Submitting work to the scheduler when a worker's run queue is saturated.
145    pub(super) inject: inject::Shared<Arc<Handle>>,
146
147    /// Coordinates idle workers
148    idle: Idle,
149
150    /// Collection of all active tasks spawned onto this executor.
151    pub(super) owned: OwnedTasks<Arc<Handle>>,
152
153    /// Data synchronized by the scheduler mutex
154    pub(super) synced: Mutex<Synced>,
155
    /// Powers Tokio's I/O, timers, etc. The responsibility of polling the
    /// driver is shared across workers.
158    driver: AtomicCell<Driver>,
159
160    /// Condition variables used to unblock worker threads. Each worker thread
161    /// has its own `condvar` it waits on.
162    pub(super) condvars: Vec<Condvar>,
163
164    /// The number of cores that have observed the trace signal.
165    pub(super) trace_status: TraceStatus,
166
167    /// Scheduler configuration options
168    config: Config,
169
170    /// Collects metrics from the runtime.
171    pub(super) scheduler_metrics: SchedulerMetrics,
172
173    pub(super) worker_metrics: Box<[WorkerMetrics]>,
174
175    /// Only held to trigger some code on drop. This is used to get internal
176    /// runtime metrics that can be useful when doing performance
177    /// investigations. This does nothing (empty struct, no drop impl) unless
178    /// the `tokio_internal_mt_counters` `cfg` flag is set.
179    _counters: Counters,
180}
181
182/// Data synchronized by the scheduler mutex
183pub(crate) struct Synced {
184    /// When worker is notified, it is assigned a core. The core is placed here
185    /// until the worker wakes up to take it.
186    pub(super) assigned_cores: Vec<Option<Box<Core>>>,
187
188    /// Cores that have observed the shutdown signal
189    ///
    /// The core is **not** placed back in the worker to prevent it from being
    /// stolen by a thread that was spawned as part of `block_in_place`.
192    shutdown_cores: Vec<Box<Core>>,
193
194    /// The driver goes here when shutting down
195    shutdown_driver: Option<Box<Driver>>,
196
197    /// Synchronized state for `Idle`.
198    pub(super) idle: idle::Synced,
199
200    /// Synchronized state for `Inject`.
201    pub(crate) inject: inject::Synced,
202}
203
204/// Used to communicate with a worker from other threads.
205struct Remote {
206    /// When a task is scheduled from a worker, it is stored in this slot. The
207    /// worker will check this slot for a task **before** checking the run
208    /// queue. This effectively results in the **last** scheduled task to be run
209    /// next (LIFO). This is an optimization for improving locality which
210    /// benefits message passing patterns and helps to reduce latency.
211    // lifo_slot: Lifo,
212
213    /// Steals tasks from this worker.
214    pub(super) steal: queue::Steal<Arc<Handle>>,
215}
216
217/// Thread-local context
218pub(crate) struct Context {
    /// Current scheduler's handle
220    handle: Arc<Handle>,
221
222    /// Worker index
223    index: usize,
224
225    /// True when the LIFO slot is enabled
226    lifo_enabled: Cell<bool>,
227
228    /// Core data
229    core: RefCell<Option<Box<Core>>>,
230
231    /// Used to pass cores to other threads when `block_in_place` is called
232    handoff_core: Arc<AtomicCell<Core>>,
233
234    /// Tasks to wake after resource drivers are polled. This is mostly to
235    /// handle yielded tasks.
236    pub(crate) defer: RefCell<Vec<Notified>>,
237}
238
/// Running a task may consume the core. If the core is still available when
/// the task completes, it is returned. Otherwise, the worker will need to
/// stop processing.
242type RunResult = Result<Box<Core>, ()>;
243type NextTaskResult = Result<(Option<Notified>, Box<Core>), ()>;
244
245/// A task handle
246type Task = task::Task<Arc<Handle>>;
247
248/// A notified task handle
249type Notified = task::Notified<Arc<Handle>>;
250
/// Value picked out of thin air. Running the LIFO slot a handful of times
/// seems sufficient to benefit from locality. More than 3 times is probably
/// excessive. The value can be tuned in the future with data that shows
/// improvements.
255const MAX_LIFO_POLLS_PER_TICK: usize = 3;
256
257pub(super) fn create(
258    num_cores: usize,
259    driver: Driver,
260    driver_handle: driver::Handle,
261    blocking_spawner: blocking::Spawner,
262    seed_generator: RngSeedGenerator,
263    config: Config,
264) -> runtime::Handle {
265    let mut num_workers = num_cores;
266
267    // If the driver is enabled, we need an extra thread to handle polling the
268    // driver when all cores are busy.
269    if driver.is_enabled() {
270        num_workers += 1;
271    }
272
273    let mut cores = Vec::with_capacity(num_cores);
274    let mut remotes = Vec::with_capacity(num_cores);
275    // Worker metrics are actually core based
276    let mut worker_metrics = Vec::with_capacity(num_cores);
277
278    // Create the local queues
279    for i in 0..num_cores {
280        let (steal, run_queue) = queue::local(config.local_queue_capacity);
281
282        let metrics = WorkerMetrics::from_config(&config);
283        let stats = Stats::new(&metrics);
284
285        cores.push(Box::new(Core {
286            index: i,
287            lifo_slot: None,
288            run_queue,
289            is_searching: false,
290            stats,
291            rand: FastRand::from_seed(config.seed_generator.next_seed()),
292        }));
293
294        remotes.push(Remote {
295            steal,
296            // lifo_slot: Lifo::new(),
297        });
298        worker_metrics.push(metrics);
299    }
300
301    // Allocate num-cores + 1 workers, so one worker can handle the I/O driver,
302    // if needed.
303    let (idle, idle_synced) = Idle::new(cores, num_workers);
304    let (inject, inject_synced) = inject::Shared::new();
305
306    let handle = Arc::new(Handle {
307        task_hooks: TaskHooks::from_config(&config),
308        shared: Shared {
309            remotes: remotes.into_boxed_slice(),
310            inject,
311            idle,
312            owned: OwnedTasks::new(num_cores),
313            synced: Mutex::new(Synced {
314                assigned_cores: (0..num_workers).map(|_| None).collect(),
315                shutdown_cores: Vec::with_capacity(num_cores),
316                shutdown_driver: None,
317                idle: idle_synced,
318                inject: inject_synced,
319            }),
320            driver: AtomicCell::new(Some(Box::new(driver))),
321            condvars: (0..num_workers).map(|_| Condvar::new()).collect(),
322            trace_status: TraceStatus::new(num_cores),
323            config,
324            scheduler_metrics: SchedulerMetrics::new(),
325            worker_metrics: worker_metrics.into_boxed_slice(),
326            _counters: Counters,
327        },
328        driver: driver_handle,
329        blocking_spawner,
330        seed_generator,
331    });
332
333    let rt_handle = runtime::Handle {
334        inner: scheduler::Handle::MultiThreadAlt(handle),
335    };
336
337    // Eagerly start worker threads
338    for index in 0..num_workers {
339        let handle = rt_handle.inner.expect_multi_thread_alt();
340        let h2 = handle.clone();
341        let handoff_core = Arc::new(AtomicCell::new(None));
342
343        handle
344            .blocking_spawner
345            .spawn_blocking(&rt_handle, move || run(index, h2, handoff_core, false));
346    }
347
348    rt_handle
349}
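// Illustrative construction sketch (an assumption, not taken from this file):
// with `--cfg tokio_unstable` and the `rt-multi-thread` feature, a runtime
// backed by this scheduler is typically built through the runtime builder,
// which eventually calls `create` above. The exact entry point
// (`new_multi_thread_alt`) is assumed here.
//
// ```ignore
// let rt = tokio::runtime::Builder::new_multi_thread_alt()
//     .worker_threads(4)
//     .enable_all()
//     .build()
//     .unwrap();
//
// rt.block_on(async { /* application code */ });
// ```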
350
351#[track_caller]
352pub(crate) fn block_in_place<F, R>(f: F) -> R
353where
354    F: FnOnce() -> R,
355{
356    // Try to steal the worker core back
357    struct Reset(coop::Budget);
358
359    impl Drop for Reset {
360        fn drop(&mut self) {
361            with_current(|maybe_cx| {
362                if let Some(cx) = maybe_cx {
363                    let core = cx.handoff_core.take();
364                    let mut cx_core = cx.core.borrow_mut();
365                    assert!(cx_core.is_none());
366                    *cx_core = core;
367
368                    // Reset the task budget as we are re-entering the
369                    // runtime.
370                    coop::set(self.0);
371                }
372            });
373        }
374    }
375
376    let mut had_entered = false;
377
378    let setup_result = with_current(|maybe_cx| {
379        match (
380            crate::runtime::context::current_enter_context(),
381            maybe_cx.is_some(),
382        ) {
383            (context::EnterRuntime::Entered { .. }, true) => {
384                // We are on a thread pool runtime thread, so we just need to
385                // set up blocking.
386                had_entered = true;
387            }
388            (
389                context::EnterRuntime::Entered {
390                    allow_block_in_place,
391                },
392                false,
393            ) => {
394                // We are on an executor, but _not_ on the thread pool.  That is
395                // _only_ okay if we are in a thread pool runtime's block_on
396                // method:
397                if allow_block_in_place {
398                    had_entered = true;
399                    return Ok(());
400                } else {
401                    // This probably means we are on the current_thread runtime or in a
402                    // LocalSet, where it is _not_ okay to block.
403                    return Err(
404                        "can call blocking only when running on the multi-threaded runtime",
405                    );
406                }
407            }
408            (context::EnterRuntime::NotEntered, true) => {
409                // This is a nested call to block_in_place (we already exited).
410                // All the necessary setup has already been done.
411                return Ok(());
412            }
413            (context::EnterRuntime::NotEntered, false) => {
414                // We are outside of the tokio runtime, so blocking is fine.
415                // We can also skip all of the thread pool blocking setup steps.
416                return Ok(());
417            }
418        }
419
420        let cx = maybe_cx.expect("no .is_some() == false cases above should lead here");
421
422        // Get the worker core. If none is set, then blocking is fine!
423        let core = match cx.core.borrow_mut().take() {
424            Some(core) => core,
425            None => return Ok(()),
426        };
427
428        // In order to block, the core must be sent to another thread for
429        // execution.
430        //
431        // First, move the core back into the worker's shared core slot.
432        cx.handoff_core.set(core);
433
434        // Next, clone the worker handle and send it to a new thread for
435        // processing.
436        //
437        // Once the blocking task is done executing, we will attempt to
438        // steal the core back.
439        let index = cx.index;
440        let handle = cx.handle.clone();
441        let handoff_core = cx.handoff_core.clone();
442        runtime::spawn_blocking(move || run(index, handle, handoff_core, true));
443        Ok(())
444    });
445
446    if let Err(panic_message) = setup_result {
447        panic!("{}", panic_message);
448    }
449
450    if had_entered {
451        // Unset the current task's budget. Blocking sections are not
452        // constrained by task budgets.
453        let _reset = Reset(coop::stop());
454
455        crate::runtime::context::exit_runtime(f)
456    } else {
457        f()
458    }
459}
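// Usage sketch (illustrative; the surrounding runtime and task are assumed):
// from a task running on the multi-threaded runtime, the public
// `tokio::task::block_in_place` entry point reaches this function, handing the
// worker's core to another thread while the closure blocks.
//
// ```ignore
// tokio::task::block_in_place(|| {
//     // Blocking work is fine here; other tasks keep running on the
//     // handed-off core.
//     std::thread::sleep(std::time::Duration::from_millis(50));
// });
// ```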
460
461fn run(
462    index: usize,
463    handle: Arc<Handle>,
464    handoff_core: Arc<AtomicCell<Core>>,
465    blocking_in_place: bool,
466) {
467    struct AbortOnPanic;
468
469    impl Drop for AbortOnPanic {
470        fn drop(&mut self) {
471            if std::thread::panicking() {
472                eprintln!("worker thread panicking; aborting process");
473                std::process::abort();
474            }
475        }
476    }
477
478    // Catching panics on worker threads in tests is quite tricky. Instead, when
479    // debug assertions are enabled, we just abort the process.
480    #[cfg(debug_assertions)]
481    let _abort_on_panic = AbortOnPanic;
482
483    let num_workers = handle.shared.condvars.len();
484
485    let mut worker = Worker {
486        tick: 0,
487        num_seq_local_queue_polls: 0,
488        global_queue_interval: Stats::DEFAULT_GLOBAL_QUEUE_INTERVAL,
489        is_shutdown: false,
490        is_traced: false,
491        workers_to_notify: Vec::with_capacity(num_workers - 1),
492        idle_snapshot: idle::Snapshot::new(&handle.shared.idle),
493        stats: stats::Ephemeral::new(),
494    };
495
496    let sched_handle = scheduler::Handle::MultiThreadAlt(handle.clone());
497
498    crate::runtime::context::enter_runtime(&sched_handle, true, |_| {
499        // Set the worker context.
500        let cx = scheduler::Context::MultiThreadAlt(Context {
501            index,
502            lifo_enabled: Cell::new(!handle.shared.config.disable_lifo_slot),
503            handle,
504            core: RefCell::new(None),
505            handoff_core,
506            defer: RefCell::new(Vec::with_capacity(64)),
507        });
508
509        context::set_scheduler(&cx, || {
510            let cx = cx.expect_multi_thread_alt();
511
512            // Run the worker
513            let res = worker.run(&cx, blocking_in_place);
            // An `Err` here signifies that the core was lost; this is an
            // expected end state for a worker.
516            debug_assert!(res.is_err());
517
518            // Check if there are any deferred tasks to notify. This can happen when
519            // the worker core is lost due to `block_in_place()` being called from
520            // within the task.
521            if !cx.defer.borrow().is_empty() {
522                worker.schedule_deferred_without_core(&cx, &mut cx.shared().synced.lock());
523            }
524        });
525    });
526}
527
528macro_rules! try_task {
529    ($e:expr) => {{
530        let (task, core) = $e?;
531        if task.is_some() {
532            return Ok((task, core));
533        }
534        core
535    }};
536}
537
538macro_rules! try_task_new_batch {
539    ($w:expr, $e:expr) => {{
540        let (task, mut core) = $e?;
541        if task.is_some() {
542            core.stats.start_processing_scheduled_tasks(&mut $w.stats);
543            return Ok((task, core));
544        }
545        core
546    }};
547}
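// Both macros are used in methods returning `NextTaskResult`: the `?`
// propagates a lost core (`Err(())`), a found task short-circuits with an
// early `return`, and otherwise the `Box<Core>` is handed back so the caller
// can keep looking, e.g. `core = try_task!(self.next_notified_task(cx, core));`.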
548
549impl Worker {
550    fn run(&mut self, cx: &Context, blocking_in_place: bool) -> RunResult {
551        let (maybe_task, mut core) = {
552            if blocking_in_place {
553                if let Some(core) = cx.handoff_core.take() {
554                    (None, core)
555                } else {
556                    // Just shutdown
557                    return Err(());
558                }
559            } else {
560                let mut synced = cx.shared().synced.lock();
561
562                // First try to acquire an available core
563                if let Some(core) = self.try_acquire_available_core(cx, &mut synced) {
564                    // Try to poll a task from the global queue
565                    let maybe_task = cx.shared().next_remote_task_synced(&mut synced);
566                    (maybe_task, core)
567                } else {
568                    // block the thread to wait for a core to be assigned to us
569                    self.wait_for_core(cx, synced)?
570                }
571            }
572        };
573
574        cx.shared().worker_metrics[core.index].set_thread_id(thread::current().id());
575        core.stats.start_processing_scheduled_tasks(&mut self.stats);
576
577        if let Some(task) = maybe_task {
578            core = self.run_task(cx, core, task)?;
579        }
580
581        while !self.is_shutdown {
582            let (maybe_task, c) = self.next_task(cx, core)?;
583            core = c;
584
585            if let Some(task) = maybe_task {
586                core = self.run_task(cx, core, task)?;
587            } else {
                // The only reason to get `None` from `next_task` is that we
                // have entered the shutdown phase.
590                assert!(self.is_shutdown);
591                break;
592            }
593        }
594
595        cx.shared().shutdown_core(&cx.handle, core);
596
597        // It is possible that tasks wake others during drop, so we need to
598        // clear the defer list.
599        self.shutdown_clear_defer(cx);
600
601        Err(())
602    }
603
604    // Try to acquire an available core, but do not block the thread
605    fn try_acquire_available_core(
606        &mut self,
607        cx: &Context,
608        synced: &mut Synced,
609    ) -> Option<Box<Core>> {
610        if let Some(mut core) = cx
611            .shared()
612            .idle
613            .try_acquire_available_core(&mut synced.idle)
614        {
615            self.reset_acquired_core(cx, synced, &mut core);
616            Some(core)
617        } else {
618            None
619        }
620    }
621
622    // Block the current thread, waiting for an available core
623    fn wait_for_core(
624        &mut self,
625        cx: &Context,
626        mut synced: MutexGuard<'_, Synced>,
627    ) -> NextTaskResult {
628        if cx.shared().idle.needs_searching() {
629            if let Some(mut core) = self.try_acquire_available_core(cx, &mut synced) {
630                cx.shared().idle.transition_worker_to_searching(&mut core);
631                return Ok((None, core));
632            }
633        }
634
635        cx.shared()
636            .idle
637            .transition_worker_to_parked(&mut synced, cx.index);
638
639        // Wait until a core is available, then exit the loop.
640        let mut core = loop {
641            if let Some(core) = synced.assigned_cores[cx.index].take() {
642                break core;
643            }
644
645            // If shutting down, abort
646            if cx.shared().inject.is_closed(&synced.inject) {
647                self.shutdown_clear_defer(cx);
648                return Err(());
649            }
650
651            synced = cx.shared().condvars[cx.index].wait(synced).unwrap();
652        };
653
654        self.reset_acquired_core(cx, &mut synced, &mut core);
655
656        if self.is_shutdown {
657            // Currently shutting down, don't do any more work
658            return Ok((None, core));
659        }
660
661        let n = cmp::max(core.run_queue.remaining_slots() / 2, 1);
662        let maybe_task = self.next_remote_task_batch_synced(cx, &mut synced, &mut core, n);
663
664        core.stats.unparked();
665        self.flush_metrics(cx, &mut core);
666
667        Ok((maybe_task, core))
668    }
669
    /// Ensure the core's state is set correctly for the worker to start using it.
671    fn reset_acquired_core(&mut self, cx: &Context, synced: &mut Synced, core: &mut Core) {
672        self.global_queue_interval = core.stats.tuned_global_queue_interval(&cx.shared().config);
673
        // Reset `lifo_enabled` here in case the core was previously stolen from
        // a thread that had the LIFO slot disabled.
676        self.reset_lifo_enabled(cx);
677
678        // At this point, the local queue should be empty
679        #[cfg(not(loom))]
680        debug_assert!(core.run_queue.is_empty());
681
682        // Update shutdown state while locked
683        self.update_global_flags(cx, synced);
684    }
685
    /// Finds the next task to run; this could come from a queue or from
    /// stealing. If none is available, the thread sleeps and tries again.
688    fn next_task(&mut self, cx: &Context, mut core: Box<Core>) -> NextTaskResult {
689        self.assert_lifo_enabled_is_correct(cx);
690
691        if self.is_traced {
692            core = cx.handle.trace_core(core);
693        }
694
695        // Increment the tick
696        self.tick = self.tick.wrapping_add(1);
697
698        // Runs maintenance every so often. When maintenance is run, the
699        // driver is checked, which may result in a task being found.
700        core = try_task!(self.maybe_maintenance(&cx, core));
701
702        // Check the LIFO slot, local run queue, and the injection queue for
703        // a notified task.
704        core = try_task!(self.next_notified_task(cx, core));
705
706        // We consumed all work in the queues and will start searching for work.
707        core.stats.end_processing_scheduled_tasks(&mut self.stats);
708
709        super::counters::inc_num_no_local_work();
710
711        if !cx.defer.borrow().is_empty() {
712            // We are deferring tasks, so poll the resource driver and schedule
713            // the deferred tasks.
714            try_task_new_batch!(self, self.park_yield(cx, core));
715
716            panic!("what happened to the deferred tasks? 🤔");
717        }
718
719        while !self.is_shutdown {
720            // Search for more work, this involves trying to poll the resource
721            // driver, steal from other workers, and check the global queue
722            // again.
723            core = try_task_new_batch!(self, self.search_for_work(cx, core));
724
725            debug_assert!(cx.defer.borrow().is_empty());
726            core = try_task_new_batch!(self, self.park(cx, core));
727        }
728
729        // Shutting down, drop any deferred tasks
730        self.shutdown_clear_defer(cx);
731
732        Ok((None, core))
733    }
734
735    fn next_notified_task(&mut self, cx: &Context, mut core: Box<Core>) -> NextTaskResult {
736        self.num_seq_local_queue_polls += 1;
737
738        if self.num_seq_local_queue_polls % self.global_queue_interval == 0 {
739            super::counters::inc_global_queue_interval();
740
741            self.num_seq_local_queue_polls = 0;
742
743            // Update the global queue interval, if needed
744            self.tune_global_queue_interval(cx, &mut core);
745
746            if let Some(task) = self.next_remote_task(cx) {
747                return Ok((Some(task), core));
748            }
749        }
750
751        if let Some(task) = core.next_local_task() {
752            return Ok((Some(task), core));
753        }
754
755        self.next_remote_task_batch(cx, core)
756    }
757
758    fn next_remote_task(&self, cx: &Context) -> Option<Notified> {
759        if cx.shared().inject.is_empty() {
760            return None;
761        }
762
763        let mut synced = cx.shared().synced.lock();
764        cx.shared().next_remote_task_synced(&mut synced)
765    }
766
767    fn next_remote_task_batch(&self, cx: &Context, mut core: Box<Core>) -> NextTaskResult {
768        if cx.shared().inject.is_empty() {
769            return Ok((None, core));
770        }
771
772        // Other threads can only **remove** tasks from the current worker's
773        // `run_queue`. So, we can be confident that by the time we call
774        // `run_queue.push_back` below, there will be *at least* `cap`
775        // available slots in the queue.
776        let cap = usize::min(
777            core.run_queue.remaining_slots(),
778            usize::max(core.run_queue.max_capacity() / 2, 1),
779        );
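        // Worked example (illustrative numbers): with 100 free slots in a
        // local queue whose max capacity is 256, `cap = min(100, max(256 / 2, 1))
        // = 100`; this is the bound passed to `next_remote_task_batch_synced`
        // below.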
780
781        let mut synced = cx.shared().synced.lock();
782        let maybe_task = self.next_remote_task_batch_synced(cx, &mut synced, &mut core, cap);
783        Ok((maybe_task, core))
784    }
785
786    fn next_remote_task_batch_synced(
787        &self,
788        cx: &Context,
789        synced: &mut Synced,
790        core: &mut Core,
791        max: usize,
792    ) -> Option<Notified> {
793        super::counters::inc_num_remote_batch();
794
795        // The worker is currently idle, pull a batch of work from the
796        // injection queue. We don't want to pull *all* the work so other
797        // workers can also get some.
798        let n = if core.is_searching {
799            cx.shared().inject.len() / cx.shared().idle.num_searching() + 1
800        } else {
801            cx.shared().inject.len() / cx.shared().remotes.len() + 1
802        };
803
804        let n = usize::min(n, max) + 1;
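        // Worked example (illustrative numbers): a non-searching worker with 8
        // remotes and 64 tasks in the inject queue computes `n = 64 / 8 + 1 = 9`
        // above, then `n = min(9, max) + 1`; with `max == 16` this pops at most
        // 10 tasks, one returned directly and the rest pushed to the local queue.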
805
806        // safety: passing in the correct `inject::Synced`.
807        let mut tasks = unsafe { cx.shared().inject.pop_n(&mut synced.inject, n) };
808
809        // Pop the first task to return immediately
810        let ret = tasks.next();
811
        // Push the rest of the tasks onto the run queue
813        core.run_queue.push_back(tasks);
814
815        ret
816    }
817
818    /// Function responsible for stealing tasks from another worker
819    ///
    /// Note: A worker will only try to steal if fewer than half of the workers
    /// are already searching for tasks to steal. The idea is to make sure not
    /// all workers will be trying to steal at the same time.
823    fn search_for_work(&mut self, cx: &Context, mut core: Box<Core>) -> NextTaskResult {
824        #[cfg(not(loom))]
825        const ROUNDS: usize = 4;
826
827        #[cfg(loom)]
828        const ROUNDS: usize = 1;
829
830        debug_assert!(core.lifo_slot.is_none());
831        #[cfg(not(loom))]
832        debug_assert!(core.run_queue.is_empty());
833
834        if !core.run_queue.can_steal() {
835            return Ok((None, core));
836        }
837
838        if !self.transition_to_searching(cx, &mut core) {
839            return Ok((None, core));
840        }
841
842        // core = try_task!(self, self.poll_driver(cx, core));
843
844        // Get a snapshot of which workers are idle
845        cx.shared().idle.snapshot(&mut self.idle_snapshot);
846
847        let num = cx.shared().remotes.len();
848
849        for i in 0..ROUNDS {
850            // Start from a random worker
851            let start = core.rand.fastrand_n(num as u32) as usize;
852
853            if let Some(task) = self.steal_one_round(cx, &mut core, start) {
854                return Ok((Some(task), core));
855            }
856
857            core = try_task!(self.next_remote_task_batch(cx, core));
858
859            if i > 0 {
860                super::counters::inc_num_spin_stall();
861                std::thread::sleep(std::time::Duration::from_micros(i as u64));
862            }
863        }
864
865        Ok((None, core))
866    }
867
868    fn steal_one_round(&self, cx: &Context, core: &mut Core, start: usize) -> Option<Notified> {
869        let num = cx.shared().remotes.len();
870
871        for i in 0..num {
872            let i = (start + i) % num;
873
874            // Don't steal from ourself! We know we don't have work.
875            if i == core.index {
876                continue;
877            }
878
879            // If the core is currently idle, then there is nothing to steal.
880            if self.idle_snapshot.is_idle(i) {
881                continue;
882            }
883
884            let target = &cx.shared().remotes[i];
885
886            if let Some(task) = target
887                .steal
888                .steal_into(&mut core.run_queue, &mut core.stats)
889            {
890                return Some(task);
891            }
892        }
893
894        None
895    }
896
897    fn run_task(&mut self, cx: &Context, mut core: Box<Core>, task: Notified) -> RunResult {
898        let task = cx.shared().owned.assert_owner(task);
899
900        // Make sure the worker is not in the **searching** state. This enables
901        // another idle worker to try to steal work.
902        if self.transition_from_searching(cx, &mut core) {
903            super::counters::inc_num_relay_search();
904            cx.shared().notify_parked_local();
905        }
906
907        self.assert_lifo_enabled_is_correct(cx);
908
909        // Measure the poll start time. Note that we may end up polling other
910        // tasks under this measurement. In this case, the tasks came from the
911        // LIFO slot and are considered part of the current task for scheduling
        // purposes. These tasks inherit the "parent" task's limits.
913        core.stats.start_poll(&mut self.stats);
914
915        // Make the core available to the runtime context
916        *cx.core.borrow_mut() = Some(core);
917
918        // Run the task
919        coop::budget(|| {
920            super::counters::inc_num_polls();
921            task.run();
922            let mut lifo_polls = 0;
923
924            // As long as there is budget remaining and a task exists in the
925            // `lifo_slot`, then keep running.
926            loop {
927                // Check if we still have the core. If not, the core was stolen
928                // by another worker.
929                let mut core = match cx.core.borrow_mut().take() {
930                    Some(core) => core,
931                    None => {
932                        // In this case, we cannot call `reset_lifo_enabled()`
933                        // because the core was stolen. The stealer will handle
934                        // that at the top of `Context::run`
935                        return Err(());
936                    }
937                };
938
939                // Check for a task in the LIFO slot
940                let task = match core.next_lifo_task() {
941                    Some(task) => task,
942                    None => {
943                        self.reset_lifo_enabled(cx);
944                        core.stats.end_poll();
945                        return Ok(core);
946                    }
947                };
948
949                if !coop::has_budget_remaining() {
950                    core.stats.end_poll();
951
952                    // Not enough budget left to run the LIFO task, push it to
953                    // the back of the queue and return.
954                    core.run_queue
955                        .push_back_or_overflow(task, cx.shared(), &mut core.stats);
956                    // If we hit this point, the LIFO slot should be enabled.
957                    // There is no need to reset it.
958                    debug_assert!(cx.lifo_enabled.get());
959                    return Ok(core);
960                }
961
962                // Track that we are about to run a task from the LIFO slot.
963                lifo_polls += 1;
964                super::counters::inc_lifo_schedules();
965
966                // Disable the LIFO slot if we reach our limit
967                //
968                // In ping-ping style workloads where task A notifies task B,
969                // which notifies task A again, continuously prioritizing the
970                // LIFO slot can cause starvation as these two tasks will
971                // repeatedly schedule the other. To mitigate this, we limit the
972                // number of times the LIFO slot is prioritized.
973                if lifo_polls >= MAX_LIFO_POLLS_PER_TICK {
974                    cx.lifo_enabled.set(false);
975                    super::counters::inc_lifo_capped();
976                }
977
978                // Run the LIFO task, then loop
979                *cx.core.borrow_mut() = Some(core);
980                let task = cx.shared().owned.assert_owner(task);
981                super::counters::inc_num_lifo_polls();
982                task.run();
983            }
984        })
985    }
986
987    fn schedule_deferred_with_core<'a>(
988        &mut self,
989        cx: &'a Context,
990        mut core: Box<Core>,
991        synced: impl FnOnce() -> MutexGuard<'a, Synced>,
992    ) -> NextTaskResult {
993        let mut defer = cx.defer.borrow_mut();
994
995        // Grab a task to run next
996        let task = defer.pop();
997
998        if task.is_none() {
999            return Ok((None, core));
1000        }
1001
1002        if !defer.is_empty() {
1003            let mut synced = synced();
1004
1005            // Number of tasks we want to try to spread across idle workers
1006            let num_fanout = cmp::min(defer.len(), cx.shared().idle.num_idle(&synced.idle));
1007
1008            // Cap the number of threads woken up at one time. This is to limit
            // the number of no-op wakes and reduce mutex contention.
1010            //
1011            // This number was picked after some basic benchmarks, but it can
1012            // probably be tuned using the mean poll time value (slower task
1013            // polls can leverage more woken workers).
1014            let num_fanout = cmp::min(2, num_fanout);
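            // Worked example (illustrative numbers): with four tasks left in
            // `defer` after the pop above and three idle workers, `num_fanout =
            // min(4, 3) = 3` is capped to 2, so two tasks go to the inject
            // queue and up to two workers are notified; the remaining tasks are
            // pushed onto the local run queue further below.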
1015
1016            if num_fanout > 0 {
1017                cx.shared()
1018                    .push_remote_task_batch_synced(&mut synced, defer.drain(..num_fanout));
1019
1020                cx.shared()
1021                    .idle
1022                    .notify_mult(&mut synced, &mut self.workers_to_notify, num_fanout);
1023            }
1024
1025            // Do not run the task while holding the lock...
1026            drop(synced);
1027        }
1028
1029        // Notify any workers
1030        for worker in self.workers_to_notify.drain(..) {
1031            cx.shared().condvars[worker].notify_one()
1032        }
1033
1034        if !defer.is_empty() {
1035            // Push the rest of the tasks on the local queue
1036            for task in defer.drain(..) {
1037                core.run_queue
1038                    .push_back_or_overflow(task, cx.shared(), &mut core.stats);
1039            }
1040
1041            cx.shared().notify_parked_local();
1042        }
1043
1044        Ok((task, core))
1045    }
1046
1047    fn schedule_deferred_without_core<'a>(&mut self, cx: &Context, synced: &mut Synced) {
1048        let mut defer = cx.defer.borrow_mut();
1049        let num = defer.len();
1050
1051        if num > 0 {
1052            // Push all tasks to the injection queue
1053            cx.shared()
1054                .push_remote_task_batch_synced(synced, defer.drain(..));
1055
1056            debug_assert!(self.workers_to_notify.is_empty());
1057
1058            // Notify workers
1059            cx.shared()
1060                .idle
1061                .notify_mult(synced, &mut self.workers_to_notify, num);
1062
1063            // Notify any workers
1064            for worker in self.workers_to_notify.drain(..) {
1065                cx.shared().condvars[worker].notify_one()
1066            }
1067        }
1068    }
1069
1070    fn maybe_maintenance(&mut self, cx: &Context, mut core: Box<Core>) -> NextTaskResult {
1071        if self.tick % cx.shared().config.event_interval == 0 {
1072            super::counters::inc_num_maintenance();
1073
1074            core.stats.end_processing_scheduled_tasks(&mut self.stats);
1075
1076            // Run regularly scheduled maintenance
1077            core = try_task_new_batch!(self, self.park_yield(cx, core));
1078
1079            core.stats.start_processing_scheduled_tasks(&mut self.stats);
1080        }
1081
1082        Ok((None, core))
1083    }
1084
1085    fn flush_metrics(&self, cx: &Context, core: &mut Core) {
1086        core.stats.submit(&cx.shared().worker_metrics[core.index]);
1087    }
1088
1089    fn update_global_flags(&mut self, cx: &Context, synced: &mut Synced) {
1090        if !self.is_shutdown {
1091            self.is_shutdown = cx.shared().inject.is_closed(&synced.inject);
1092        }
1093
1094        if !self.is_traced {
1095            self.is_traced = cx.shared().trace_status.trace_requested();
1096        }
1097    }
1098
1099    fn park_yield(&mut self, cx: &Context, core: Box<Core>) -> NextTaskResult {
1100        // Call `park` with a 0 timeout. This enables the I/O driver, timer, ...
1101        // to run without actually putting the thread to sleep.
1102        if let Some(mut driver) = cx.shared().driver.take() {
1103            driver.park_timeout(&cx.handle.driver, Duration::from_millis(0));
1104
1105            cx.shared().driver.set(driver);
1106        }
1107
1108        // If there are more I/O events, schedule them.
1109        let (maybe_task, mut core) =
1110            self.schedule_deferred_with_core(cx, core, || cx.shared().synced.lock())?;
1111
1112        self.flush_metrics(cx, &mut core);
1113        self.update_global_flags(cx, &mut cx.shared().synced.lock());
1114
1115        Ok((maybe_task, core))
1116    }
1117
1118    /*
1119    fn poll_driver(&mut self, cx: &Context, core: Box<Core>) -> NextTaskResult {
1120        // Call `park` with a 0 timeout. This enables the I/O driver, timer, ...
1121        // to run without actually putting the thread to sleep.
1122        if let Some(mut driver) = cx.shared().driver.take() {
1123            driver.park_timeout(&cx.handle.driver, Duration::from_millis(0));
1124
1125            cx.shared().driver.set(driver);
1126
1127            // If there are more I/O events, schedule them.
1128            self.schedule_deferred_with_core(cx, core, || cx.shared().synced.lock())
1129        } else {
1130            Ok((None, core))
1131        }
1132    }
1133    */
1134
1135    fn park(&mut self, cx: &Context, mut core: Box<Core>) -> NextTaskResult {
1136        if let Some(f) = &cx.shared().config.before_park {
1137            f();
1138        }
1139
1140        if self.can_transition_to_parked(&mut core) {
1141            debug_assert!(!self.is_shutdown);
1142            debug_assert!(!self.is_traced);
1143
1144            core = try_task!(self.do_park(cx, core));
1145        }
1146
1147        if let Some(f) = &cx.shared().config.after_unpark {
1148            f();
1149        }
1150
1151        Ok((None, core))
1152    }
1153
1154    fn do_park(&mut self, cx: &Context, mut core: Box<Core>) -> NextTaskResult {
1155        let was_searching = core.is_searching;
1156
1157        // Acquire the lock
1158        let mut synced = cx.shared().synced.lock();
1159
1160        // The local queue should be empty at this point
1161        #[cfg(not(loom))]
1162        debug_assert!(core.run_queue.is_empty());
1163
1164        // Try one last time to get tasks
1165        let n = cmp::max(core.run_queue.remaining_slots() / 2, 1);
1166        if let Some(task) = self.next_remote_task_batch_synced(cx, &mut synced, &mut core, n) {
1167            return Ok((Some(task), core));
1168        }
1169
1170        if !was_searching {
1171            if cx
1172                .shared()
1173                .idle
1174                .transition_worker_to_searching_if_needed(&mut synced.idle, &mut core)
1175            {
1176                // Skip parking, go back to searching
1177                return Ok((None, core));
1178            }
1179        }
1180
1181        super::counters::inc_num_parks();
1182        core.stats.about_to_park();
1183        // Flush metrics to the runtime metrics aggregator
1184        self.flush_metrics(cx, &mut core);
1185
1186        // If the runtime is shutdown, skip parking
1187        self.update_global_flags(cx, &mut synced);
1188
1189        if self.is_shutdown {
1190            return Ok((None, core));
1191        }
1192
1193        // Release the core
1194        core.is_searching = false;
1195        cx.shared().idle.release_core(&mut synced, core);
1196
1197        drop(synced);
1198
1199        if was_searching {
1200            if cx.shared().idle.transition_worker_from_searching() {
1201                // cx.shared().idle.snapshot(&mut self.idle_snapshot);
1202                // We were the last searching worker, we need to do one last check
1203                for i in 0..cx.shared().remotes.len() {
1204                    if !cx.shared().remotes[i].steal.is_empty() {
1205                        let mut synced = cx.shared().synced.lock();
1206
1207                        // Try to get a core
1208                        if let Some(mut core) = self.try_acquire_available_core(cx, &mut synced) {
1209                            cx.shared().idle.transition_worker_to_searching(&mut core);
1210                            return Ok((None, core));
1211                        } else {
1212                            // Fall back to the park routine
1213                            break;
1214                        }
1215                    }
1216                }
1217            }
1218        }
1219
1220        if let Some(mut driver) = cx.shared().take_driver() {
1221            // Wait for driver events
1222            driver.park(&cx.handle.driver);
1223
1224            synced = cx.shared().synced.lock();
1225
1226            if cx.shared().inject.is_closed(&mut synced.inject) {
1227                synced.shutdown_driver = Some(driver);
1228                self.shutdown_clear_defer(cx);
1229                cx.shared().shutdown_finalize(&cx.handle, &mut synced);
1230                return Err(());
1231            }
1232
1233            // Put the driver back
1234            cx.shared().driver.set(driver);
1235
1236            // Try to acquire an available core to schedule I/O events
1237            if let Some(core) = self.try_acquire_available_core(cx, &mut synced) {
1238                // This may result in a task being run
1239                self.schedule_deferred_with_core(cx, core, move || synced)
1240            } else {
1241                // Schedule any deferred tasks
1242                self.schedule_deferred_without_core(cx, &mut synced);
1243
1244                // Wait for a core.
1245                self.wait_for_core(cx, synced)
1246            }
1247        } else {
1248            synced = cx.shared().synced.lock();
1249
1250            // Wait for a core to be assigned to us
1251            self.wait_for_core(cx, synced)
1252        }
1253    }
1254
1255    fn transition_to_searching(&self, cx: &Context, core: &mut Core) -> bool {
1256        if !core.is_searching {
1257            cx.shared().idle.try_transition_worker_to_searching(core);
1258        }
1259
1260        core.is_searching
1261    }
1262
1263    /// Returns `true` if another worker must be notified
1264    fn transition_from_searching(&self, cx: &Context, core: &mut Core) -> bool {
1265        if !core.is_searching {
1266            return false;
1267        }
1268
1269        core.is_searching = false;
1270        cx.shared().idle.transition_worker_from_searching()
1271    }
1272
1273    fn can_transition_to_parked(&self, core: &mut Core) -> bool {
1274        !self.has_tasks(core) && !self.is_shutdown && !self.is_traced
1275    }
1276
1277    fn has_tasks(&self, core: &Core) -> bool {
1278        core.lifo_slot.is_some() || !core.run_queue.is_empty()
1279    }
1280
1281    fn reset_lifo_enabled(&self, cx: &Context) {
1282        cx.lifo_enabled
1283            .set(!cx.handle.shared.config.disable_lifo_slot);
1284    }
1285
1286    fn assert_lifo_enabled_is_correct(&self, cx: &Context) {
1287        debug_assert_eq!(
1288            cx.lifo_enabled.get(),
1289            !cx.handle.shared.config.disable_lifo_slot
1290        );
1291    }
1292
1293    fn tune_global_queue_interval(&mut self, cx: &Context, core: &mut Core) {
1294        let next = core.stats.tuned_global_queue_interval(&cx.shared().config);
1295
1296        // Smooth out jitter
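        // (e.g. an interval of 31 with a newly tuned value of 32 or 33 stays at
        // 31, while a jump to 40 is applied)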
1297        if u32::abs_diff(self.global_queue_interval, next) > 2 {
1298            self.global_queue_interval = next;
1299        }
1300    }
1301
1302    fn shutdown_clear_defer(&self, cx: &Context) {
1303        let mut defer = cx.defer.borrow_mut();
1304
1305        for task in defer.drain(..) {
1306            drop(task);
1307        }
1308    }
1309}
1310
1311impl Context {
1312    pub(crate) fn defer(&self, waker: &Waker) {
1313        // TODO: refactor defer across all runtimes
1314        waker.wake_by_ref();
1315    }
1316
1317    fn shared(&self) -> &Shared {
1318        &self.handle.shared
1319    }
1320
1321    #[cfg_attr(not(feature = "time"), allow(dead_code))]
1322    pub(crate) fn get_worker_index(&self) -> usize {
1323        self.index
1324    }
1325}
1326
1327impl Core {
1328    fn next_local_task(&mut self) -> Option<Notified> {
1329        self.next_lifo_task().or_else(|| self.run_queue.pop())
1330    }
1331
1332    fn next_lifo_task(&mut self) -> Option<Notified> {
1333        self.lifo_slot.take()
1334    }
1335}
1336
1337impl Shared {
1338    fn next_remote_task_synced(&self, synced: &mut Synced) -> Option<Notified> {
1339        // safety: we only have access to a valid `Synced` in this file.
1340        unsafe { self.inject.pop(&mut synced.inject) }
1341    }
1342
1343    pub(super) fn schedule_task(&self, task: Notified, is_yield: bool) {
1344        use std::ptr;
1345
1346        with_current(|maybe_cx| {
1347            if let Some(cx) = maybe_cx {
1348                // Make sure the task is part of the **current** scheduler.
1349                if ptr::eq(self, &cx.handle.shared) {
1350                    // And the current thread still holds a core
1351                    if let Some(core) = cx.core.borrow_mut().as_mut() {
1352                        if is_yield {
1353                            cx.defer.borrow_mut().push(task);
1354                        } else {
1355                            self.schedule_local(cx, core, task);
1356                        }
1357                    } else {
1358                        // This can happen if either the core was stolen
1359                        // (`block_in_place`) or the notification happens from
1360                        // the driver.
1361                        cx.defer.borrow_mut().push(task);
1362                    }
1363                    return;
1364                }
1365            }
1366
1367            // Otherwise, use the inject queue.
1368            self.schedule_remote(task);
1369        })
1370    }
1371
1372    fn schedule_local(&self, cx: &Context, core: &mut Core, task: Notified) {
1373        core.stats.inc_local_schedule_count();
1374
1375        if cx.lifo_enabled.get() {
1376            // Push to the LIFO slot
1377            let prev = std::mem::replace(&mut core.lifo_slot, Some(task));
1378            // let prev = cx.shared().remotes[core.index].lifo_slot.swap_local(task);
1379
1380            if let Some(prev) = prev {
1381                core.run_queue
1382                    .push_back_or_overflow(prev, self, &mut core.stats);
1383            } else {
1384                return;
1385            }
1386        } else {
1387            core.run_queue
1388                .push_back_or_overflow(task, self, &mut core.stats);
1389        }
1390
1391        self.notify_parked_local();
1392    }
1393
1394    fn notify_parked_local(&self) {
1395        super::counters::inc_num_inc_notify_local();
1396        self.idle.notify_local(self);
1397    }
1398
1399    fn schedule_remote(&self, task: Notified) {
1400        super::counters::inc_num_notify_remote();
1401        self.scheduler_metrics.inc_remote_schedule_count();
1402
1403        let mut synced = self.synced.lock();
        // Push the task into the inject queue
1405        self.push_remote_task(&mut synced, task);
1406
1407        // Notify a worker. The mutex is passed in and will be released as part
1408        // of the method call.
1409        self.idle.notify_remote(synced, self);
1410    }
1411
1412    pub(super) fn close(&self, handle: &Handle) {
1413        {
1414            let mut synced = self.synced.lock();
1415
1416            if let Some(driver) = self.driver.take() {
1417                synced.shutdown_driver = Some(driver);
1418            }
1419
1420            if !self.inject.close(&mut synced.inject) {
1421                return;
1422            }
1423
1424            // Set the shutdown flag on all available cores
1425            self.idle.shutdown(&mut synced, self);
1426        }
1427
1428        // Any unassigned cores need to be shutdown, but we have to first drop
1429        // the lock
1430        self.idle.shutdown_unassigned_cores(handle, self);
1431    }
1432
1433    fn push_remote_task(&self, synced: &mut Synced, task: Notified) {
        // safety: passing in the correct `inject::Synced`
1435        unsafe {
1436            self.inject.push(&mut synced.inject, task);
1437        }
1438    }
1439
1440    fn push_remote_task_batch<I>(&self, iter: I)
1441    where
1442        I: Iterator<Item = task::Notified<Arc<Handle>>>,
1443    {
1444        unsafe {
1445            self.inject.push_batch(self, iter);
1446        }
1447    }
1448
1449    fn push_remote_task_batch_synced<I>(&self, synced: &mut Synced, iter: I)
1450    where
1451        I: Iterator<Item = task::Notified<Arc<Handle>>>,
1452    {
1453        unsafe {
1454            self.inject.push_batch(&mut synced.inject, iter);
1455        }
1456    }
1457
1458    fn take_driver(&self) -> Option<Box<Driver>> {
1459        if !self.driver_enabled() {
1460            return None;
1461        }
1462
1463        self.driver.take()
1464    }
1465
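    // `create` allocates one more worker (and condvar) than cores when the
    // driver is enabled, so comparing the two counts detects an enabled driver.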
1466    fn driver_enabled(&self) -> bool {
1467        self.condvars.len() > self.remotes.len()
1468    }
1469
1470    pub(super) fn shutdown_core(&self, handle: &Handle, mut core: Box<Core>) {
1471        // Start from a random inner list
1472        let start = core.rand.fastrand_n(self.owned.get_shard_size() as u32);
1473        self.owned.close_and_shutdown_all(start as usize);
1474
1475        core.stats.submit(&self.worker_metrics[core.index]);
1476
1477        let mut synced = self.synced.lock();
1478        synced.shutdown_cores.push(core);
1479
1480        self.shutdown_finalize(handle, &mut synced);
1481    }
1482
1483    pub(super) fn shutdown_finalize(&self, handle: &Handle, synced: &mut Synced) {
1484        // Wait for all cores
1485        if synced.shutdown_cores.len() != self.remotes.len() {
1486            return;
1487        }
1488
1489        let driver = synced.shutdown_driver.take();
1490
1491        if self.driver_enabled() && driver.is_none() {
1492            return;
1493        }
1494
1495        debug_assert!(self.owned.is_empty());
1496
1497        for mut core in synced.shutdown_cores.drain(..) {
1498            // Drain tasks from the local queue
1499            while core.next_local_task().is_some() {}
1500        }
1501
1502        // Shutdown the driver
1503        if let Some(mut driver) = driver {
1504            driver.shutdown(&handle.driver);
1505        }
1506
1507        // Drain the injection queue
1508        //
1509        // We already shut down every task, so we can simply drop the tasks. We
1510        // cannot call `next_remote_task()` because we already hold the lock.
1511        //
        // safety: passing in the correct `inject::Synced`
1513        while let Some(task) = self.next_remote_task_synced(synced) {
1514            drop(task);
1515        }
1516    }
1517}
1518
1519impl Overflow<Arc<Handle>> for Shared {
1520    fn push(&self, task: task::Notified<Arc<Handle>>) {
1521        self.push_remote_task(&mut self.synced.lock(), task);
1522    }
1523
1524    fn push_batch<I>(&self, iter: I)
1525    where
1526        I: Iterator<Item = task::Notified<Arc<Handle>>>,
1527    {
1528        self.push_remote_task_batch(iter)
1529    }
1530}
1531
1532impl<'a> Lock<inject::Synced> for &'a Shared {
1533    type Handle = SyncedGuard<'a>;
1534
1535    fn lock(self) -> Self::Handle {
1536        SyncedGuard {
1537            lock: self.synced.lock(),
1538        }
1539    }
1540}
1541
1542impl<'a> Lock<Synced> for &'a Shared {
1543    type Handle = SyncedGuard<'a>;
1544
1545    fn lock(self) -> Self::Handle {
1546        SyncedGuard {
1547            lock: self.synced.lock(),
1548        }
1549    }
1550}
1551
1552impl task::Schedule for Arc<Handle> {
1553    fn release(&self, task: &Task) -> Option<Task> {
1554        self.shared.owned.remove(task)
1555    }
1556
1557    fn schedule(&self, task: Notified) {
1558        self.shared.schedule_task(task, false);
1559    }
1560
1561    fn hooks(&self) -> TaskHarnessScheduleHooks {
1562        TaskHarnessScheduleHooks {
1563            task_terminate_callback: self.task_hooks.task_terminate_callback.clone(),
1564        }
1565    }
1566
1567    fn yield_now(&self, task: Notified) {
1568        self.shared.schedule_task(task, true);
1569    }
1570}
1571
1572impl AsMut<Synced> for Synced {
1573    fn as_mut(&mut self) -> &mut Synced {
1574        self
1575    }
1576}
1577
1578pub(crate) struct SyncedGuard<'a> {
1579    lock: crate::loom::sync::MutexGuard<'a, Synced>,
1580}
1581
1582impl<'a> AsMut<inject::Synced> for SyncedGuard<'a> {
1583    fn as_mut(&mut self) -> &mut inject::Synced {
1584        &mut self.lock.inject
1585    }
1586}
1587
1588impl<'a> AsMut<Synced> for SyncedGuard<'a> {
1589    fn as_mut(&mut self) -> &mut Synced {
1590        &mut self.lock
1591    }
1592}
1593
1594#[track_caller]
1595fn with_current<R>(f: impl FnOnce(Option<&Context>) -> R) -> R {
1596    use scheduler::Context::MultiThreadAlt;
1597
1598    context::with_scheduler(|ctx| match ctx {
1599        Some(MultiThreadAlt(ctx)) => f(Some(ctx)),
1600        _ => f(None),
1601    })
1602}