tokio/runtime/scheduler/multi_thread/worker.rs
//! A scheduler is initialized with a fixed number of workers. Each worker is
//! driven by a thread. Each worker has a "core" which contains data such as the
//! run queue and other state. When `block_in_place` is called, the worker's
//! "core" is handed off to a new thread allowing the scheduler to continue to
//! make progress while the originating thread blocks.
//!
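//! For orientation, a minimal sketch of the user-facing call that triggers this
//! hand-off. It assumes a multi-thread runtime; only public API is used:
//!
//! ```ignore
//! tokio::task::spawn(async {
//!     // `block_in_place` hands this worker's core to another thread before
//!     // running the closure, so blocking here does not stall the scheduler.
//!     tokio::task::block_in_place(|| {
//!         std::thread::sleep(std::time::Duration::from_millis(50));
//!     });
//! });
//! ```
//!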
//! # Shutdown
//!
//! Shutting down the runtime involves the following steps:
//!
//! 1. The `Shared::close` method is called. This closes the inject queue and
//!    the `OwnedTasks` instance and wakes up all worker threads. (A sketch of
//!    how this is triggered from user code follows this list.)
//!
//! 2. Each worker thread observes the close signal the next time it runs
//!    `Core::maintenance` by checking whether the inject queue is closed.
//!    The `Core::is_shutdown` flag is then set to `true`.
//!
//! 3. Each worker thread, in parallel, calls `pre_shutdown`. Here, the worker
//!    keeps removing tasks from `OwnedTasks` until it is empty. No new tasks
//!    can be pushed to `OwnedTasks` during or after this step, as it was
//!    closed in step 1.
//!
//! 4. The workers call `Shared::shutdown` to enter the single-threaded phase
//!    of shutdown. These calls push their core to `Shared::shutdown_cores`,
//!    and the last thread to push its core finishes the shutdown procedure.
//!
//! 5. The local run queue of each core is emptied, then the inject queue is
//!    emptied.
//!
//! At this point, shutdown has completed. None of the collections can contain
//! any tasks, as each collection was closed first and then emptied afterwards.
//!
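//! For reference, a hedged sketch of how step 1 is typically reached from user
//! code: shutting down the runtime (here via `shutdown_timeout`; dropping the
//! runtime behaves similarly) ultimately reaches the close path described in
//! step 1:
//!
//! ```ignore
//! use std::time::Duration;
//!
//! let rt = tokio::runtime::Builder::new_multi_thread()
//!     .enable_all()
//!     .build()
//!     .unwrap();
//!
//! rt.spawn(async { /* ... */ });
//!
//! // Begin shutdown: the scheduler is closed and the runtime waits (up to the
//! // timeout) for worker and blocking threads to finish.
//! rt.shutdown_timeout(Duration::from_secs(1));
//! ```
//!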
//! ## Spawns during shutdown
//!
//! When spawning tasks during shutdown, there are two cases:
//!
//! * The spawner observes that `OwnedTasks` is open and that the inject queue
//!   is closed.
//! * The spawner observes that `OwnedTasks` is closed and doesn't check the
//!   inject queue.
//!
//! The first case can only happen if the `OwnedTasks::bind` call happens
//! before or during step 1 of shutdown. In this case, the runtime will clean
//! up the task in step 3 of shutdown.
//!
//! In the second case, the task was never spawned and is immediately cancelled
//! by the spawner.
//!
//! The correctness of shutdown requires both the inject queue and the
//! `OwnedTasks` collection to have a closed bit. With a closed bit on only the
//! inject queue, spawning could run into a situation where a task is
//! successfully bound long after the runtime has shut down. With a closed bit
//! on only `OwnedTasks`, the first spawning situation could result in the
//! notification being pushed to the inject queue after step 5 of shutdown,
//! which would leave a task in the inject queue indefinitely. This would be a
//! ref-count cycle and a memory leak.
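//!
//! To make the two-flag argument concrete, here is a purely illustrative,
//! self-contained sketch; the types and field names below are hypothetical and
//! do not match the real task machinery:
//!
//! ```ignore
//! use std::sync::atomic::{AtomicBool, Ordering};
//! use std::sync::Mutex;
//!
//! struct OwnedSketch { closed: AtomicBool, tasks: Mutex<Vec<u32>> }
//! struct InjectSketch { closed: AtomicBool, queue: Mutex<Vec<u32>> }
//!
//! // Returns true if the task ends up somewhere the runtime will clean up,
//! // false if the spawner must cancel it immediately.
//! fn spawn_sketch(owned: &OwnedSketch, inject: &InjectSketch, task: u32) -> bool {
//!     // Second case above: `OwnedTasks` is closed, so the task is never spawned.
//!     if owned.closed.load(Ordering::Acquire) {
//!         return false;
//!     }
//!     owned.tasks.lock().unwrap().push(task);
//!
//!     // First case above: bound before/while closing. The inject queue's own
//!     // closed bit keeps the notification from lingering after shutdown.
//!     if inject.closed.load(Ordering::Acquire) {
//!         // Dropped here; the runtime cleans the bound task up in step 3.
//!         return true;
//!     }
//!     inject.queue.lock().unwrap().push(task);
//!     true
//! }
//! ```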

use crate::loom::sync::{Arc, Mutex};
use crate::runtime;
use crate::runtime::scheduler::multi_thread::{
    idle, queue, Counters, Handle, Idle, Overflow, Parker, Stats, TraceStatus, Unparker,
};
use crate::runtime::scheduler::{inject, Defer, Lock};
use crate::runtime::task::{OwnedTasks, TaskHarnessScheduleHooks};
use crate::runtime::{blocking, driver, scheduler, task, Config, SchedulerMetrics, WorkerMetrics};
use crate::runtime::{context, TaskHooks};
use crate::task::coop;
use crate::util::atomic_cell::AtomicCell;
use crate::util::rand::{FastRand, RngSeedGenerator};

use std::cell::RefCell;
use std::task::Waker;
use std::thread;
use std::time::Duration;

mod metrics;

cfg_taskdump! {
    mod taskdump;
}

cfg_not_taskdump! {
    mod taskdump_mock;
}

/// A scheduler worker
pub(super) struct Worker {
    /// Reference to scheduler's handle
    handle: Arc<Handle>,

    /// Index holding this worker's remote state
    index: usize,

    /// Used to hand off a worker's core to another thread.
    core: AtomicCell<Core>,
}

/// Core data
struct Core {
    /// Used to schedule bookkeeping tasks every so often.
    tick: u32,

    /// When a task is scheduled from a worker, it is stored in this slot. The
    /// worker will check this slot for a task **before** checking the run
    /// queue. This effectively results in the **last** scheduled task being
    /// run next (LIFO). This is an optimization that improves locality, which
    /// benefits message passing patterns and helps to reduce latency.
    lifo_slot: Option<Notified>,

    /// When `true`, locally scheduled tasks go to the LIFO slot. When `false`,
    /// they go to the back of the `run_queue`.
    lifo_enabled: bool,

    /// The worker-local run queue.
    run_queue: queue::Local<Arc<Handle>>,

    /// True if the worker is currently searching for more work. Searching
    /// involves attempting to steal from other workers.
    is_searching: bool,

    /// True if the scheduler is being shut down
    is_shutdown: bool,

    /// True if the scheduler is being traced
    is_traced: bool,

    /// Parker
    ///
    /// Stored in an `Option` as the parker is added / removed to make the
    /// borrow checker happy.
    park: Option<Parker>,

    /// Per-worker runtime stats
    stats: Stats,

    /// How often to check the global queue
    global_queue_interval: u32,

    /// Fast random number generator.
    rand: FastRand,
}

/// State shared across all workers
pub(crate) struct Shared {
    /// Per-worker remote state. All other workers have access to this, and it
    /// is how they communicate with each other.
    remotes: Box<[Remote]>,

    /// Global task queue used for:
    ///  1. Submitting work to the scheduler while **not** currently on a worker thread.
    ///  2. Submitting work to the scheduler when a worker run queue is saturated.
    pub(super) inject: inject::Shared<Arc<Handle>>,

    /// Coordinates idle workers
    idle: Idle,

    /// Collection of all active tasks spawned onto this executor.
    pub(crate) owned: OwnedTasks<Arc<Handle>>,

    /// Data synchronized by the scheduler mutex
    pub(super) synced: Mutex<Synced>,

    /// Cores that have observed the shutdown signal
    ///
    /// The core is **not** placed back in the worker to prevent it from being
    /// stolen by a thread that was spawned as part of `block_in_place`.
    #[allow(clippy::vec_box)] // we're moving an already-boxed value
    shutdown_cores: Mutex<Vec<Box<Core>>>,

    /// The number of cores that have observed the trace signal.
    pub(super) trace_status: TraceStatus,

    /// Scheduler configuration options
    config: Config,

    /// Collects metrics from the runtime.
    pub(super) scheduler_metrics: SchedulerMetrics,

    pub(super) worker_metrics: Box<[WorkerMetrics]>,

    /// Only held to trigger some code on drop. This is used to get internal
    /// runtime metrics that can be useful when doing performance
    /// investigations. This does nothing (empty struct, no drop impl) unless
    /// the `tokio_internal_mt_counters` `cfg` flag is set.
    _counters: Counters,
}
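
// For intuition about when the `inject` queue above is used: spawning from a
// thread that is not a runtime worker goes through the inject queue, while a
// task spawned from inside a task running on a worker lands in that worker's
// local queue (or its LIFO slot). A hedged, user-level illustration using only
// public API:
//
// ```ignore
// let rt = tokio::runtime::Runtime::new().unwrap();
// let handle = rt.handle().clone();
//
// // From a non-worker thread: the task goes through the inject queue and a
// // parked worker is notified (see `Handle::schedule_task` below).
// std::thread::spawn(move || {
//     handle.spawn(async { /* ... */ });
// })
// .join()
// .unwrap();
//
// rt.block_on(async {
//     tokio::spawn(async {
//         // Spawned from a task on a worker thread: pushed to that worker's
//         // local queue or LIFO slot via `Handle::schedule_local`.
//         tokio::spawn(async { /* ... */ });
//     })
//     .await
//     .unwrap();
// });
// ```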

/// Data synchronized by the scheduler mutex
pub(crate) struct Synced {
    /// Synchronized state for `Idle`.
    pub(super) idle: idle::Synced,

    /// Synchronized state for `Inject`.
    pub(crate) inject: inject::Synced,
}

/// Used to communicate with a worker from other threads.
struct Remote {
    /// Steals tasks from this worker.
    pub(super) steal: queue::Steal<Arc<Handle>>,

    /// Unparks the associated worker thread
    unpark: Unparker,
}

/// Thread-local context
pub(crate) struct Context {
    /// Worker
    worker: Arc<Worker>,

    /// Core data
    core: RefCell<Option<Box<Core>>>,

    /// Tasks to wake after resource drivers are polled. This is mostly to
    /// handle yielded tasks.
    pub(crate) defer: Defer,
}
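
// The `defer` list above mostly holds wakers for tasks that yielded, e.g. via
// `tokio::task::yield_now()`. A hedged, user-level illustration (public API
// only):
//
// ```ignore
// tokio::spawn(async {
//     // The task reschedules itself; its wakeup is typically deferred until
//     // after the worker has polled the resource drivers (see `Defer`).
//     tokio::task::yield_now().await;
// });
// ```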

/// Starts the workers
pub(crate) struct Launch(Vec<Arc<Worker>>);

/// Running a task may consume the core. If the core is still available when
/// the task completes, it is returned. Otherwise, the worker will need to stop
/// processing.
type RunResult = Result<Box<Core>, ()>;

/// A task handle
type Task = task::Task<Arc<Handle>>;

/// A notified task handle
type Notified = task::Notified<Arc<Handle>>;

/// Value picked out of thin air. Running the LIFO slot a handful of times
/// seems sufficient to benefit from locality. Running it more than 3 times
/// likely over-weights locality at the expense of fairness. The value can be
/// tuned in the future with data that shows improvements.
const MAX_LIFO_POLLS_PER_TICK: usize = 3;
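
// To make the trade-off behind `MAX_LIFO_POLLS_PER_TICK` and the LIFO slot
// concrete, below is a hedged, user-level sketch of the message-passing
// ping-pong pattern the slot optimizes for (channel names and the workload are
// purely illustrative):
//
// ```ignore
// use tokio::sync::mpsc;
//
// let (to_b, mut from_a) = mpsc::channel::<u32>(1);
// let (to_a, mut from_b) = mpsc::channel::<u32>(1);
//
// // Task A: each send wakes task B; the LIFO slot lets B run next on the same
// // worker, keeping the message hot in cache.
// tokio::spawn(async move {
//     let _ = to_b.send(0).await;
//     while let Some(n) = from_b.recv().await {
//         let _ = to_b.send(n + 1).await;
//     }
// });
//
// // Task B: replies to A, producing the A -> B -> A ping-pong. Without the
// // cap above, the two tasks could keep promoting each other through the LIFO
// // slot and starve the rest of the local run queue.
// tokio::spawn(async move {
//     while let Some(n) = from_a.recv().await {
//         let _ = to_a.send(n + 1).await;
//     }
// });
// ```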

pub(super) fn create(
    size: usize,
    park: Parker,
    driver_handle: driver::Handle,
    blocking_spawner: blocking::Spawner,
    seed_generator: RngSeedGenerator,
    config: Config,
) -> (Arc<Handle>, Launch) {
    let mut cores = Vec::with_capacity(size);
    let mut remotes = Vec::with_capacity(size);
    let mut worker_metrics = Vec::with_capacity(size);

    // Create the local queues
    for _ in 0..size {
        let (steal, run_queue) = queue::local();

        let park = park.clone();
        let unpark = park.unpark();
        let metrics = WorkerMetrics::from_config(&config);
        let stats = Stats::new(&metrics);

        cores.push(Box::new(Core {
            tick: 0,
            lifo_slot: None,
            lifo_enabled: !config.disable_lifo_slot,
            run_queue,
            is_searching: false,
            is_shutdown: false,
            is_traced: false,
            park: Some(park),
            global_queue_interval: stats.tuned_global_queue_interval(&config),
            stats,
            rand: FastRand::from_seed(config.seed_generator.next_seed()),
        }));

        remotes.push(Remote { steal, unpark });
        worker_metrics.push(metrics);
    }

    let (idle, idle_synced) = Idle::new(size);
    let (inject, inject_synced) = inject::Shared::new();

    let remotes_len = remotes.len();
    let handle = Arc::new(Handle {
        task_hooks: TaskHooks::from_config(&config),
        shared: Shared {
            remotes: remotes.into_boxed_slice(),
            inject,
            idle,
            owned: OwnedTasks::new(size),
            synced: Mutex::new(Synced {
                idle: idle_synced,
                inject: inject_synced,
            }),
            shutdown_cores: Mutex::new(vec![]),
            trace_status: TraceStatus::new(remotes_len),
            config,
            scheduler_metrics: SchedulerMetrics::new(),
            worker_metrics: worker_metrics.into_boxed_slice(),
            _counters: Counters,
        },
        driver: driver_handle,
        blocking_spawner,
        seed_generator,
    });

    let mut launch = Launch(vec![]);

    for (index, core) in cores.drain(..).enumerate() {
        launch.0.push(Arc::new(Worker {
            handle: handle.clone(),
            index,
            core: AtomicCell::new(Some(core)),
        }));
    }

    (handle, launch)
}
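
// For context, a hedged sketch of the public configuration that ultimately
// drives `create` with a fixed worker count (all builder methods shown are
// public API; the worker count maps to the `size` argument above):
//
// ```ignore
// let rt = tokio::runtime::Builder::new_multi_thread()
//     .worker_threads(4)
//     .enable_all()
//     .build()
//     .unwrap();
//
// rt.block_on(async { /* ... */ });
// ```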

#[track_caller]
pub(crate) fn block_in_place<F, R>(f: F) -> R
where
    F: FnOnce() -> R,
{
    // Try to steal the worker core back
    struct Reset {
        take_core: bool,
        budget: coop::Budget,
    }

    impl Drop for Reset {
        fn drop(&mut self) {
            with_current(|maybe_cx| {
                if let Some(cx) = maybe_cx {
                    if self.take_core {
                        let core = cx.worker.core.take();

                        if core.is_some() {
                            cx.worker.handle.shared.worker_metrics[cx.worker.index]
                                .set_thread_id(thread::current().id());
                        }

                        let mut cx_core = cx.core.borrow_mut();
                        assert!(cx_core.is_none());
                        *cx_core = core;
                    }

                    // Reset the task budget as we are re-entering the
                    // runtime.
                    coop::set(self.budget);
                }
            });
        }
    }

    let mut had_entered = false;
    let mut take_core = false;

    let setup_result = with_current(|maybe_cx| {
        match (
            crate::runtime::context::current_enter_context(),
            maybe_cx.is_some(),
        ) {
            (context::EnterRuntime::Entered { .. }, true) => {
                // We are on a thread pool runtime thread, so we just need to
                // set up blocking.
                had_entered = true;
            }
            (
                context::EnterRuntime::Entered {
                    allow_block_in_place,
                },
                false,
            ) => {
                // We are on an executor, but _not_ on the thread pool. That is
                // _only_ okay if we are in a thread pool runtime's block_on
                // method:
                if allow_block_in_place {
                    had_entered = true;
                    return Ok(());
                } else {
                    // This probably means we are on the current_thread runtime or in a
                    // LocalSet, where it is _not_ okay to block.
                    return Err(
                        "can call blocking only when running on the multi-threaded runtime",
                    );
                }
            }
            (context::EnterRuntime::NotEntered, true) => {
                // This is a nested call to block_in_place (we already exited).
                // All the necessary setup has already been done.
                return Ok(());
            }
            (context::EnterRuntime::NotEntered, false) => {
                // We are outside of the tokio runtime, so blocking is fine.
                // We can also skip all of the thread pool blocking setup steps.
                return Ok(());
            }
        }

        let cx = maybe_cx.expect("no .is_some() == false cases above should lead here");

        // Get the worker core. If none is set, then blocking is fine!
        let mut core = match cx.core.borrow_mut().take() {
            Some(core) => core,
            None => return Ok(()),
        };

        // If we heavily call `spawn_blocking`, there might be no available thread to
        // run this core. Except for the task in the lifo_slot, all tasks can be
        // stolen, so we move the task out of the lifo_slot to the run_queue.
        if let Some(task) = core.lifo_slot.take() {
            core.run_queue
                .push_back_or_overflow(task, &*cx.worker.handle, &mut core.stats);
        }

        // We are taking the core from the context and sending it to another
        // thread.
        take_core = true;

        // The parker should be set here
        assert!(core.park.is_some());

        // In order to block, the core must be sent to another thread for
        // execution.
        //
        // First, move the core back into the worker's shared core slot.
        cx.worker.core.set(core);

        // Next, clone the worker handle and send it to a new thread for
        // processing.
        //
        // Once the blocking task is done executing, we will attempt to
        // steal the core back.
        let worker = cx.worker.clone();
        runtime::spawn_blocking(move || run(worker));
        Ok(())
    });

    if let Err(panic_message) = setup_result {
        panic!("{}", panic_message);
    }

    if had_entered {
        // Unset the current task's budget. Blocking sections are not
        // constrained by task budgets.
        let _reset = Reset {
            take_core,
            budget: coop::stop(),
        };

        crate::runtime::context::exit_runtime(f)
    } else {
        f()
    }
}
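
// Note on the `allow_block_in_place` branch above: the multi-thread runtime's
// `block_on` allows `block_in_place`, which enables the documented pattern of
// re-entering the runtime from a synchronous section. A hedged, user-level
// sketch:
//
// ```ignore
// // Inside async code running on the multi-thread runtime:
// let handle = tokio::runtime::Handle::current();
// tokio::task::block_in_place(move || {
//     // The worker's core has been handed off, so blocking here is fine,
//     // including blocking on a nested `block_on`.
//     handle.block_on(async { /* ... */ });
// });
// ```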

impl Launch {
    pub(crate) fn launch(mut self) {
        for worker in self.0.drain(..) {
            runtime::spawn_blocking(move || run(worker));
        }
    }
}

fn run(worker: Arc<Worker>) {
    #[allow(dead_code)]
    struct AbortOnPanic;

    impl Drop for AbortOnPanic {
        fn drop(&mut self) {
            if std::thread::panicking() {
                eprintln!("worker thread panicking; aborting process");
                std::process::abort();
            }
        }
    }

    // Catching panics on worker threads in tests is quite tricky. Instead, when
    // debug assertions are enabled, we just abort the process.
    #[cfg(debug_assertions)]
    let _abort_on_panic = AbortOnPanic;

    // Acquire a core. If this fails, then another thread is running this
    // worker and there is nothing further to do.
    let core = match worker.core.take() {
        Some(core) => core,
        None => return,
    };

    worker.handle.shared.worker_metrics[worker.index].set_thread_id(thread::current().id());

    let handle = scheduler::Handle::MultiThread(worker.handle.clone());

    crate::runtime::context::enter_runtime(&handle, true, |_| {
        // Set the worker context.
        let cx = scheduler::Context::MultiThread(Context {
            worker,
            core: RefCell::new(None),
            defer: Defer::new(),
        });

        context::set_scheduler(&cx, || {
            let cx = cx.expect_multi_thread();

            // This should always be an error. It only returns a `Result` to support
            // using `?` to short circuit.
            assert!(cx.run(core).is_err());

            // Check if there are any deferred tasks to notify. This can happen when
            // the worker core is lost due to `block_in_place()` being called from
            // within the task.
            cx.defer.wake();
        });
    });
}

impl Context {
    fn run(&self, mut core: Box<Core>) -> RunResult {
        // Reset `lifo_enabled` here in case the core was previously stolen from
        // a task that had the LIFO slot disabled.
        self.reset_lifo_enabled(&mut core);

        // Start in the "processing tasks" state, as polling tasks from the
        // local queue will be one of the first things we do.
        core.stats.start_processing_scheduled_tasks();

        while !core.is_shutdown {
            self.assert_lifo_enabled_is_correct(&core);

            if core.is_traced {
                core = self.worker.handle.trace_core(core);
            }

            // Increment the tick
            core.tick();

            // Run maintenance, if needed
            core = self.maintenance(core);

            // First, check for work available to the current worker.
            if let Some(task) = core.next_task(&self.worker) {
                core = self.run_task(task, core)?;
                continue;
            }

            // We consumed all work in the queues and will start searching for work.
            core.stats.end_processing_scheduled_tasks();

            // There is no more **local** work to process, try to steal work
            // from other workers.
            if let Some(task) = core.steal_work(&self.worker) {
                // Found work, switch back to processing
                core.stats.start_processing_scheduled_tasks();
                core = self.run_task(task, core)?;
            } else {
                // Wait for work
                core = if !self.defer.is_empty() {
                    self.park_timeout(core, Some(Duration::from_millis(0)))
                } else {
                    self.park(core)
                };
                core.stats.start_processing_scheduled_tasks();
            }
        }

        core.pre_shutdown(&self.worker);
        // Signal shutdown
        self.worker.handle.shutdown_core(core);
        Err(())
    }

    fn run_task(&self, task: Notified, mut core: Box<Core>) -> RunResult {
        #[cfg(tokio_unstable)]
        let task_id = task.task_id();

        let task = self.worker.handle.shared.owned.assert_owner(task);

        // Make sure the worker is not in the **searching** state. This enables
        // another idle worker to try to steal work.
        core.transition_from_searching(&self.worker);

        self.assert_lifo_enabled_is_correct(&core);

        // Measure the poll start time. Note that we may end up polling other
        // tasks under this measurement. In this case, the tasks came from the
        // LIFO slot and are considered part of the current task for scheduling
        // purposes. These tasks inherit the "parent"'s limits.
        core.stats.start_poll();

        // Make the core available to the runtime context
        *self.core.borrow_mut() = Some(core);

        // Run the task
        coop::budget(|| {
            // Unlike the poll time above, the poll start callback is attached to the
            // task id, so it is tightly associated with the actual poll invocation.
            #[cfg(tokio_unstable)]
            self.worker.handle.task_hooks.poll_start_callback(task_id);

            task.run();

            #[cfg(tokio_unstable)]
            self.worker.handle.task_hooks.poll_stop_callback(task_id);

            let mut lifo_polls = 0;

            // As long as there is budget remaining and a task exists in the
            // `lifo_slot`, keep running.
            loop {
                // Check if we still have the core. If not, the core was stolen
                // by another worker.
                let mut core = match self.core.borrow_mut().take() {
                    Some(core) => core,
                    None => {
                        // In this case, we cannot call `reset_lifo_enabled()`
                        // because the core was stolen. The stealer will handle
                        // that at the top of `Context::run`
                        return Err(());
                    }
                };

                // Check for a task in the LIFO slot
                let task = match core.lifo_slot.take() {
                    Some(task) => task,
                    None => {
                        self.reset_lifo_enabled(&mut core);
                        core.stats.end_poll();
                        return Ok(core);
                    }
                };

                if !coop::has_budget_remaining() {
                    core.stats.end_poll();

                    // Not enough budget left to run the LIFO task, push it to
                    // the back of the queue and return.
                    core.run_queue.push_back_or_overflow(
                        task,
                        &*self.worker.handle,
                        &mut core.stats,
                    );
                    // If we hit this point, the LIFO slot should be enabled.
                    // There is no need to reset it.
                    debug_assert!(core.lifo_enabled);
                    return Ok(core);
                }

                // Track that we are about to run a task from the LIFO slot.
                lifo_polls += 1;
                super::counters::inc_lifo_schedules();

                // Disable the LIFO slot if we reach our limit
                //
                // In ping-pong style workloads where task A notifies task B,
                // which notifies task A again, continuously prioritizing the
                // LIFO slot can cause starvation as these two tasks will
                // repeatedly schedule the other. To mitigate this, we limit the
                // number of times the LIFO slot is prioritized.
                if lifo_polls >= MAX_LIFO_POLLS_PER_TICK {
                    core.lifo_enabled = false;
                    super::counters::inc_lifo_capped();
                }

                // Run the LIFO task, then loop
                *self.core.borrow_mut() = Some(core);
                let task = self.worker.handle.shared.owned.assert_owner(task);

                #[cfg(tokio_unstable)]
                let task_id = task.task_id();

                #[cfg(tokio_unstable)]
                self.worker.handle.task_hooks.poll_start_callback(task_id);

                task.run();

                #[cfg(tokio_unstable)]
                self.worker.handle.task_hooks.poll_stop_callback(task_id);
            }
        })
    }

    fn reset_lifo_enabled(&self, core: &mut Core) {
        core.lifo_enabled = !self.worker.handle.shared.config.disable_lifo_slot;
    }

    fn assert_lifo_enabled_is_correct(&self, core: &Core) {
        debug_assert_eq!(
            core.lifo_enabled,
            !self.worker.handle.shared.config.disable_lifo_slot
        );
    }

    fn maintenance(&self, mut core: Box<Core>) -> Box<Core> {
        if core.tick % self.worker.handle.shared.config.event_interval == 0 {
            super::counters::inc_num_maintenance();

            core.stats.end_processing_scheduled_tasks();

            // Call `park` with a 0 timeout. This enables the I/O driver, timer, ...
            // to run without actually putting the thread to sleep.
            core = self.park_timeout(core, Some(Duration::from_millis(0)));

            // Run regularly scheduled maintenance
            core.maintenance(&self.worker);

            core.stats.start_processing_scheduled_tasks();
        }

        core
    }

    /// Parks the worker thread while waiting for tasks to execute.
    ///
    /// Before parking, this function checks that there is indeed no more work to do.
    /// Note also that, before parking, the worker thread will try to take ownership
    /// of the driver (I/O, time) and dispatch any events that might have fired.
    /// Whenever a worker thread executes the driver loop, all woken tasks are
    /// scheduled in its own local queue until the queue saturates
    /// (ntasks > `LOCAL_QUEUE_CAPACITY`). When the local queue is saturated, the
    /// overflowing tasks are added to the injection queue, from where other workers
    /// can pick them up. Finally, we rely on the work-stealing algorithm to spread
    /// the tasks amongst workers after all the I/O events have been dispatched.
    fn park(&self, mut core: Box<Core>) -> Box<Core> {
        if let Some(f) = &self.worker.handle.shared.config.before_park {
            f();
        }

        if core.transition_to_parked(&self.worker) {
            while !core.is_shutdown && !core.is_traced {
                core.stats.about_to_park();
                core.stats
                    .submit(&self.worker.handle.shared.worker_metrics[self.worker.index]);

                core = self.park_timeout(core, None);

                core.stats.unparked();

                // Run regularly scheduled maintenance
                core.maintenance(&self.worker);

                if core.transition_from_parked(&self.worker) {
                    break;
                }
            }
        }

        if let Some(f) = &self.worker.handle.shared.config.after_unpark {
            f();
        }
        core
    }

    fn park_timeout(&self, mut core: Box<Core>, duration: Option<Duration>) -> Box<Core> {
        self.assert_lifo_enabled_is_correct(&core);

        // Take the parker out of core
        let mut park = core.park.take().expect("park missing");

        // Store `core` in context
        *self.core.borrow_mut() = Some(core);

        // Park thread
        if let Some(timeout) = duration {
            park.park_timeout(&self.worker.handle.driver, timeout);
        } else {
            park.park(&self.worker.handle.driver);
        }

        self.defer.wake();

        // Remove `core` from context
        core = self.core.borrow_mut().take().expect("core missing");

        // Place `park` back in `core`
        core.park = Some(park);

        if core.should_notify_others() {
            self.worker.handle.notify_parked_local();
        }

        core
    }

    pub(crate) fn defer(&self, waker: &Waker) {
        self.defer.defer(waker);
    }

    #[allow(dead_code)]
    pub(crate) fn get_worker_index(&self) -> usize {
        self.worker.index
    }
}

impl Core {
    /// Increment the tick
    fn tick(&mut self) {
        self.tick = self.tick.wrapping_add(1);
    }

    /// Return the next notified task available to this worker.
    fn next_task(&mut self, worker: &Worker) -> Option<Notified> {
        if self.tick % self.global_queue_interval == 0 {
            // Update the global queue interval, if needed
            self.tune_global_queue_interval(worker);

            worker
                .handle
                .next_remote_task()
                .or_else(|| self.next_local_task())
        } else {
            let maybe_task = self.next_local_task();

            if maybe_task.is_some() {
                return maybe_task;
            }

            if worker.inject().is_empty() {
                return None;
            }

            // Other threads can only **remove** tasks from the current worker's
            // `run_queue`. So, we can be confident that by the time we call
            // `run_queue.push_back` below, there will be *at least* `cap`
            // available slots in the queue.
            let cap = usize::min(
                self.run_queue.remaining_slots(),
                self.run_queue.max_capacity() / 2,
            );

            // The worker is currently idle, pull a batch of work from the
            // injection queue. We don't want to pull *all* the work, so that
            // other workers can also get some.
            let n = usize::min(
                worker.inject().len() / worker.handle.shared.remotes.len() + 1,
                cap,
            );

            // Take at least one task since the first task is returned directly
            // and not pushed onto the local queue.
            let n = usize::max(1, n);

            let mut synced = worker.handle.shared.synced.lock();
            // safety: passing in the correct `inject::Synced`.
            let mut tasks = unsafe { worker.inject().pop_n(&mut synced.inject, n) };

            // Pop the first task to return immediately
            let ret = tasks.next();

            // Push the rest of the tasks onto the run queue
            self.run_queue.push_back(tasks);

            ret
        }
    }
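
    // For intuition about the batch size computed in `next_task` (numbers are
    // purely illustrative): with 64 tasks in the inject queue, 8 workers, and
    // ample local capacity, `n = min(64 / 8 + 1, cap) = 9`, so one task is
    // returned directly and the remaining 8 are pushed onto the local queue.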

    fn next_local_task(&mut self) -> Option<Notified> {
        self.lifo_slot.take().or_else(|| self.run_queue.pop())
    }

    /// Function responsible for stealing tasks from another worker
    ///
    /// Note: a worker will only attempt to steal if fewer than half of the
    /// workers are already searching for tasks to steal. For example, with 8
    /// workers, at most 4 search at any one time. The idea is to make sure not
    /// all workers will be trying to steal at the same time.
    fn steal_work(&mut self, worker: &Worker) -> Option<Notified> {
        if !self.transition_to_searching(worker) {
            return None;
        }

        let num = worker.handle.shared.remotes.len();
        // Start from a random worker
        let start = self.rand.fastrand_n(num as u32) as usize;

        for i in 0..num {
            let i = (start + i) % num;

            // Don't steal from ourself! We know we don't have work.
            if i == worker.index {
                continue;
            }

            let target = &worker.handle.shared.remotes[i];
            if let Some(task) = target
                .steal
                .steal_into(&mut self.run_queue, &mut self.stats)
            {
                return Some(task);
            }
        }

        // Fall back to checking the global queue
        worker.handle.next_remote_task()
    }

    fn transition_to_searching(&mut self, worker: &Worker) -> bool {
        if !self.is_searching {
            self.is_searching = worker.handle.shared.idle.transition_worker_to_searching();
        }

        self.is_searching
    }

    fn transition_from_searching(&mut self, worker: &Worker) {
        if !self.is_searching {
            return;
        }

        self.is_searching = false;
        worker.handle.transition_worker_from_searching();
    }

    fn has_tasks(&self) -> bool {
        self.lifo_slot.is_some() || self.run_queue.has_tasks()
    }

    fn should_notify_others(&self) -> bool {
        // If there are tasks available to steal, but this worker is not
        // looking for tasks to steal, notify another worker.
        if self.is_searching {
            return false;
        }
        self.lifo_slot.is_some() as usize + self.run_queue.len() > 1
    }

    /// Prepares the worker state for parking.
    ///
    /// Returns true if the transition happened, false if there is work to do first.
    fn transition_to_parked(&mut self, worker: &Worker) -> bool {
        // Workers should not park if they have work to do
        if self.has_tasks() || self.is_traced {
            return false;
        }

        // When the final worker transitions **out** of searching to parked, it
        // must check all the queues one last time in case work materialized
        // between the last work scan and transitioning out of searching.
        let is_last_searcher = worker.handle.shared.idle.transition_worker_to_parked(
            &worker.handle.shared,
            worker.index,
            self.is_searching,
        );

        // The worker is no longer searching. This only updates the local
        // cache.
        self.is_searching = false;

        if is_last_searcher {
            worker.handle.notify_if_work_pending();
        }

        true
    }

    /// Returns `true` if the transition happened.
    fn transition_from_parked(&mut self, worker: &Worker) -> bool {
        // If a task is in the lifo slot/run queue, then we must unpark regardless of
        // being notified
        if self.has_tasks() {
            // When a worker wakes, it should only transition to the "searching"
            // state when the wake originates from another worker *or* a new task
            // is pushed. We do *not* want the worker to transition to "searching"
            // when it wakes because the I/O driver received new events.
            self.is_searching = !worker
                .handle
                .shared
                .idle
                .unpark_worker_by_id(&worker.handle.shared, worker.index);
            return true;
        }

        if worker
            .handle
            .shared
            .idle
            .is_parked(&worker.handle.shared, worker.index)
        {
            return false;
        }

        // When unparked, the worker is in the searching state.
        self.is_searching = true;
        true
    }

    /// Runs maintenance work such as checking the pool's state.
    fn maintenance(&mut self, worker: &Worker) {
        self.stats
            .submit(&worker.handle.shared.worker_metrics[worker.index]);

        if !self.is_shutdown {
            // Check if the scheduler has been shut down
            let synced = worker.handle.shared.synced.lock();
            self.is_shutdown = worker.inject().is_closed(&synced.inject);
        }

        if !self.is_traced {
            // Check if the worker should be tracing.
            self.is_traced = worker.handle.shared.trace_status.trace_requested();
        }
    }

    /// Signals all tasks to shut down, and waits for them to complete. Must run
    /// before we enter the single-threaded phase of shutdown processing.
    fn pre_shutdown(&mut self, worker: &Worker) {
        // Start from a random inner list
        let start = self
            .rand
            .fastrand_n(worker.handle.shared.owned.get_shard_size() as u32);
        // Signal to all tasks to shut down.
        worker
            .handle
            .shared
            .owned
            .close_and_shutdown_all(start as usize);

        self.stats
            .submit(&worker.handle.shared.worker_metrics[worker.index]);
    }

    /// Shuts down the core.
    fn shutdown(&mut self, handle: &Handle) {
        // Take the parker
        let mut park = self.park.take().expect("park missing");

        // Drain the queue
        while self.next_local_task().is_some() {}

        park.shutdown(&handle.driver);
    }

    fn tune_global_queue_interval(&mut self, worker: &Worker) {
        let next = self
            .stats
            .tuned_global_queue_interval(&worker.handle.shared.config);

        // Smooth out jitter
        if u32::abs_diff(self.global_queue_interval, next) > 2 {
            self.global_queue_interval = next;
        }
    }
}

impl Worker {
    /// Returns a reference to the scheduler's injection queue.
    fn inject(&self) -> &inject::Shared<Arc<Handle>> {
        &self.handle.shared.inject
    }
}

// TODO: Move `Handle` impls into handle.rs
impl task::Schedule for Arc<Handle> {
    fn release(&self, task: &Task) -> Option<Task> {
        self.shared.owned.remove(task)
    }

    fn schedule(&self, task: Notified) {
        self.schedule_task(task, false);
    }

    fn hooks(&self) -> TaskHarnessScheduleHooks {
        TaskHarnessScheduleHooks {
            task_terminate_callback: self.task_hooks.task_terminate_callback.clone(),
        }
    }

    fn yield_now(&self, task: Notified) {
        self.schedule_task(task, true);
    }
}

impl Handle {
    pub(super) fn schedule_task(&self, task: Notified, is_yield: bool) {
        with_current(|maybe_cx| {
            if let Some(cx) = maybe_cx {
                // Make sure the task is part of the **current** scheduler.
                if self.ptr_eq(&cx.worker.handle) {
                    // And the current thread still holds a core
                    if let Some(core) = cx.core.borrow_mut().as_mut() {
                        self.schedule_local(core, task, is_yield);
                        return;
                    }
                }
            }

            // Otherwise, use the inject queue.
            self.push_remote_task(task);
            self.notify_parked_remote();
        });
    }

    pub(super) fn schedule_option_task_without_yield(&self, task: Option<Notified>) {
        if let Some(task) = task {
            self.schedule_task(task, false);
        }
    }

    fn schedule_local(&self, core: &mut Core, task: Notified, is_yield: bool) {
        core.stats.inc_local_schedule_count();

        // Spawning from the worker thread. If scheduling a "yield" then the
        // task must always be pushed to the back of the queue, enabling other
        // tasks to be executed. If **not** a yield, then there is more
        // flexibility and the task may go to the front of the queue.
        let should_notify = if is_yield || !core.lifo_enabled {
            core.run_queue
                .push_back_or_overflow(task, self, &mut core.stats);
            true
        } else {
            // Push to the LIFO slot
            let prev = core.lifo_slot.take();
            let ret = prev.is_some();

            if let Some(prev) = prev {
                core.run_queue
                    .push_back_or_overflow(prev, self, &mut core.stats);
            }

            core.lifo_slot = Some(task);

            ret
        };

        // Only notify if not currently parked. If `park` is `None`, then the
        // scheduling is from a resource driver. As notifications often come in
        // batches, the notification is delayed until the park is complete.
        if should_notify && core.park.is_some() {
            self.notify_parked_local();
        }
    }

    fn next_remote_task(&self) -> Option<Notified> {
        if self.shared.inject.is_empty() {
            return None;
        }

        let mut synced = self.shared.synced.lock();
        // safety: passing in the correct `inject::Synced`
        unsafe { self.shared.inject.pop(&mut synced.inject) }
    }

    fn push_remote_task(&self, task: Notified) {
        self.shared.scheduler_metrics.inc_remote_schedule_count();

        let mut synced = self.shared.synced.lock();
        // safety: passing in the correct `inject::Synced`
        unsafe {
            self.shared.inject.push(&mut synced.inject, task);
        }
    }

    pub(super) fn close(&self) {
        if self
            .shared
            .inject
            .close(&mut self.shared.synced.lock().inject)
        {
            self.notify_all();
        }
    }

    fn notify_parked_local(&self) {
        super::counters::inc_num_inc_notify_local();

        if let Some(index) = self.shared.idle.worker_to_notify(&self.shared) {
            super::counters::inc_num_unparks_local();
            self.shared.remotes[index].unpark.unpark(&self.driver);
        }
    }

    fn notify_parked_remote(&self) {
        if let Some(index) = self.shared.idle.worker_to_notify(&self.shared) {
            self.shared.remotes[index].unpark.unpark(&self.driver);
        }
    }

    pub(super) fn notify_all(&self) {
        for remote in &self.shared.remotes[..] {
            remote.unpark.unpark(&self.driver);
        }
    }

    fn notify_if_work_pending(&self) {
        for remote in &self.shared.remotes[..] {
            if !remote.steal.is_empty() {
                self.notify_parked_local();
                return;
            }
        }

        if !self.shared.inject.is_empty() {
            self.notify_parked_local();
        }
    }

    fn transition_worker_from_searching(&self) {
        if self.shared.idle.transition_worker_from_searching() {
            // We are the final searching worker. Because work was found, we
            // need to notify another worker.
            self.notify_parked_local();
        }
    }

    /// Signals that a worker has observed the shutdown signal and has replaced
    /// its core back into its handle.
    ///
    /// If all workers have reached this point, the final cleanup is performed.
    fn shutdown_core(&self, core: Box<Core>) {
        let mut cores = self.shared.shutdown_cores.lock();
        cores.push(core);

        if cores.len() != self.shared.remotes.len() {
            return;
        }

        debug_assert!(self.shared.owned.is_empty());

        for mut core in cores.drain(..) {
            core.shutdown(self);
        }

        // Drain the injection queue
        //
        // We already shut down every task, so we can simply drop the tasks.
        while let Some(task) = self.next_remote_task() {
            drop(task);
        }
    }

    fn ptr_eq(&self, other: &Handle) -> bool {
        std::ptr::eq(self, other)
    }
}

impl Overflow<Arc<Handle>> for Handle {
    fn push(&self, task: task::Notified<Arc<Handle>>) {
        self.push_remote_task(task);
    }

    fn push_batch<I>(&self, iter: I)
    where
        I: Iterator<Item = task::Notified<Arc<Handle>>>,
    {
        unsafe {
            self.shared.inject.push_batch(self, iter);
        }
    }
}

pub(crate) struct InjectGuard<'a> {
    lock: crate::loom::sync::MutexGuard<'a, Synced>,
}

impl<'a> AsMut<inject::Synced> for InjectGuard<'a> {
    fn as_mut(&mut self) -> &mut inject::Synced {
        &mut self.lock.inject
    }
}

impl<'a> Lock<inject::Synced> for &'a Handle {
    type Handle = InjectGuard<'a>;

    fn lock(self) -> Self::Handle {
        InjectGuard {
            lock: self.shared.synced.lock(),
        }
    }
}

#[track_caller]
fn with_current<R>(f: impl FnOnce(Option<&Context>) -> R) -> R {
    use scheduler::Context::MultiThread;

    context::with_scheduler(|ctx| match ctx {
        Some(MultiThread(ctx)) => f(Some(ctx)),
        _ => f(None),
    })
}