tokio/runtime/scheduler/multi_thread/worker/
taskdump.rs

1use super::{Core, Handle, Shared};
2
3use crate::loom::sync::Arc;
4use crate::runtime::scheduler::multi_thread::Stats;
5use crate::runtime::task::trace::trace_multi_thread;
6use crate::runtime::{dump, WorkerMetrics};
7
8use std::time::Duration;
9
10impl Handle {
11    pub(super) fn trace_core(&self, mut core: Box<Core>) -> Box<Core> {
12        core.is_traced = false;
13
14        if core.is_shutdown {
15            return core;
16        }
17
18        // wait for other workers, or timeout without tracing
19        let timeout = Duration::from_millis(250); // a _very_ generous timeout
20        let barrier =
21            if let Some(barrier) = self.shared.trace_status.trace_start.wait_timeout(timeout) {
22                barrier
23            } else {
24                // don't attempt to trace
25                return core;
26            };
27
28        if !barrier.is_leader() {
29            // wait for leader to finish tracing
30            self.shared.trace_status.trace_end.wait();
31            return core;
32        }
33
34        // trace
35
36        let owned = &self.shared.owned;
37        let mut local = self.shared.steal_all();
38        let synced = &self.shared.synced;
39        let injection = &self.shared.inject;
40
41        // safety: `trace_multi_thread` is invoked with the same `synced` that `injection`
42        // was created with.
43        let traces = unsafe { trace_multi_thread(owned, &mut local, synced, injection) }
44            .into_iter()
45            .map(|(id, trace)| dump::Task::new(id, trace))
46            .collect();
47
48        let result = dump::Dump::new(traces);
49
50        // stash the result
51        self.shared.trace_status.stash_result(result);
52
53        // allow other workers to proceed
54        self.shared.trace_status.trace_end.wait();
55
56        core
57    }
58}
59
60impl Shared {
61    /// Steal all tasks from remotes into a single local queue.
62    pub(super) fn steal_all(&self) -> super::queue::Local<Arc<Handle>> {
63        let (_steal, mut local) = super::queue::local();
64
65        let worker_metrics = WorkerMetrics::new();
66        let mut stats = Stats::new(&worker_metrics);
67
68        for remote in self.remotes.iter() {
69            let steal = &remote.steal;
70            while !steal.is_empty() {
71                if let Some(task) = steal.steal_into(&mut local, &mut stats) {
72                    local.push_back([task].into_iter());
73                }
74            }
75        }
76
77        local
78    }
79}