use crate::loom::sync::{Arc, Condvar, Mutex, MutexGuard};
use crate::runtime;
use crate::runtime::driver::Driver;
use crate::runtime::scheduler::multi_thread_alt::{
    idle, queue, stats, Counters, Handle, Idle, Overflow, Stats, TraceStatus,
};
use crate::runtime::scheduler::{self, inject, Lock};
use crate::runtime::task::{OwnedTasks, TaskHarnessScheduleHooks};
use crate::runtime::{blocking, driver, task, Config, SchedulerMetrics, WorkerMetrics};
use crate::runtime::{context, TaskHooks};
use crate::task::coop;
use crate::util::atomic_cell::AtomicCell;
use crate::util::rand::{FastRand, RngSeedGenerator};

use std::cell::{Cell, RefCell};
use std::task::Waker;
use std::time::Duration;
use std::{cmp, thread};

cfg_unstable_metrics! {
    mod metrics;
}

mod taskdump_mock;

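/// Per-thread worker state. A `Worker` stays with the OS thread that created
/// it, while the `Core` below is passed between threads as workers park,
/// unpark, and hand off work.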
pub(super) struct Worker {
    /// Counter used to track when to poll from the local queue vs. the
    /// injection queue, and when to run maintenance.
    tick: u32,

    /// True if the scheduler is being shut down.
    pub(super) is_shutdown: bool,

    /// True if the runtime is being traced (taskdump).
    is_traced: bool,

    /// Number of consecutive times the local queue has been polled without
    /// checking the injection queue.
    num_seq_local_queue_polls: u32,

    /// How often to check the injection queue; tuned at runtime.
    global_queue_interval: u32,

    /// Scratch buffer of workers to wake, filled while the scheduler lock is
    /// held and drained after it is released.
    workers_to_notify: Vec<usize>,

    /// Snapshot of idle-core state, used to avoid stealing from idle workers.
    idle_snapshot: idle::Snapshot,

    /// Per-worker stats that outlive any particular `Core`.
    stats: stats::Ephemeral,
}

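/// One "virtual CPU" of scheduler state. Aligned to 128 bytes to reduce
/// false sharing; 128 covers the cache-prefetch granularity of common
/// x86-64 and aarch64 parts.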
#[repr(align(128))]
pub(super) struct Core {
    /// Index of this core, which also identifies its entry in
    /// `Shared::remotes`.
    pub(super) index: usize,

    /// The LIFO slot: holds the most recently scheduled task, polled ahead
    /// of the run queue.
    lifo_slot: Option<Notified>,

    /// The worker-local run queue.
    run_queue: queue::Local<Arc<Handle>>,

    /// True if the worker is currently searching for more work. Searching
    /// workers steal from peers and drain the injection queue.
    pub(super) is_searching: bool,

    /// Per-core runtime stats.
    stats: Stats,

    /// Fast random number generator, used to pick steal victims.
    rand: FastRand,
}

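/// State shared across all workers.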
pub(crate) struct Shared {
    /// Per-core state visible to other workers, used for stealing.
    remotes: Box<[Remote]>,

    /// Global task queue, used both for submitting work from outside the
    /// scheduler and as overflow when a local run queue is full.
    pub(super) inject: inject::Shared<Arc<Handle>>,

    /// Coordinates idle workers.
    idle: Idle,

    /// Collection of all active tasks spawned onto this executor.
    pub(super) owned: OwnedTasks<Arc<Handle>>,

    /// Data synchronized by the scheduler mutex.
    pub(super) synced: Mutex<Synced>,

    /// The driver (I/O, timers). Held in an `AtomicCell` so the worker that
    /// parks on it can take ownership without locking.
    driver: AtomicCell<Driver>,

    /// Condition variables used to unblock worker threads, one per worker.
    pub(super) condvars: Vec<Condvar>,

    /// Tracks trace (taskdump) requests across workers.
    pub(super) trace_status: TraceStatus,

    /// Scheduler configuration options.
    config: Config,

    /// Collects scheduler-level metrics.
    pub(super) scheduler_metrics: SchedulerMetrics,

    /// Per-worker metrics.
    pub(super) worker_metrics: Box<[WorkerMetrics]>,

    /// Internal debug counters; compiled to a no-op unless enabled.
    _counters: Counters,
}

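/// Data synchronized by the scheduler mutex.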
pub(crate) struct Synced {
    /// When a worker is unparked, the core it should run is placed in its
    /// slot here.
    pub(super) assigned_cores: Vec<Option<Box<Core>>>,

    /// Cores that have been observed by the shutdown process.
    shutdown_cores: Vec<Box<Core>>,

    /// The driver, stashed here during shutdown so the last worker can shut
    /// it down.
    shutdown_driver: Option<Box<Driver>>,

    /// Synchronized state for `Idle`.
    pub(super) idle: idle::Synced,

    /// Synchronized state for `inject::Shared`.
    pub(crate) inject: inject::Synced,
}

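/// Used to communicate with a worker from other threads.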
struct Remote {
    /// The steal half of the worker's local run queue.
    pub(super) steal: queue::Steal<Arc<Handle>>,
}

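/// Thread-local context stored for the duration of a worker thread's run.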
pub(crate) struct Context {
    /// Handle to the current scheduler.
    handle: Arc<Handle>,

    /// Index of this worker; used to look up its condvar and assigned-core
    /// slot.
    index: usize,

    /// True when the LIFO slot is currently usable.
    lifo_enabled: Cell<bool>,

    /// The worker's core, when this worker currently holds one.
    core: RefCell<Option<Box<Core>>>,

    /// Slot used to hand the core off to another thread in `block_in_place`.
    handoff_core: Arc<AtomicCell<Core>>,

    /// Tasks to wake after resource drivers are polled; mostly holds yielded
    /// tasks.
    pub(crate) defer: RefCell<Vec<Notified>>,
}

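// For both results below, `Err(())` means the worker thread should stop
// running: its core has already been returned to the scheduler, or the
// runtime is shutting down.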
type RunResult = Result<Box<Core>, ()>;
type NextTaskResult = Result<(Option<Notified>, Box<Core>), ()>;

/// A task handle.
type Task = task::Task<Arc<Handle>>;

/// A notified task handle.
type Notified = task::Notified<Arc<Handle>>;

/// Number of consecutive polls from the LIFO slot allowed before the slot is
/// temporarily disabled. This prevents two tasks that repeatedly wake each
/// other from starving the rest of the queue.
const MAX_LIFO_POLLS_PER_TICK: usize = 3;

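/// Creates the scheduler: one `Core` per application worker plus, when the
/// driver is enabled, one extra worker thread so the driver (I/O, timers)
/// can always be parked on.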
pub(super) fn create(
    num_cores: usize,
    driver: Driver,
    driver_handle: driver::Handle,
    blocking_spawner: blocking::Spawner,
    seed_generator: RngSeedGenerator,
    config: Config,
) -> runtime::Handle {
    let mut num_workers = num_cores;

    // If the driver is enabled, an extra thread is needed to park on it.
    if driver.is_enabled() {
        num_workers += 1;
    }

    let mut cores = Vec::with_capacity(num_cores);
    let mut remotes = Vec::with_capacity(num_cores);
    let mut worker_metrics = Vec::with_capacity(num_cores);

    // Create the local queues.
    for i in 0..num_cores {
        let (steal, run_queue) = queue::local(config.local_queue_capacity);

        let metrics = WorkerMetrics::from_config(&config);
        let stats = Stats::new(&metrics);

        cores.push(Box::new(Core {
            index: i,
            lifo_slot: None,
            run_queue,
            is_searching: false,
            stats,
            rand: FastRand::from_seed(config.seed_generator.next_seed()),
        }));

        remotes.push(Remote { steal });
        worker_metrics.push(metrics);
    }

    let (idle, idle_synced) = Idle::new(cores, num_workers);
    let (inject, inject_synced) = inject::Shared::new();

    let handle = Arc::new(Handle {
        task_hooks: TaskHooks::from_config(&config),
        shared: Shared {
            remotes: remotes.into_boxed_slice(),
            inject,
            idle,
            owned: OwnedTasks::new(num_cores),
            synced: Mutex::new(Synced {
                assigned_cores: (0..num_workers).map(|_| None).collect(),
                shutdown_cores: Vec::with_capacity(num_cores),
                shutdown_driver: None,
                idle: idle_synced,
                inject: inject_synced,
            }),
            driver: AtomicCell::new(Some(Box::new(driver))),
            condvars: (0..num_workers).map(|_| Condvar::new()).collect(),
            trace_status: TraceStatus::new(num_cores),
            config,
            scheduler_metrics: SchedulerMetrics::new(),
            worker_metrics: worker_metrics.into_boxed_slice(),
            _counters: Counters,
        },
        driver: driver_handle,
        blocking_spawner,
        seed_generator,
    });

    let rt_handle = runtime::Handle {
        inner: scheduler::Handle::MultiThreadAlt(handle),
    };

    // Eagerly start the worker threads.
    for index in 0..num_workers {
        let handle = rt_handle.inner.expect_multi_thread_alt();
        let h2 = handle.clone();
        let handoff_core = Arc::new(AtomicCell::new(None));

        handle
            .blocking_spawner
            .spawn_blocking(&rt_handle, move || run(index, h2, handoff_core, false));
    }

    rt_handle
}

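/// Runs `f` on the current thread without blocking other tasks: the worker's
/// `Core` is handed off to a freshly spawned blocking thread, which keeps
/// running this core's queued tasks while `f` blocks.
///
/// A minimal usage sketch from application code, via the public wrapper
/// (assumes a multi-threaded runtime is driving it):
///
/// ```ignore
/// tokio::task::block_in_place(|| {
///     // Blocking here is fine: this worker's core has been handed to
///     // another thread, so queued tasks keep making progress.
///     std::thread::sleep(std::time::Duration::from_millis(50));
/// });
/// ```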
#[track_caller]
pub(crate) fn block_in_place<F, R>(f: F) -> R
where
    F: FnOnce() -> R,
{
    struct Reset(coop::Budget);

    impl Drop for Reset {
        fn drop(&mut self) {
            with_current(|maybe_cx| {
                if let Some(cx) = maybe_cx {
                    let core = cx.handoff_core.take();
                    let mut cx_core = cx.core.borrow_mut();
                    assert!(cx_core.is_none());
                    *cx_core = core;

                    // Reset the task budget as we are re-entering the runtime.
                    coop::set(self.0);
                }
            });
        }
    }

    let mut had_entered = false;

    let setup_result = with_current(|maybe_cx| {
        match (
            crate::runtime::context::current_enter_context(),
            maybe_cx.is_some(),
        ) {
            (context::EnterRuntime::Entered { .. }, true) => {
                // We are on a thread pool runtime thread, so we just need to
                // set up blocking.
                had_entered = true;
            }
            (
                context::EnterRuntime::Entered {
                    allow_block_in_place,
                },
                false,
            ) => {
                // We are on an executor, but _not_ on the thread pool. That
                // is only okay if we are in a thread pool runtime's
                // `block_on` method.
                if allow_block_in_place {
                    had_entered = true;
                    return Ok(());
                } else {
                    // This probably means we are on the current-thread
                    // runtime or in a LocalSet, where blocking is not okay.
                    return Err(
                        "can call blocking only when running on the multi-threaded runtime",
                    );
                }
            }
            (context::EnterRuntime::NotEntered, true) => {
                // This is a nested call to `block_in_place`; all the
                // necessary setup has already been done.
                return Ok(());
            }
            (context::EnterRuntime::NotEntered, false) => {
                // We are outside the tokio runtime, so blocking is fine. We
                // can also skip all of the thread pool blocking setup steps.
                return Ok(());
            }
        }

        let cx = maybe_cx.expect("no .is_some() == false cases above should lead here");

        // Get the worker core. If none is set, then blocking is fine!
        let core = match cx.core.borrow_mut().take() {
            Some(core) => core,
            None => return Ok(()),
        };

        // In order to block, the core must be sent to another thread for
        // execution. First, move it into the handoff slot.
        cx.handoff_core.set(core);

        // Next, clone the worker handle and spawn a new thread to resume the
        // core. Once the blocking section is done, we will attempt to steal
        // the core back.
        let index = cx.index;
        let handle = cx.handle.clone();
        let handoff_core = cx.handoff_core.clone();
        runtime::spawn_blocking(move || run(index, handle, handoff_core, true));
        Ok(())
    });

    if let Err(panic_message) = setup_result {
        panic!("{}", panic_message);
    }

    if had_entered {
        // Unset the current task's budget. Blocking sections are not
        // constrained by task budgets.
        let _reset = Reset(coop::stop());

        crate::runtime::context::exit_runtime(f)
    } else {
        f()
    }
}

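/// Worker thread entry point. Also re-entered from `block_in_place`; in that
/// case the new thread picks the handed-off core out of `handoff_core`.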
fn run(
    index: usize,
    handle: Arc<Handle>,
    handoff_core: Arc<AtomicCell<Core>>,
    blocking_in_place: bool,
) {
    struct AbortOnPanic;

    impl Drop for AbortOnPanic {
        fn drop(&mut self) {
            if std::thread::panicking() {
                eprintln!("worker thread panicking; aborting process");
                std::process::abort();
            }
        }
    }

    // Catching panics on worker threads in tests is quite tricky. Instead,
    // when debug assertions are enabled, we just abort the process.
    #[cfg(debug_assertions)]
    let _abort_on_panic = AbortOnPanic;

    let num_workers = handle.shared.condvars.len();

    let mut worker = Worker {
        tick: 0,
        num_seq_local_queue_polls: 0,
        global_queue_interval: Stats::DEFAULT_GLOBAL_QUEUE_INTERVAL,
        is_shutdown: false,
        is_traced: false,
        workers_to_notify: Vec::with_capacity(num_workers - 1),
        idle_snapshot: idle::Snapshot::new(&handle.shared.idle),
        stats: stats::Ephemeral::new(),
    };

    let sched_handle = scheduler::Handle::MultiThreadAlt(handle.clone());

    crate::runtime::context::enter_runtime(&sched_handle, true, |_| {
        // Set the worker context.
        let cx = scheduler::Context::MultiThreadAlt(Context {
            index,
            lifo_enabled: Cell::new(!handle.shared.config.disable_lifo_slot),
            handle,
            core: RefCell::new(None),
            handoff_core,
            defer: RefCell::new(Vec::with_capacity(64)),
        });

        context::set_scheduler(&cx, || {
            let cx = cx.expect_multi_thread_alt();

            // Run the worker; `Err` here means the worker is shutting down.
            let res = worker.run(&cx, blocking_in_place);
            debug_assert!(res.is_err());

            // Make sure no deferred tasks are left behind when shutting down.
            if !cx.defer.borrow().is_empty() {
                worker.schedule_deferred_without_core(&cx, &mut cx.shared().synced.lock());
            }
        });
    });
}

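// Both macros below take a `NextTaskResult` expression: if it produced a
// task, return it to the caller immediately; otherwise, evaluate to the core
// so the search can continue.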
macro_rules! try_task {
    ($e:expr) => {{
        let (task, core) = $e?;
        if task.is_some() {
            return Ok((task, core));
        }
        core
    }};
}

macro_rules! try_task_new_batch {
    ($w:expr, $e:expr) => {{
        let (task, mut core) = $e?;
        if task.is_some() {
            core.stats.start_processing_scheduled_tasks(&mut $w.stats);
            return Ok((task, core));
        }
        core
    }};
}

impl Worker {
    fn run(&mut self, cx: &Context, blocking_in_place: bool) -> RunResult {
        let (maybe_task, mut core) = {
            if blocking_in_place {
                if let Some(core) = cx.handoff_core.take() {
                    (None, core)
                } else {
                    // The core was stolen back before this thread started
                    // running; shut the thread down.
                    return Err(());
                }
            } else {
                let mut synced = cx.shared().synced.lock();

                // First, try to acquire an available core.
                if let Some(core) = self.try_acquire_available_core(cx, &mut synced) {
                    // Try to poll a task from the global queue.
                    let maybe_task = cx.shared().next_remote_task_synced(&mut synced);
                    (maybe_task, core)
                } else {
                    // Block the thread, waiting for a core to be assigned.
                    self.wait_for_core(cx, synced)?
                }
            }
        };

        cx.shared().worker_metrics[core.index].set_thread_id(thread::current().id());
        core.stats.start_processing_scheduled_tasks(&mut self.stats);

        if let Some(task) = maybe_task {
            core = self.run_task(cx, core, task)?;
        }

        while !self.is_shutdown {
            let (maybe_task, c) = self.next_task(cx, core)?;
            core = c;

            if let Some(task) = maybe_task {
                core = self.run_task(cx, core, task)?;
            } else {
                // The only reason `next_task` returns without a task is
                // shutdown.
                assert!(self.is_shutdown);
                break;
            }
        }

        cx.shared().shutdown_core(&cx.handle, core);

        // Tasks may have been woken during shutdown; clear the defer list
        // before exiting.
        self.shutdown_clear_defer(cx);

        Err(())
    }

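    /// Tries to pull an available core off the idle set and prepare it for
    /// running on this thread.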
    fn try_acquire_available_core(
        &mut self,
        cx: &Context,
        synced: &mut Synced,
    ) -> Option<Box<Core>> {
        if let Some(mut core) = cx
            .shared()
            .idle
            .try_acquire_available_core(&mut synced.idle)
        {
            self.reset_acquired_core(cx, synced, &mut core);
            Some(core)
        } else {
            None
        }
    }

    /// Blocks the current thread until a core becomes available.
    fn wait_for_core(
        &mut self,
        cx: &Context,
        mut synced: MutexGuard<'_, Synced>,
    ) -> NextTaskResult {
        if cx.shared().idle.needs_searching() {
            if let Some(mut core) = self.try_acquire_available_core(cx, &mut synced) {
                cx.shared().idle.transition_worker_to_searching(&mut core);
                return Ok((None, core));
            }
        }

        cx.shared()
            .idle
            .transition_worker_to_parked(&mut synced, cx.index);

        // Wait until a core is assigned to this worker.
        let mut core = loop {
            if let Some(core) = synced.assigned_cores[cx.index].take() {
                break core;
            }

            // If the injection queue is closed, the scheduler is shutting
            // down.
            if cx.shared().inject.is_closed(&synced.inject) {
                self.shutdown_clear_defer(cx);
                return Err(());
            }

            synced = cx.shared().condvars[cx.index].wait(synced).unwrap();
        };

        self.reset_acquired_core(cx, &mut synced, &mut core);

        if self.is_shutdown {
            // Currently shutting down, don't do any more work.
            return Ok((None, core));
        }

        let n = cmp::max(core.run_queue.remaining_slots() / 2, 1);
        let maybe_task = self.next_remote_task_batch_synced(cx, &mut synced, &mut core, n);

        core.stats.unparked();
        self.flush_metrics(cx, &mut core);

        Ok((maybe_task, core))
    }

    /// Ensures all state is cleanly set up when acquiring a core.
    fn reset_acquired_core(&mut self, cx: &Context, synced: &mut Synced, core: &mut Core) {
        self.global_queue_interval = core.stats.tuned_global_queue_interval(&cx.shared().config);

        // Reset `lifo_enabled` in case the core was previously used by a
        // worker that had disabled the slot.
        self.reset_lifo_enabled(cx);

        // At this point, the local queue should be empty.
        #[cfg(not(loom))]
        debug_assert!(core.run_queue.is_empty());

        // Update shutdown & tracing flags.
        self.update_global_flags(cx, synced);
    }

    fn next_task(&mut self, cx: &Context, mut core: Box<Core>) -> NextTaskResult {
        self.assert_lifo_enabled_is_correct(cx);

        if self.is_traced {
            core = cx.handle.trace_core(core);
        }

        // Increment the tick.
        self.tick = self.tick.wrapping_add(1);

        // Run maintenance, if needed.
        core = try_task!(self.maybe_maintenance(&cx, core));

        // Check the LIFO slot, local run queue, and injection queue for a
        // notified task.
        core = try_task!(self.next_notified_task(cx, core));

        // All queued work is consumed; start searching for more.
        core.stats.end_processing_scheduled_tasks(&mut self.stats);

        super::counters::inc_num_no_local_work();

        if !cx.defer.borrow().is_empty() {
            // We are deferring tasks, so poll the resource driver and
            // schedule them.
            try_task_new_batch!(self, self.park_yield(cx, core));

            panic!("what happened to the deferred tasks? 🤔");
        }

        while !self.is_shutdown {
            // Search for more work: steal from other workers and re-check
            // the injection queue.
            core = try_task_new_batch!(self, self.search_for_work(cx, core));

            debug_assert!(cx.defer.borrow().is_empty());
            core = try_task_new_batch!(self, self.park(cx, core));
        }

        // Shutting down; clear any pending deferred tasks.
        self.shutdown_clear_defer(cx);

        Ok((None, core))
    }

    fn next_notified_task(&mut self, cx: &Context, mut core: Box<Core>) -> NextTaskResult {
        self.num_seq_local_queue_polls += 1;

        // Every `global_queue_interval` polls, check the injection queue
        // first so remotely scheduled tasks are not starved.
        if self.num_seq_local_queue_polls % self.global_queue_interval == 0 {
            super::counters::inc_global_queue_interval();

            self.num_seq_local_queue_polls = 0;

            // Update the global queue interval, if needed.
            self.tune_global_queue_interval(cx, &mut core);

            if let Some(task) = self.next_remote_task(cx) {
                return Ok((Some(task), core));
            }
        }

        if let Some(task) = core.next_local_task() {
            return Ok((Some(task), core));
        }

        self.next_remote_task_batch(cx, core)
    }

    fn next_remote_task(&self, cx: &Context) -> Option<Notified> {
        if cx.shared().inject.is_empty() {
            return None;
        }

        let mut synced = cx.shared().synced.lock();
        cx.shared().next_remote_task_synced(&mut synced)
    }

    fn next_remote_task_batch(&self, cx: &Context, mut core: Box<Core>) -> NextTaskResult {
        if cx.shared().inject.is_empty() {
            return Ok((None, core));
        }

        // Other workers can only *remove* tasks from the current worker's
        // `run_queue`, so by the time the batch is pushed below there will
        // be at least `cap` available slots in the queue.
        let cap = usize::min(
            core.run_queue.remaining_slots(),
            usize::max(core.run_queue.max_capacity() / 2, 1),
        );

        let mut synced = cx.shared().synced.lock();
        let maybe_task = self.next_remote_task_batch_synced(cx, &mut synced, &mut core, cap);
        Ok((maybe_task, core))
    }

    fn next_remote_task_batch_synced(
        &self,
        cx: &Context,
        synced: &mut Synced,
        core: &mut Core,
        max: usize,
    ) -> Option<Notified> {
        super::counters::inc_num_remote_batch();

        // Pull a proportional share of the injection queue, leaving the rest
        // for other workers.
        let n = if core.is_searching {
            cx.shared().inject.len() / cx.shared().idle.num_searching() + 1
        } else {
            cx.shared().inject.len() / cx.shared().remotes.len() + 1
        };

        let n = usize::min(n, max) + 1;

        // safety: passing in the correct `inject::Synced`.
        let mut tasks = unsafe { cx.shared().inject.pop_n(&mut synced.inject, n) };

        // Pop the first task to return immediately.
        let ret = tasks.next();

        // Push the rest onto the run queue.
        core.run_queue.push_back(tasks);

        ret
    }

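    /// Searches for work by stealing from peers, re-checking the injection
    /// queue between rounds. Loom builds use a single round to keep the
    /// model tractable.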
    fn search_for_work(&mut self, cx: &Context, mut core: Box<Core>) -> NextTaskResult {
        #[cfg(not(loom))]
        const ROUNDS: usize = 4;

        #[cfg(loom)]
        const ROUNDS: usize = 1;

        debug_assert!(core.lifo_slot.is_none());
        #[cfg(not(loom))]
        debug_assert!(core.run_queue.is_empty());

        if !core.run_queue.can_steal() {
            return Ok((None, core));
        }

        if !self.transition_to_searching(cx, &mut core) {
            return Ok((None, core));
        }

        // Snapshot idle-core state so stealing skips peers that cannot have
        // work.
        cx.shared().idle.snapshot(&mut self.idle_snapshot);

        let num = cx.shared().remotes.len();

        for i in 0..ROUNDS {
            // Start from a random worker.
            let start = core.rand.fastrand_n(num as u32) as usize;

            if let Some(task) = self.steal_one_round(cx, &mut core, start) {
                return Ok((Some(task), core));
            }

            core = try_task!(self.next_remote_task_batch(cx, core));

            // Stall briefly between rounds, backing off a little each time.
            if i > 0 {
                super::counters::inc_num_spin_stall();
                std::thread::sleep(std::time::Duration::from_micros(i as u64));
            }
        }

        Ok((None, core))
    }

    fn steal_one_round(&self, cx: &Context, core: &mut Core, start: usize) -> Option<Notified> {
        let num = cx.shared().remotes.len();

        for i in 0..num {
            let i = (start + i) % num;

            // Don't steal from ourself! We know we don't have work.
            if i == core.index {
                continue;
            }

            // If the core is currently idle, then there is nothing to steal.
            if self.idle_snapshot.is_idle(i) {
                continue;
            }

            let target = &cx.shared().remotes[i];

            if let Some(task) = target
                .steal
                .steal_into(&mut core.run_queue, &mut core.stats)
            {
                return Some(task);
            }
        }

        None
    }

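    /// Runs `task`, then keeps polling the LIFO slot: tasks scheduled from
    /// the task being polled land there and are run back-to-back, capped by
    /// `MAX_LIFO_POLLS_PER_TICK` and the coop budget.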
    fn run_task(&mut self, cx: &Context, mut core: Box<Core>, task: Notified) -> RunResult {
        let task = cx.shared().owned.assert_owner(task);

        // Make sure the worker is not in the **searching** state. This
        // enables another idle worker to try to steal work.
        if self.transition_from_searching(cx, &mut core) {
            super::counters::inc_num_relay_search();
            cx.shared().notify_parked_local();
        }

        self.assert_lifo_enabled_is_correct(cx);

        // Measure the poll start time. We may end up polling other tasks
        // under this measurement; those come from the LIFO slot and are
        // considered part of the current task for scheduling purposes. Such
        // tasks inherit the parent's limits.
        core.stats.start_poll(&mut self.stats);

        // Make the core available to the runtime context.
        *cx.core.borrow_mut() = Some(core);

        // Run the task.
        coop::budget(|| {
            super::counters::inc_num_polls();
            task.run();
            let mut lifo_polls = 0;

            // As long as there is budget remaining and a task exists in the
            // `lifo_slot`, keep running.
            loop {
                // Check if we still have the core. If not, it was stolen by
                // another worker.
                let mut core = match cx.core.borrow_mut().take() {
                    Some(core) => core,
                    None => {
                        // We cannot call `reset_lifo_enabled()` here because
                        // the core was stolen; the stealer handles that.
                        return Err(());
                    }
                };

                // Check for a task in the LIFO slot.
                let task = match core.next_lifo_task() {
                    Some(task) => task,
                    None => {
                        self.reset_lifo_enabled(cx);
                        core.stats.end_poll();
                        return Ok(core);
                    }
                };

                if !coop::has_budget_remaining() {
                    core.stats.end_poll();

                    // Not enough budget left to run the LIFO task; push it
                    // to the back of the queue and return.
                    core.run_queue
                        .push_back_or_overflow(task, cx.shared(), &mut core.stats);
                    // If we hit this point, the LIFO slot should still be
                    // enabled; there is no need to reset it.
                    debug_assert!(cx.lifo_enabled.get());
                    return Ok(core);
                }

                // Track that we are about to run a task from the LIFO slot.
                lifo_polls += 1;
                super::counters::inc_lifo_schedules();

                // Disable the LIFO slot if we reach the limit. In ping-pong
                // style workloads where task A notifies task B, which
                // notifies task A again, continuously prioritizing the LIFO
                // slot can starve the rest of the queue.
                if lifo_polls >= MAX_LIFO_POLLS_PER_TICK {
                    cx.lifo_enabled.set(false);
                    super::counters::inc_lifo_capped();
                }

                // Run the LIFO task, then loop.
                *cx.core.borrow_mut() = Some(core);
                let task = cx.shared().owned.assert_owner(task);
                super::counters::inc_num_lifo_polls();
                task.run();
            }
        })
    }

    fn schedule_deferred_with_core<'a>(
        &mut self,
        cx: &'a Context,
        mut core: Box<Core>,
        synced: impl FnOnce() -> MutexGuard<'a, Synced>,
    ) -> NextTaskResult {
        let mut defer = cx.defer.borrow_mut();

        // Grab a task to run next.
        let task = defer.pop();

        if task.is_none() {
            return Ok((None, core));
        }

        if !defer.is_empty() {
            let mut synced = synced();

            // Number of tasks to try to spread across idle workers.
            let num_fanout = cmp::min(defer.len(), cx.shared().idle.num_idle(&synced.idle));

            // Cap the number of threads woken at one time to limit no-op
            // wakes and reduce mutex contention.
            let num_fanout = cmp::min(2, num_fanout);

            if num_fanout > 0 {
                cx.shared()
                    .push_remote_task_batch_synced(&mut synced, defer.drain(..num_fanout));

                cx.shared()
                    .idle
                    .notify_mult(&mut synced, &mut self.workers_to_notify, num_fanout);
            }

            // Do not hold the lock while notifying workers below.
            drop(synced);
        }

        // Notify the selected workers.
        for worker in self.workers_to_notify.drain(..) {
            cx.shared().condvars[worker].notify_one()
        }

        if !defer.is_empty() {
            // Push the remaining tasks onto the local queue.
            for task in defer.drain(..) {
                core.run_queue
                    .push_back_or_overflow(task, cx.shared(), &mut core.stats);
            }

            cx.shared().notify_parked_local();
        }

        Ok((task, core))
    }

    fn schedule_deferred_without_core(&mut self, cx: &Context, synced: &mut Synced) {
        let mut defer = cx.defer.borrow_mut();
        let num = defer.len();

        if num > 0 {
            // Push all deferred tasks onto the injection queue.
            cx.shared()
                .push_remote_task_batch_synced(synced, defer.drain(..));

            debug_assert!(self.workers_to_notify.is_empty());

            // Pick workers to wake.
            cx.shared()
                .idle
                .notify_mult(synced, &mut self.workers_to_notify, num);

            // Notify the selected workers.
            for worker in self.workers_to_notify.drain(..) {
                cx.shared().condvars[worker].notify_one()
            }
        }
    }

    fn maybe_maintenance(&mut self, cx: &Context, mut core: Box<Core>) -> NextTaskResult {
        if self.tick % cx.shared().config.event_interval == 0 {
            super::counters::inc_num_maintenance();

            core.stats.end_processing_scheduled_tasks(&mut self.stats);

            // Run regularly scheduled maintenance.
            core = try_task_new_batch!(self, self.park_yield(cx, core));

            core.stats.start_processing_scheduled_tasks(&mut self.stats);
        }

        Ok((None, core))
    }

    fn flush_metrics(&self, cx: &Context, core: &mut Core) {
        core.stats.submit(&cx.shared().worker_metrics[core.index]);
    }

    fn update_global_flags(&mut self, cx: &Context, synced: &mut Synced) {
        if !self.is_shutdown {
            self.is_shutdown = cx.shared().inject.is_closed(&synced.inject);
        }

        if !self.is_traced {
            self.is_traced = cx.shared().trace_status.trace_requested();
        }
    }

    fn park_yield(&mut self, cx: &Context, core: Box<Core>) -> NextTaskResult {
        // Call `park` with a 0 timeout. This enables the I/O driver and
        // other drivers to be polled without blocking.
        if let Some(mut driver) = cx.shared().driver.take() {
            driver.park_timeout(&cx.handle.driver, Duration::from_millis(0));

            cx.shared().driver.set(driver);
        }

        // If there are deferred tasks (e.g. new I/O events), schedule them.
        let (maybe_task, mut core) =
            self.schedule_deferred_with_core(cx, core, || cx.shared().synced.lock())?;

        self.flush_metrics(cx, &mut core);
        self.update_global_flags(cx, &mut cx.shared().synced.lock());

        Ok((maybe_task, core))
    }

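    /// Parks the worker. The worker releases its core back to the idle set
    /// first; whichever worker ends up parking on the driver also polls I/O
    /// and timers on behalf of the runtime.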
    fn park(&mut self, cx: &Context, mut core: Box<Core>) -> NextTaskResult {
        if let Some(f) = &cx.shared().config.before_park {
            f();
        }

        if self.can_transition_to_parked(&mut core) {
            debug_assert!(!self.is_shutdown);
            debug_assert!(!self.is_traced);

            core = try_task!(self.do_park(cx, core));
        }

        if let Some(f) = &cx.shared().config.after_unpark {
            f();
        }

        Ok((None, core))
    }

    fn do_park(&mut self, cx: &Context, mut core: Box<Core>) -> NextTaskResult {
        let was_searching = core.is_searching;

        // Acquire the lock.
        let mut synced = cx.shared().synced.lock();

        // The local queue should be empty at this point.
        #[cfg(not(loom))]
        debug_assert!(core.run_queue.is_empty());

        // Try one last time to get tasks.
        let n = cmp::max(core.run_queue.remaining_slots() / 2, 1);
        if let Some(task) = self.next_remote_task_batch_synced(cx, &mut synced, &mut core, n) {
            return Ok((Some(task), core));
        }

        if !was_searching {
            if cx
                .shared()
                .idle
                .transition_worker_to_searching_if_needed(&mut synced.idle, &mut core)
            {
                // Skip parking; go back to searching.
                return Ok((None, core));
            }
        }

        super::counters::inc_num_parks();
        core.stats.about_to_park();
        self.flush_metrics(cx, &mut core);

        // Update shutdown & tracing flags.
        self.update_global_flags(cx, &mut synced);

        if self.is_shutdown {
            return Ok((None, core));
        }

        // The core being returned must not be in the searching state.
        core.is_searching = false;
        cx.shared().idle.release_core(&mut synced, core);

        drop(synced);

        if was_searching {
            if cx.shared().idle.transition_worker_from_searching() {
                // We were the last searching worker; do one final check for
                // queued work.
                for i in 0..cx.shared().remotes.len() {
                    if !cx.shared().remotes[i].steal.is_empty() {
                        let mut synced = cx.shared().synced.lock();

                        // Try to get a core.
                        if let Some(mut core) = self.try_acquire_available_core(cx, &mut synced) {
                            cx.shared().idle.transition_worker_to_searching(&mut core);
                            return Ok((None, core));
                        } else {
                            // Fall back to the park routine.
                            break;
                        }
                    }
                }
            }
        }

        if let Some(mut driver) = cx.shared().take_driver() {
            // Wait for driver events.
            driver.park(&cx.handle.driver);

            synced = cx.shared().synced.lock();

            if cx.shared().inject.is_closed(&synced.inject) {
                synced.shutdown_driver = Some(driver);
                self.shutdown_clear_defer(cx);
                cx.shared().shutdown_finalize(&cx.handle, &mut synced);
                return Err(());
            }

            // Put the driver back.
            cx.shared().driver.set(driver);

            // Try to acquire an available core to schedule I/O events.
            if let Some(core) = self.try_acquire_available_core(cx, &mut synced) {
                // This may result in a task being run.
                self.schedule_deferred_with_core(cx, core, move || synced)
            } else {
                // Schedule any deferred tasks.
                self.schedule_deferred_without_core(cx, &mut synced);

                // Wait for a core.
                self.wait_for_core(cx, synced)
            }
        } else {
            synced = cx.shared().synced.lock();

            // Wait for a core to be assigned to us.
            self.wait_for_core(cx, synced)
        }
    }

    fn transition_to_searching(&self, cx: &Context, core: &mut Core) -> bool {
        if !core.is_searching {
            cx.shared().idle.try_transition_worker_to_searching(core);
        }

        core.is_searching
    }

    /// Returns `true` if another worker must be notified.
    fn transition_from_searching(&self, cx: &Context, core: &mut Core) -> bool {
        if !core.is_searching {
            return false;
        }

        core.is_searching = false;
        cx.shared().idle.transition_worker_from_searching()
    }

    fn can_transition_to_parked(&self, core: &mut Core) -> bool {
        !self.has_tasks(core) && !self.is_shutdown && !self.is_traced
    }

    fn has_tasks(&self, core: &Core) -> bool {
        core.lifo_slot.is_some() || !core.run_queue.is_empty()
    }

    fn reset_lifo_enabled(&self, cx: &Context) {
        cx.lifo_enabled
            .set(!cx.handle.shared.config.disable_lifo_slot);
    }

    fn assert_lifo_enabled_is_correct(&self, cx: &Context) {
        debug_assert_eq!(
            cx.lifo_enabled.get(),
            !cx.handle.shared.config.disable_lifo_slot
        );
    }

    fn tune_global_queue_interval(&mut self, cx: &Context, core: &mut Core) {
        let next = core.stats.tuned_global_queue_interval(&cx.shared().config);

        // Smooth out jitter.
        if u32::abs_diff(self.global_queue_interval, next) > 2 {
            self.global_queue_interval = next;
        }
    }

    fn shutdown_clear_defer(&self, cx: &Context) {
        let mut defer = cx.defer.borrow_mut();

        for task in defer.drain(..) {
            drop(task);
        }
    }
}

impl Context {
    pub(crate) fn defer(&self, waker: &Waker) {
        // TODO: refactor defer across all runtimes.
        waker.wake_by_ref();
    }

    fn shared(&self) -> &Shared {
        &self.handle.shared
    }

    #[cfg_attr(not(feature = "time"), allow(dead_code))]
    pub(crate) fn get_worker_index(&self) -> usize {
        self.index
    }
}

impl Core {
    fn next_local_task(&mut self) -> Option<Notified> {
        self.next_lifo_task().or_else(|| self.run_queue.pop())
    }

    fn next_lifo_task(&mut self) -> Option<Notified> {
        self.lifo_slot.take()
    }
}

impl Shared {
    fn next_remote_task_synced(&self, synced: &mut Synced) -> Option<Notified> {
        // safety: passing in the correct `inject::Synced`.
        unsafe { self.inject.pop(&mut synced.inject) }
    }

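    /// Schedules a task. If the caller is on a worker of this same runtime
    /// and currently holds its core, the task goes to a local queue (or the
    /// defer list for yields); otherwise it falls back to the remote
    /// (injection) queue.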
    pub(super) fn schedule_task(&self, task: Notified, is_yield: bool) {
        use std::ptr;

        with_current(|maybe_cx| {
            if let Some(cx) = maybe_cx {
                // Make sure the task is part of the **current** scheduler.
                if ptr::eq(self, &cx.handle.shared) {
                    // And the current thread still holds a core.
                    if let Some(core) = cx.core.borrow_mut().as_mut() {
                        if is_yield {
                            cx.defer.borrow_mut().push(task);
                        } else {
                            self.schedule_local(cx, core, task);
                        }
                    } else {
                        // This can happen if the core was stolen
                        // (`block_in_place`) or if the notification happens
                        // from the driver.
                        cx.defer.borrow_mut().push(task);
                    }
                    return;
                }
            }

            // Otherwise, use the injection queue.
            self.schedule_remote(task);
        })
    }

    fn schedule_local(&self, cx: &Context, core: &mut Core, task: Notified) {
        core.stats.inc_local_schedule_count();

        if cx.lifo_enabled.get() {
            // Push to the LIFO slot.
            let prev = std::mem::replace(&mut core.lifo_slot, Some(task));
            // If the slot already held a task, demote it to the run queue.
            if let Some(prev) = prev {
                core.run_queue
                    .push_back_or_overflow(prev, self, &mut core.stats);
            } else {
                return;
            }
        } else {
            core.run_queue
                .push_back_or_overflow(task, self, &mut core.stats);
        }

        self.notify_parked_local();
    }

    fn notify_parked_local(&self) {
        super::counters::inc_num_inc_notify_local();
        self.idle.notify_local(self);
    }

    fn schedule_remote(&self, task: Notified) {
        super::counters::inc_num_notify_remote();
        self.scheduler_metrics.inc_remote_schedule_count();

        let mut synced = self.synced.lock();
        self.push_remote_task(&mut synced, task);

        // Notify an idle worker; `notify_remote` takes ownership of the
        // lock guard so it can release it at the right point.
        self.idle.notify_remote(synced, self);
    }

    pub(super) fn close(&self, handle: &Handle) {
        {
            let mut synced = self.synced.lock();

            if let Some(driver) = self.driver.take() {
                synced.shutdown_driver = Some(driver);
            }

            // If the injection queue was already closed, shutdown is
            // underway elsewhere.
            if !self.inject.close(&mut synced.inject) {
                return;
            }

            // Set the shutdown flag on all available cores.
            self.idle.shutdown(&mut synced, self);
        }

        // Any unassigned cores also need to be shut down, but the lock must
        // be dropped first.
        self.idle.shutdown_unassigned_cores(handle, self);
    }

    fn push_remote_task(&self, synced: &mut Synced, task: Notified) {
        // safety: passing in the correct `inject::Synced`.
        unsafe {
            self.inject.push(&mut synced.inject, task);
        }
    }

    fn push_remote_task_batch<I>(&self, iter: I)
    where
        I: Iterator<Item = task::Notified<Arc<Handle>>>,
    {
        unsafe {
            self.inject.push_batch(self, iter);
        }
    }

    fn push_remote_task_batch_synced<I>(&self, synced: &mut Synced, iter: I)
    where
        I: Iterator<Item = task::Notified<Arc<Handle>>>,
    {
        unsafe {
            self.inject.push_batch(&mut synced.inject, iter);
        }
    }

    fn take_driver(&self) -> Option<Box<Driver>> {
        if !self.driver_enabled() {
            return None;
        }

        self.driver.take()
    }

    fn driver_enabled(&self) -> bool {
        // There is one condvar per worker; when the driver is enabled, an
        // extra worker (and condvar) exists beyond the per-core remotes.
        self.condvars.len() > self.remotes.len()
    }

    pub(super) fn shutdown_core(&self, handle: &Handle, mut core: Box<Core>) {
        // Start from a random inner list.
        let start = core.rand.fastrand_n(self.owned.get_shard_size() as u32);
        self.owned.close_and_shutdown_all(start as usize);

        core.stats.submit(&self.worker_metrics[core.index]);

        let mut synced = self.synced.lock();
        synced.shutdown_cores.push(core);

        self.shutdown_finalize(handle, &mut synced);
    }

    pub(super) fn shutdown_finalize(&self, handle: &Handle, synced: &mut Synced) {
        // Wait for all cores.
        if synced.shutdown_cores.len() != self.remotes.len() {
            return;
        }

        let driver = synced.shutdown_driver.take();

        if self.driver_enabled() && driver.is_none() {
            return;
        }

        debug_assert!(self.owned.is_empty());

        for mut core in synced.shutdown_cores.drain(..) {
            // Drain tasks from the local queue.
            while core.next_local_task().is_some() {}
        }

        if let Some(mut driver) = driver {
            driver.shutdown(&handle.driver);
        }

        // Drain the injection queue. Every task has already been shut down,
        // so the tasks can simply be dropped.
        while let Some(task) = self.next_remote_task_synced(synced) {
            drop(task);
        }
    }
}

impl Overflow<Arc<Handle>> for Shared {
    fn push(&self, task: task::Notified<Arc<Handle>>) {
        self.push_remote_task(&mut self.synced.lock(), task);
    }

    fn push_batch<I>(&self, iter: I)
    where
        I: Iterator<Item = task::Notified<Arc<Handle>>>,
    {
        self.push_remote_task_batch(iter)
    }
}

impl<'a> Lock<inject::Synced> for &'a Shared {
    type Handle = SyncedGuard<'a>;

    fn lock(self) -> Self::Handle {
        SyncedGuard {
            lock: self.synced.lock(),
        }
    }
}

impl<'a> Lock<Synced> for &'a Shared {
    type Handle = SyncedGuard<'a>;

    fn lock(self) -> Self::Handle {
        SyncedGuard {
            lock: self.synced.lock(),
        }
    }
}

impl task::Schedule for Arc<Handle> {
    fn release(&self, task: &Task) -> Option<Task> {
        self.shared.owned.remove(task)
    }

    fn schedule(&self, task: Notified) {
        self.shared.schedule_task(task, false);
    }

    fn hooks(&self) -> TaskHarnessScheduleHooks {
        TaskHarnessScheduleHooks {
            task_terminate_callback: self.task_hooks.task_terminate_callback.clone(),
        }
    }

    fn yield_now(&self, task: Notified) {
        self.shared.schedule_task(task, true);
    }
}

impl AsMut<Synced> for Synced {
    fn as_mut(&mut self) -> &mut Synced {
        self
    }
}

pub(crate) struct SyncedGuard<'a> {
    lock: crate::loom::sync::MutexGuard<'a, Synced>,
}

impl<'a> AsMut<inject::Synced> for SyncedGuard<'a> {
    fn as_mut(&mut self) -> &mut inject::Synced {
        &mut self.lock.inject
    }
}

impl<'a> AsMut<Synced> for SyncedGuard<'a> {
    fn as_mut(&mut self) -> &mut Synced {
        &mut self.lock
    }
}

#[track_caller]
fn with_current<R>(f: impl FnOnce(Option<&Context>) -> R) -> R {
    use scheduler::Context::MultiThreadAlt;

    context::with_scheduler(|ctx| match ctx {
        Some(MultiThreadAlt(ctx)) => f(Some(ctx)),
        _ => f(None),
    })
}