tokio/runtime/metrics/
batch.rs

1use crate::runtime::metrics::{HistogramBatch, WorkerMetrics};
2
3use std::sync::atomic::Ordering::Relaxed;
4use std::time::{Duration, Instant};
5
6pub(crate) struct MetricsBatch {
7    /// Number of times the worker parked.
8    park_count: u64,
9
10    /// Number of times the worker parked and unparked.
11    park_unpark_count: u64,
12
13    /// Number of times the worker woke w/o doing work.
14    noop_count: u64,
15
16    /// Number of tasks stolen.
17    steal_count: u64,
18
19    /// Number of times tasks where stolen.
20    steal_operations: u64,
21
22    /// Number of tasks that were polled by the worker.
23    poll_count: u64,
24
25    /// Number of tasks polled when the worker entered park. This is used to
26    /// track the noop count.
27    poll_count_on_last_park: u64,
28
29    /// Number of tasks that were scheduled locally on this worker.
30    local_schedule_count: u64,
31
32    /// Number of tasks moved to the global queue to make space in the local
33    /// queue
34    overflow_count: u64,
35
36    /// The total busy duration in nanoseconds.
37    busy_duration_total: u64,
38
39    /// Instant at which work last resumed (continued after park).
40    processing_scheduled_tasks_started_at: Instant,
41
42    /// If `Some`, tracks poll times in nanoseconds
43    poll_timer: Option<PollTimer>,
44}
45
46struct PollTimer {
47    /// Histogram of poll counts within each band.
48    poll_counts: HistogramBatch,
49
50    /// Instant when the most recent task started polling.
51    poll_started_at: Instant,
52}
53
54impl MetricsBatch {
55    pub(crate) fn new(worker_metrics: &WorkerMetrics) -> MetricsBatch {
56        let now = Instant::now();
57
58        MetricsBatch {
59            park_count: 0,
60            park_unpark_count: 0,
61            noop_count: 0,
62            steal_count: 0,
63            steal_operations: 0,
64            poll_count: 0,
65            poll_count_on_last_park: 0,
66            local_schedule_count: 0,
67            overflow_count: 0,
68            busy_duration_total: 0,
69            processing_scheduled_tasks_started_at: now,
70            poll_timer: worker_metrics
71                .poll_count_histogram
72                .as_ref()
73                .map(|worker_poll_counts| PollTimer {
74                    poll_counts: HistogramBatch::from_histogram(worker_poll_counts),
75                    poll_started_at: now,
76                }),
77        }
78    }
79
80    pub(crate) fn submit(&mut self, worker: &WorkerMetrics, mean_poll_time: u64) {
81        worker.mean_poll_time.store(mean_poll_time, Relaxed);
82        worker.park_count.store(self.park_count, Relaxed);
83        worker
84            .park_unpark_count
85            .store(self.park_unpark_count, Relaxed);
86        worker.noop_count.store(self.noop_count, Relaxed);
87        worker.steal_count.store(self.steal_count, Relaxed);
88        worker
89            .steal_operations
90            .store(self.steal_operations, Relaxed);
91        worker.poll_count.store(self.poll_count, Relaxed);
92
93        worker
94            .busy_duration_total
95            .store(self.busy_duration_total, Relaxed);
96
97        worker
98            .local_schedule_count
99            .store(self.local_schedule_count, Relaxed);
100        worker.overflow_count.store(self.overflow_count, Relaxed);
101
102        if let Some(poll_timer) = &self.poll_timer {
103            let dst = worker.poll_count_histogram.as_ref().unwrap();
104            poll_timer.poll_counts.submit(dst);
105        }
106    }
107
108    /// The worker is about to park.
109    pub(crate) fn about_to_park(&mut self) {
110        self.park_count += 1;
111        self.park_unpark_count += 1;
112
113        if self.poll_count_on_last_park == self.poll_count {
114            self.noop_count += 1;
115        } else {
116            self.poll_count_on_last_park = self.poll_count;
117        }
118    }
119
120    /// The worker was unparked.
121    pub(crate) fn unparked(&mut self) {
122        self.park_unpark_count += 1;
123    }
124
125    /// Start processing a batch of tasks
126    pub(crate) fn start_processing_scheduled_tasks(&mut self) {
127        self.processing_scheduled_tasks_started_at = Instant::now();
128    }
129
130    /// Stop processing a batch of tasks
131    pub(crate) fn end_processing_scheduled_tasks(&mut self) {
132        let busy_duration = self.processing_scheduled_tasks_started_at.elapsed();
133        self.busy_duration_total += duration_as_u64(busy_duration);
134    }
135
136    /// Start polling an individual task
137    pub(crate) fn start_poll(&mut self) {
138        self.poll_count += 1;
139
140        if let Some(poll_timer) = &mut self.poll_timer {
141            poll_timer.poll_started_at = Instant::now();
142        }
143    }
144
145    /// Stop polling an individual task
146    pub(crate) fn end_poll(&mut self) {
147        if let Some(poll_timer) = &mut self.poll_timer {
148            let elapsed = duration_as_u64(poll_timer.poll_started_at.elapsed());
149            poll_timer.poll_counts.measure(elapsed, 1);
150        }
151    }
152
153    pub(crate) fn inc_local_schedule_count(&mut self) {
154        self.local_schedule_count += 1;
155    }
156}
157
158cfg_rt_multi_thread! {
159    impl MetricsBatch {
160        pub(crate) fn incr_steal_count(&mut self, by: u16) {
161            self.steal_count += by as u64;
162        }
163
164        pub(crate) fn incr_steal_operations(&mut self) {
165            self.steal_operations += 1;
166        }
167
168        pub(crate) fn incr_overflow_count(&mut self) {
169            self.overflow_count += 1;
170        }
171    }
172}
173
174pub(crate) fn duration_as_u64(dur: Duration) -> u64 {
175    u64::try_from(dur.as_nanos()).unwrap_or(u64::MAX)
176}