tokio/runtime/scheduler/multi_thread_alt/
stats.rs1use crate::runtime::{Config, MetricsBatch, WorkerMetrics};
2
3use std::cmp;
4use std::time::{Duration, Instant};
5
6pub(crate) struct Stats {
9 batch: MetricsBatch,
12
13 task_poll_time_ewma: f64,
19}
20
21pub(crate) struct Ephemeral {
23 processing_scheduled_tasks_started_at: Instant,
28
29 tasks_polled_in_batch: usize,
31
32 #[cfg(debug_assertions)]
34 batch_started: bool,
35}
36
37impl Ephemeral {
38 pub(crate) fn new() -> Ephemeral {
39 Ephemeral {
40 processing_scheduled_tasks_started_at: Instant::now(),
41 tasks_polled_in_batch: 0,
42 #[cfg(debug_assertions)]
43 batch_started: false,
44 }
45 }
46}
47
48const TASK_POLL_TIME_EWMA_ALPHA: f64 = 0.1;
50
51const TARGET_GLOBAL_QUEUE_INTERVAL: f64 = Duration::from_micros(200).as_nanos() as f64;
53
54const MAX_TASKS_POLLED_PER_GLOBAL_QUEUE_INTERVAL: u32 = 127;
56
57const TARGET_TASKS_POLLED_PER_GLOBAL_QUEUE_INTERVAL: u32 = 61;
59
60impl Stats {
61 pub(crate) const DEFAULT_GLOBAL_QUEUE_INTERVAL: u32 =
62 TARGET_TASKS_POLLED_PER_GLOBAL_QUEUE_INTERVAL;
63
64 pub(crate) fn new(worker_metrics: &WorkerMetrics) -> Stats {
65 let task_poll_time_ewma =
67 TARGET_GLOBAL_QUEUE_INTERVAL / TARGET_TASKS_POLLED_PER_GLOBAL_QUEUE_INTERVAL as f64;
68
69 Stats {
70 batch: MetricsBatch::new(worker_metrics),
71 task_poll_time_ewma,
72 }
73 }
74
75 pub(crate) fn tuned_global_queue_interval(&self, config: &Config) -> u32 {
76 if let Some(configured) = config.global_queue_interval {
78 return configured;
79 }
80
81 let tasks_per_interval = (TARGET_GLOBAL_QUEUE_INTERVAL / self.task_poll_time_ewma) as u32;
83
84 cmp::max(
85 2,
88 cmp::min(
89 MAX_TASKS_POLLED_PER_GLOBAL_QUEUE_INTERVAL,
90 tasks_per_interval,
91 ),
92 )
93 }
94
95 pub(crate) fn submit(&mut self, to: &WorkerMetrics) {
96 self.batch.submit(to, self.task_poll_time_ewma as u64);
97 }
98
99 pub(crate) fn about_to_park(&mut self) {
100 self.batch.about_to_park();
101 }
102
103 pub(crate) fn unparked(&mut self) {
104 self.batch.unparked();
105 }
106
107 pub(crate) fn inc_local_schedule_count(&mut self) {
108 self.batch.inc_local_schedule_count();
109 }
110
111 pub(crate) fn start_processing_scheduled_tasks(&mut self, ephemeral: &mut Ephemeral) {
112 self.batch.start_processing_scheduled_tasks();
113
114 #[cfg(debug_assertions)]
115 {
116 debug_assert!(!ephemeral.batch_started);
117 ephemeral.batch_started = true;
118 }
119
120 ephemeral.processing_scheduled_tasks_started_at = Instant::now();
121 ephemeral.tasks_polled_in_batch = 0;
122 }
123
124 pub(crate) fn end_processing_scheduled_tasks(&mut self, ephemeral: &mut Ephemeral) {
125 self.batch.end_processing_scheduled_tasks();
126
127 #[cfg(debug_assertions)]
128 {
129 debug_assert!(ephemeral.batch_started);
130 ephemeral.batch_started = false;
131 }
132
133 if ephemeral.tasks_polled_in_batch > 0 {
135 let now = Instant::now();
136
137 let elapsed = (now - ephemeral.processing_scheduled_tasks_started_at).as_nanos() as f64;
140 let num_polls = ephemeral.tasks_polled_in_batch as f64;
141
142 let mean_poll_duration = elapsed / num_polls;
144
145 let weighted_alpha = 1.0 - (1.0 - TASK_POLL_TIME_EWMA_ALPHA).powf(num_polls);
147
148 self.task_poll_time_ewma = weighted_alpha * mean_poll_duration
150 + (1.0 - weighted_alpha) * self.task_poll_time_ewma;
151 }
152 }
153
154 pub(crate) fn start_poll(&mut self, ephemeral: &mut Ephemeral) {
155 self.batch.start_poll();
156
157 ephemeral.tasks_polled_in_batch += 1;
158 }
159
160 pub(crate) fn end_poll(&mut self) {
161 self.batch.end_poll();
162 }
163
164 pub(crate) fn incr_steal_count(&mut self, by: u16) {
165 self.batch.incr_steal_count(by);
166 }
167
168 pub(crate) fn incr_steal_operations(&mut self) {
169 self.batch.incr_steal_operations();
170 }
171
172 pub(crate) fn incr_overflow_count(&mut self) {
173 self.batch.incr_overflow_count();
174 }
175}