tokio/runtime/task/trace/
mod.rs1use crate::loom::sync::Arc;
2use crate::runtime::context;
3use crate::runtime::scheduler::{self, current_thread, Inject};
4use crate::task::Id;
5
6use backtrace::BacktraceFrame;
7use std::cell::Cell;
8use std::collections::VecDeque;
9use std::ffi::c_void;
10use std::fmt;
11use std::future::Future;
12use std::pin::Pin;
13use std::ptr::{self, NonNull};
14use std::task::{self, Poll};
15
16mod symbol;
17mod tree;
18
19use symbol::Symbol;
20use tree::Tree;
21
22use super::{Notified, OwnedTasks, Schedule};
23
24type Backtrace = Vec<BacktraceFrame>;
25type SymbolTrace = Vec<Symbol>;
26
27pub(crate) struct Context {
29 active_frame: Cell<Option<NonNull<Frame>>>,
32 collector: Cell<Option<Trace>>,
34}
35
36struct Frame {
38 inner_addr: *const c_void,
40
41 parent: Option<NonNull<Frame>>,
43}
44
45#[derive(Clone, Debug)]
50pub(crate) struct Trace {
51 backtraces: Vec<Backtrace>,
54}
55
56pin_project_lite::pin_project! {
57 #[derive(Debug, Clone)]
58 #[must_use = "futures do nothing unless you `.await` or poll them"]
59 pub struct Root<T> {
61 #[pin]
62 future: T,
63 }
64}
65
66const FAIL_NO_THREAD_LOCAL: &str = "The Tokio thread-local has been destroyed \
67 as part of shutting down the current \
68 thread, so collecting a taskdump is not \
69 possible.";
70
71impl Context {
72 pub(crate) const fn new() -> Self {
73 Context {
74 active_frame: Cell::new(None),
75 collector: Cell::new(None),
76 }
77 }
78
79 unsafe fn try_with_current<F, R>(f: F) -> Option<R>
82 where
83 F: FnOnce(&Self) -> R,
84 {
85 crate::runtime::context::with_trace(f)
86 }
87
88 unsafe fn with_current_frame<F, R>(f: F) -> R
89 where
90 F: FnOnce(&Cell<Option<NonNull<Frame>>>) -> R,
91 {
92 Self::try_with_current(|context| f(&context.active_frame)).expect(FAIL_NO_THREAD_LOCAL)
93 }
94
95 fn with_current_collector<F, R>(f: F) -> R
96 where
97 F: FnOnce(&Cell<Option<Trace>>) -> R,
98 {
99 unsafe {
102 Self::try_with_current(|context| f(&context.collector)).expect(FAIL_NO_THREAD_LOCAL)
103 }
104 }
105
106 pub(crate) fn is_tracing() -> bool {
108 Self::with_current_collector(|maybe_collector| {
109 let collector = maybe_collector.take();
110 let result = collector.is_some();
111 maybe_collector.set(collector);
112 result
113 })
114 }
115}
116
117impl Trace {
118 #[inline(never)]
121 pub(crate) fn capture<F, R>(f: F) -> (R, Trace)
122 where
123 F: FnOnce() -> R,
124 {
125 let collector = Trace { backtraces: vec![] };
126
127 let previous = Context::with_current_collector(|current| current.replace(Some(collector)));
128
129 let result = f();
130
131 let collector =
132 Context::with_current_collector(|current| current.replace(previous)).unwrap();
133
134 (result, collector)
135 }
136
137 #[inline(never)]
139 pub(crate) fn root<F>(future: F) -> Root<F> {
140 Root { future }
141 }
142
143 pub(crate) fn backtraces(&self) -> &[Backtrace] {
144 &self.backtraces
145 }
146}
147
148#[inline(never)]
158pub(crate) fn trace_leaf(cx: &mut task::Context<'_>) -> Poll<()> {
159 let did_trace = unsafe {
161 Context::try_with_current(|context_cell| {
162 if let Some(mut collector) = context_cell.collector.take() {
163 let mut frames = vec![];
164 let mut above_leaf = false;
165
166 if let Some(active_frame) = context_cell.active_frame.get() {
167 let active_frame = active_frame.as_ref();
168
169 backtrace::trace(|frame| {
170 let below_root = !ptr::eq(frame.symbol_address(), active_frame.inner_addr);
171
172 if above_leaf && below_root {
175 frames.push(frame.to_owned().into());
176 }
177
178 if ptr::eq(frame.symbol_address(), trace_leaf as *const _) {
179 above_leaf = true;
180 }
181
182 below_root
184 });
185 }
186 collector.backtraces.push(frames);
187 context_cell.collector.set(Some(collector));
188 true
189 } else {
190 false
191 }
192 })
193 .unwrap_or(false)
194 };
195
196 if did_trace {
197 context::with_scheduler(|scheduler| {
200 if let Some(scheduler) = scheduler {
201 match scheduler {
202 scheduler::Context::CurrentThread(s) => s.defer.defer(cx.waker()),
203 #[cfg(feature = "rt-multi-thread")]
204 scheduler::Context::MultiThread(s) => s.defer.defer(cx.waker()),
205 #[cfg(all(tokio_unstable, feature = "rt-multi-thread"))]
206 scheduler::Context::MultiThreadAlt(_) => unimplemented!(),
207 }
208 }
209 });
210
211 Poll::Pending
212 } else {
213 Poll::Ready(())
214 }
215}
216
217impl fmt::Display for Trace {
218 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
219 Tree::from_trace(self.clone()).fmt(f)
220 }
221}
222
223fn defer<F: FnOnce() -> R, R>(f: F) -> impl Drop {
224 use std::mem::ManuallyDrop;
225
226 struct Defer<F: FnOnce() -> R, R>(ManuallyDrop<F>);
227
228 impl<F: FnOnce() -> R, R> Drop for Defer<F, R> {
229 #[inline(always)]
230 fn drop(&mut self) {
231 unsafe {
232 ManuallyDrop::take(&mut self.0)();
233 }
234 }
235 }
236
237 Defer(ManuallyDrop::new(f))
238}
239
240impl<T: Future> Future for Root<T> {
241 type Output = T::Output;
242
243 #[inline(never)]
244 fn poll(self: Pin<&mut Self>, cx: &mut task::Context<'_>) -> Poll<Self::Output> {
245 unsafe {
248 let mut frame = Frame {
249 inner_addr: Self::poll as *const c_void,
250 parent: None,
251 };
252
253 Context::with_current_frame(|current| {
254 frame.parent = current.take();
255 current.set(Some(NonNull::from(&frame)));
256 });
257
258 let _restore = defer(|| {
259 Context::with_current_frame(|current| {
260 current.set(frame.parent);
261 });
262 });
263
264 let this = self.project();
265 this.future.poll(cx)
266 }
267 }
268}
269
270pub(in crate::runtime) fn trace_current_thread(
272 owned: &OwnedTasks<Arc<current_thread::Handle>>,
273 local: &mut VecDeque<Notified<Arc<current_thread::Handle>>>,
274 injection: &Inject<Arc<current_thread::Handle>>,
275) -> Vec<(Id, Trace)> {
276 let mut dequeued = Vec::new();
279
280 while let Some(task) = local.pop_back() {
281 dequeued.push(task);
282 }
283
284 while let Some(task) = injection.pop() {
285 dequeued.push(task);
286 }
287
288 trace_owned(owned, dequeued)
290}
291
292cfg_rt_multi_thread! {
293 use crate::loom::sync::Mutex;
294 use crate::runtime::scheduler::multi_thread;
295 use crate::runtime::scheduler::multi_thread::Synced;
296 use crate::runtime::scheduler::inject::Shared;
297
298 pub(in crate::runtime) unsafe fn trace_multi_thread(
304 owned: &OwnedTasks<Arc<multi_thread::Handle>>,
305 local: &mut multi_thread::queue::Local<Arc<multi_thread::Handle>>,
306 synced: &Mutex<Synced>,
307 injection: &Shared<Arc<multi_thread::Handle>>,
308 ) -> Vec<(Id, Trace)> {
309 let mut dequeued = Vec::new();
310
311 while let Some(notified) = local.pop() {
313 dequeued.push(notified);
314 }
315
316 let mut synced = synced.lock();
318 while let Some(notified) = injection.pop(&mut synced.inject) {
319 dequeued.push(notified);
320 }
321
322 drop(synced);
323
324 trace_owned(owned, dequeued)
327 }
328}
329
330fn trace_owned<S: Schedule>(owned: &OwnedTasks<S>, dequeued: Vec<Notified<S>>) -> Vec<(Id, Trace)> {
337 let mut tasks = dequeued;
338 owned.for_each(|task| {
341 if let Some(notified) = task.notify_for_tracing() {
345 tasks.push(notified);
346 }
347 });
351
352 tasks
353 .into_iter()
354 .map(|task| {
355 let local_notified = owned.assert_owner(task);
356 let id = local_notified.task.id();
357 let ((), trace) = Trace::capture(|| local_notified.run());
358 (id, trace)
359 })
360 .collect()
361}