tokio/runtime/scheduler/multi_thread_alt/stats.rs

use crate::runtime::{Config, MetricsBatch, WorkerMetrics};

use std::cmp;
use std::time::{Duration, Instant};

/// Per-worker statistics. This is used for both tuning the scheduler and
/// reporting runtime-level metrics/stats.
pub(crate) struct Stats {
    /// The metrics batch used to report runtime-level metrics/stats to the
    /// user.
    batch: MetricsBatch,

    /// Exponentially-weighted moving average of the time spent polling a
    /// scheduled task.
    ///
    /// Tracked in nanoseconds, stored as an `f64` since that is what we use
    /// with the EWMA calculations.
    task_poll_time_ewma: f64,
}
20
21/// Transient state
22pub(crate) struct Ephemeral {
23    /// Instant at which work last resumed (continued after park).
24    ///
25    /// This duplicates the value stored in `MetricsBatch`. We will unify
26    /// `Stats` and `MetricsBatch` when we stabilize metrics.
27    processing_scheduled_tasks_started_at: Instant,
28
29    /// Number of tasks polled in the batch of scheduled tasks
30    tasks_polled_in_batch: usize,
31
32    /// Used to ensure calls to start / stop batch are paired
33    #[cfg(debug_assertions)]
34    batch_started: bool,
35}
36
37impl Ephemeral {
38    pub(crate) fn new() -> Ephemeral {
39        Ephemeral {
40            processing_scheduled_tasks_started_at: Instant::now(),
41            tasks_polled_in_batch: 0,
42            #[cfg(debug_assertions)]
43            batch_started: false,
44        }
45    }
46}

/// How much to weigh each individual poll time; the value is plucked from
/// thin air.
const TASK_POLL_TIME_EWMA_ALPHA: f64 = 0.1;

/// Ideally, we wouldn't go above this; the value is plucked from thin air.
const TARGET_GLOBAL_QUEUE_INTERVAL: f64 = Duration::from_micros(200).as_nanos() as f64;

/// Max value for the global queue interval. This is roughly 2x the previous
/// default.
const MAX_TASKS_POLLED_PER_GLOBAL_QUEUE_INTERVAL: u32 = 127;

/// This is the previous default.
const TARGET_TASKS_POLLED_PER_GLOBAL_QUEUE_INTERVAL: u32 = 61;
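
// Taken together, these constants aim the tuner at checking the global queue
// roughly once every 200µs, polling between 2 and 127 local tasks in between.
// The seed in `Stats::new` below makes the initial interval equal the
// previous default of 61.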

impl Stats {
    pub(crate) const DEFAULT_GLOBAL_QUEUE_INTERVAL: u32 =
        TARGET_TASKS_POLLED_PER_GLOBAL_QUEUE_INTERVAL;

    pub(crate) fn new(worker_metrics: &WorkerMetrics) -> Stats {
        // Seed the value with what we hope to see.
        let task_poll_time_ewma =
            TARGET_GLOBAL_QUEUE_INTERVAL / TARGET_TASKS_POLLED_PER_GLOBAL_QUEUE_INTERVAL as f64;
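        // i.e. 200_000ns / 61 ≈ 3_278.7ns (about 3.3µs) per poll, which makes
        // the first tuned interval equal the previous default of 61.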

        Stats {
            batch: MetricsBatch::new(worker_metrics),
            task_poll_time_ewma,
        }
    }

    pub(crate) fn tuned_global_queue_interval(&self, config: &Config) -> u32 {
        // If an interval is explicitly set, don't tune.
        if let Some(configured) = config.global_queue_interval {
            return configured;
        }

        // As of Rust 1.45, casts from f64 -> u32 are saturating, which is fine here.
        let tasks_per_interval = (TARGET_GLOBAL_QUEUE_INTERVAL / self.task_poll_time_ewma) as u32;
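
        // For example, an EWMA of 10_000ns yields 200_000 / 10_000 = 20 tasks
        // per interval. The clamp below keeps the result in [2, 127]: polls
        // slower than ~100µs pin the interval at 2, and polls faster than
        // ~1.6µs pin it at 127.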

        cmp::max(
            // If we are using self-tuning, we don't want to return less than 2
            // as that would result in the global queue always getting checked
            // first.
            2,
            cmp::min(
                MAX_TASKS_POLLED_PER_GLOBAL_QUEUE_INTERVAL,
                tasks_per_interval,
            ),
        )
    }

    pub(crate) fn submit(&mut self, to: &WorkerMetrics) {
        self.batch.submit(to, self.task_poll_time_ewma as u64);
    }

    pub(crate) fn about_to_park(&mut self) {
        self.batch.about_to_park();
    }

    pub(crate) fn unparked(&mut self) {
        self.batch.unparked();
    }

    pub(crate) fn inc_local_schedule_count(&mut self) {
        self.batch.inc_local_schedule_count();
    }

    pub(crate) fn start_processing_scheduled_tasks(&mut self, ephemeral: &mut Ephemeral) {
        self.batch.start_processing_scheduled_tasks();

        #[cfg(debug_assertions)]
        {
            debug_assert!(!ephemeral.batch_started);
            ephemeral.batch_started = true;
        }

        ephemeral.processing_scheduled_tasks_started_at = Instant::now();
        ephemeral.tasks_polled_in_batch = 0;
    }

    pub(crate) fn end_processing_scheduled_tasks(&mut self, ephemeral: &mut Ephemeral) {
        self.batch.end_processing_scheduled_tasks();

        #[cfg(debug_assertions)]
        {
            debug_assert!(ephemeral.batch_started);
            ephemeral.batch_started = false;
        }

        // Update the EWMA task poll time
        if ephemeral.tasks_polled_in_batch > 0 {
            let now = Instant::now();

            // If we "overflow" this conversion, we have bigger problems than
            // slightly off stats.
            let elapsed = (now - ephemeral.processing_scheduled_tasks_started_at).as_nanos() as f64;
            let num_polls = ephemeral.tasks_polled_in_batch as f64;

            // Calculate the mean poll duration for a single task in the batch
            let mean_poll_duration = elapsed / num_polls;

            // Compute the alpha weighted by the number of tasks polled this batch.
            let weighted_alpha = 1.0 - (1.0 - TASK_POLL_TIME_EWMA_ALPHA).powf(num_polls);
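
            // This one-shot update is equivalent to applying the per-poll
            // update `ewma = ALPHA * x + (1 - ALPHA) * ewma` once per poll
            // with the same sample `x`: after `num_polls` repetitions the
            // history term has decayed by `(1 - ALPHA)^num_polls`, which is
            // exactly `1 - weighted_alpha`.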

            // Now compute the new weighted average task poll time.
            self.task_poll_time_ewma = weighted_alpha * mean_poll_duration
                + (1.0 - weighted_alpha) * self.task_poll_time_ewma;
        }
    }

    pub(crate) fn start_poll(&mut self, ephemeral: &mut Ephemeral) {
        self.batch.start_poll();

        ephemeral.tasks_polled_in_batch += 1;
    }

    pub(crate) fn end_poll(&mut self) {
        self.batch.end_poll();
    }

    pub(crate) fn incr_steal_count(&mut self, by: u16) {
        self.batch.incr_steal_count(by);
    }

    pub(crate) fn incr_steal_operations(&mut self) {
        self.batch.incr_steal_operations();
    }

    pub(crate) fn incr_overflow_count(&mut self) {
        self.batch.incr_overflow_count();
    }
}
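
#[cfg(test)]
mod tests {
    // A minimal sketch verifying the batched EWMA shortcut used in
    // `end_processing_scheduled_tasks`: folding a whole batch in with
    // `1 - (1 - ALPHA)^n` matches applying the per-poll update n times with
    // the same sample. The values below are hypothetical.
    #[test]
    fn batched_alpha_matches_repeated_updates() {
        const ALPHA: f64 = super::TASK_POLL_TIME_EWMA_ALPHA;

        let sample = 5_000.0; // hypothetical mean poll time, in nanoseconds
        let seed = 3_000.0; // hypothetical starting EWMA, in nanoseconds
        let num_polls = 7;

        // Apply the per-poll update once per poll.
        let mut repeated = seed;
        for _ in 0..num_polls {
            repeated = ALPHA * sample + (1.0 - ALPHA) * repeated;
        }

        // Apply the batched update once.
        let weighted_alpha = 1.0 - (1.0 - ALPHA).powf(num_polls as f64);
        let batched = weighted_alpha * sample + (1.0 - weighted_alpha) * seed;

        assert!((repeated - batched).abs() < 1e-6);
    }
}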