tokio/runtime/scheduler/multi_thread_alt/queue.rs

//! Run-queue structures to support a work-stealing scheduler.

use crate::loom::cell::UnsafeCell;
use crate::loom::sync::Arc;
use crate::runtime::scheduler::multi_thread_alt::{Overflow, Stats};
use crate::runtime::task;

use std::mem::{self, MaybeUninit};
use std::ptr;
use std::sync::atomic::Ordering::{AcqRel, Acquire, Relaxed, Release};

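// Use wider integers when possible to increase ABA resilience.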
cfg_has_atomic_u64! {
    type UnsignedShort = u32;
    type UnsignedLong = u64;
    type AtomicUnsignedShort = crate::loom::sync::atomic::AtomicU32;
    type AtomicUnsignedLong = crate::loom::sync::atomic::AtomicU64;
}
cfg_not_has_atomic_u64! {
    type UnsignedShort = u16;
    type UnsignedLong = u32;
    type AtomicUnsignedShort = crate::loom::sync::atomic::AtomicU16;
    type AtomicUnsignedLong = crate::loom::sync::atomic::AtomicU32;
}

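/// Producer handle. May only be used from a single thread.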
pub(crate) struct Local<T: 'static> {
    inner: Arc<Inner<T>>,
}

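/// Consumer handle. May be used from many threads.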
pub(crate) struct Steal<T: 'static>(Arc<Inner<T>>);

#[repr(align(128))]
pub(crate) struct Inner<T: 'static> {
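    /// Concurrently updated by many threads.
    ///
    /// Contains two `UnsignedShort` values. The LSB is the "real" head of the
    /// queue. The `UnsignedShort` in the MSB is set by a stealer in the
    /// process of stealing values. It represents the first value being stolen
    /// in the batch. The `UnsignedShort` indices are intentionally wider than
    /// strictly required for buffer indexing in order to provide ABA
    /// mitigation and to make it possible to distinguish between full and
    /// empty buffers.
    ///
    /// When both `UnsignedShort` values are the same, there is no active
    /// stealer.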
    head: AtomicUnsignedLong,

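    /// Only updated by the producer thread but read by many threads.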
    tail: AtomicUnsignedShort,

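    /// Elements.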
    buffer: Box<[UnsafeCell<MaybeUninit<task::Notified<T>>>]>,

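    /// Bitmask applied to positions before indexing into `buffer`; equals
    /// `buffer.len() - 1`, so the index arithmetic below assumes a
    /// power-of-two capacity.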
    mask: usize,
}

unsafe impl<T> Send for Inner<T> {}
unsafe impl<T> Sync for Inner<T> {}

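/// Creates a new local run-queue.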
pub(crate) fn local<T: 'static>(capacity: usize) -> (Steal<T>, Local<T>) {
    assert!(capacity <= 4096);
    assert!(capacity >= 1);

    let mut buffer = Vec::with_capacity(capacity);

    for _ in 0..capacity {
        buffer.push(UnsafeCell::new(MaybeUninit::uninit()));
    }

    let inner = Arc::new(Inner {
        head: AtomicUnsignedLong::new(0),
        tail: AtomicUnsignedShort::new(0),
        buffer: buffer.into_boxed_slice(),
        mask: capacity - 1,
    });

    let local = Local {
        inner: inner.clone(),
    };

    let remote = Steal(inner);

    (remote, local)
}

impl<T> Local<T> {
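    /// How many tasks can be pushed into the queue.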
    pub(crate) fn remaining_slots(&self) -> usize {
        self.inner.remaining_slots()
    }

    pub(crate) fn max_capacity(&self) -> usize {
        self.inner.buffer.len()
    }

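    /// Returns `true` if there are no entries in the queue.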
    pub(crate) fn is_empty(&self) -> bool {
        self.inner.is_empty()
    }

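    /// Returns `true` if the queue has enough free slots to receive a steal:
    /// a stealer transfers at most half of a queue's capacity, so at least
    /// `max_capacity - max_capacity / 2` slots must be available.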
    pub(crate) fn can_steal(&self) -> bool {
        self.remaining_slots() >= self.max_capacity() - self.max_capacity() / 2
    }

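    /// Pushes a batch of tasks to the back of the queue. All tasks must fit
    /// in the local queue.
    ///
    /// # Panics
    ///
    /// The method panics if there is not enough capacity to fit in the queue.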
    pub(crate) fn push_back(&mut self, tasks: impl ExactSizeIterator<Item = task::Notified<T>>) {
        let len = tasks.len();
        assert!(len <= self.inner.buffer.len());

        if len == 0 {
            // Nothing to do
            return;
        }

        let head = self.inner.head.load(Acquire);
        let (steal, real) = unpack(head);

        // safety: this is the **only** thread that updates this cell.
        let mut tail = unsafe { self.inner.tail.unsync_load() };

        if tail.wrapping_sub(steal) <= (self.inner.buffer.len() - len) as UnsignedShort {
            // Yes, this `if` is structured a bit weird (the first block does
            // nothing and the second panics). It is this way to match
            // `push_back_or_overflow`.
        } else {
            panic!(
                "not enough capacity; len={}; tail={}; steal={}; real={}",
                len, tail, steal, real
            );
        }

        for task in tasks {
            let idx = tail as usize & self.inner.mask;

            self.inner.buffer[idx].with_mut(|ptr| {
                // Write the task to the slot.
                //
                // Safety: there is only one producer and the capacity check
                // above ensures we never touch a cell that holds a value, so
                // there are no concurrent readers.
                unsafe {
                    ptr::write((*ptr).as_mut_ptr(), task);
                }
            });

            tail = tail.wrapping_add(1);
        }

        self.inner.tail.store(tail, Release);
    }

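    /// Pushes a task to the back of the local queue. If there is not enough
    /// capacity in the queue, this triggers the overflow operation.
    ///
    /// When the queue overflows, half of the current contents of the queue is
    /// moved to the given overflow queue. This reduces the overhead of
    /// continuously pushing to the overflow queue.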
    pub(crate) fn push_back_or_overflow<O: Overflow<T>>(
        &mut self,
        mut task: task::Notified<T>,
        overflow: &O,
        stats: &mut Stats,
    ) {
        let tail = loop {
            let head = self.inner.head.load(Acquire);
            let (steal, real) = unpack(head);

            // safety: this is the **only** thread that updates this cell.
            let tail = unsafe { self.inner.tail.unsync_load() };

            if tail.wrapping_sub(steal) < self.inner.buffer.len() as UnsignedShort {
                // There is capacity for the task
                break tail;
            } else if steal != real {
                super::counters::inc_num_overflows();
                // Concurrently stealing, this will free up capacity, so only
                // push the new task onto the overflow queue.
                overflow.push(task);
                return;
            } else {
                super::counters::inc_num_overflows();
                // Push the current task and half of the queue into the
                // overflow queue.
                match self.push_overflow(task, real, tail, overflow, stats) {
                    Ok(_) => return,
                    // Lost the race, try again
                    Err(v) => {
                        task = v;
                    }
                }
            }
        };

        self.push_back_finish(task, tail);
    }

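    // Second half of `push_back`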
    fn push_back_finish(&self, task: task::Notified<T>, tail: UnsignedShort) {
        let idx = tail as usize & self.inner.mask;

        self.inner.buffer[idx].with_mut(|ptr| {
            // Write the task to the slot.
            //
            // Safety: there is only one producer and the caller has already
            // verified that there is capacity, so there are no concurrent
            // readers of this cell.
            unsafe {
                ptr::write((*ptr).as_mut_ptr(), task);
            }
        });

        // Make the task available. Synchronizes with the `Acquire` loads
        // performed by stealers.
        self.inner.tail.store(tail.wrapping_add(1), Release);
    }

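    /// Moves a batch of tasks into the overflow queue.
    ///
    /// This will temporarily make some of the tasks unavailable to stealers.
    /// Once `push_overflow` is done, a notification is sent out, so if other
    /// workers "missed" some of the tasks during a steal, they will get
    /// another opportunity.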
    #[inline(never)]
    fn push_overflow<O: Overflow<T>>(
        &mut self,
        task: task::Notified<T>,
        head: UnsignedShort,
        tail: UnsignedShort,
        overflow: &O,
        stats: &mut Stats,
    ) -> Result<(), task::Notified<T>> {
        // How many elements are we taking from the local queue.
        //
        // This is one less than the number of tasks pushed to the overflow
        // queue as we are also inserting the `task` argument.
        let num_tasks_taken: UnsignedShort = (self.inner.buffer.len() / 2) as UnsignedShort;

        assert_eq!(
            tail.wrapping_sub(head) as usize,
            self.inner.buffer.len(),
            "queue is not full; tail = {}; head = {}",
            tail,
            head
        );

        let prev = pack(head, head);

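        // Claim a bunch of tasks.
        //
        // We are claiming the tasks **before** reading them out of the buffer.
        // This is safe because only the **current** thread is able to push new
        // tasks.
        //
        // There isn't really any need for memory ordering... Relaxed would
        // work. This is because all tasks are pushed into the queue from the
        // current thread (or memory has been acquired if the local queue
        // handle moved).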
        if self
            .inner
            .head
            .compare_exchange(
                prev,
                pack(
                    head.wrapping_add(num_tasks_taken),
                    head.wrapping_add(num_tasks_taken),
                ),
                Release,
                Relaxed,
            )
            .is_err()
        {
            // We failed to claim the tasks, losing the race. Return out of
            // this function and try the full `push_back_or_overflow` routine
            // again. The queue may not be full anymore.
            return Err(task);
        }

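        /// An iterator that takes elements out of the run queue.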
        struct BatchTaskIter<'a, T: 'static> {
            buffer: &'a [UnsafeCell<MaybeUninit<task::Notified<T>>>],
            mask: usize,
            head: UnsignedLong,
            i: UnsignedLong,
            num: UnsignedShort,
        }
        impl<'a, T: 'static> Iterator for BatchTaskIter<'a, T> {
            type Item = task::Notified<T>;

            #[inline]
            fn next(&mut self) -> Option<task::Notified<T>> {
                if self.i == UnsignedLong::from(self.num) {
                    None
                } else {
                    let i_idx = self.i.wrapping_add(self.head) as usize & self.mask;
                    let slot = &self.buffer[i_idx];

                    // safety: the CAS above claimed exclusive access to these
                    // slots; only this iterator reads them.
                    let task = slot.with(|ptr| unsafe { ptr::read((*ptr).as_ptr()) });

                    self.i += 1;
                    Some(task)
                }
            }
        }

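        // safety: The CAS above ensures that no consumer will look at these
        // values again, and we are the only producer.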
        let batch_iter = BatchTaskIter {
            buffer: &self.inner.buffer,
            mask: self.inner.mask,
            head: head as UnsignedLong,
            i: 0,
            num: num_tasks_taken,
        };
        overflow.push_batch(batch_iter.chain(std::iter::once(task)));

        stats.incr_overflow_count();

        Ok(())
    }

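    /// Pops a task from the local queue.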
    pub(crate) fn pop(&mut self) -> Option<task::Notified<T>> {
        let mut head = self.inner.head.load(Acquire);

        let idx = loop {
            let (steal, real) = unpack(head);

            // safety: this is the **only** thread that updates this cell.
            let tail = unsafe { self.inner.tail.unsync_load() };

            if real == tail {
                // queue is empty
                return None;
            }

            let next_real = real.wrapping_add(1);

            // If `steal == real` there are no concurrent stealers, so both
            // `steal` and `real` are updated.
            let next = if steal == real {
                pack(next_real, next_real)
            } else {
                assert_ne!(steal, next_real);
                // If `steal != real`, there is an active stealer; only
                // update `real`.
                pack(steal, next_real)
            };

            // Attempt to claim a task.
            let res = self
                .inner
                .head
                .compare_exchange(head, next, AcqRel, Acquire);

            match res {
                Ok(_) => break real as usize & self.inner.mask,
                Err(actual) => head = actual,
            }
        };

        Some(self.inner.buffer[idx].with(|ptr| unsafe { ptr::read(ptr).assume_init() }))
    }
}

impl<T> Steal<T> {
    pub(crate) fn is_empty(&self) -> bool {
        self.0.is_empty()
    }

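    /// Steals half the tasks from `self` and places them into `dst`.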
    pub(crate) fn steal_into(
        &self,
        dst: &mut Local<T>,
        dst_stats: &mut Stats,
    ) -> Option<task::Notified<T>> {
        // Safety: the caller is the only thread that mutates `dst.tail` and
        // holds a mutable reference.
        let dst_tail = unsafe { dst.inner.tail.unsync_load() };

        // To the caller, `dst` may **look** empty but still have values
        // contained in the buffer. If another thread is concurrently stealing
        // from `dst` there may not be enough capacity to steal.
        let (steal, _) = unpack(dst.inner.head.load(Acquire));

        if dst_tail.wrapping_sub(steal) > self.0.buffer.len() as UnsignedShort / 2 {
            // we *could* try to steal less here, but for simplicity, we're
            // just going to abort.
            return None;
        }

        // Steal the tasks into `dst`'s buffer. This does not yet expose the
        // tasks in `dst`.
        let mut n = self.steal_into2(dst, dst_tail);

        if n == 0 {
            // No tasks were stolen
            return None;
        }

        super::counters::inc_num_steals();

        dst_stats.incr_steal_count(n as u16);
        dst_stats.incr_steal_operations();

        // We are returning a task here
        n -= 1;

        let ret_pos = dst_tail.wrapping_add(n);
        let ret_idx = ret_pos as usize & dst.inner.mask;

        // safety: the value was written as part of `steal_into2` and not
        // exposed to stealers, so no other thread can access it.
        let ret = dst.inner.buffer[ret_idx].with(|ptr| unsafe { ptr::read((*ptr).as_ptr()) });

        if n == 0 {
            // The `dst` queue is empty, but a single task was stolen
            return Some(ret);
        }

        // Make the stolen items available to consumers
        dst.inner.tail.store(dst_tail.wrapping_add(n), Release);

        Some(ret)
    }

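    // Steal tasks from `self`, placing them into `dst`. Returns the number of
    // tasks that were stolen.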
    fn steal_into2(&self, dst: &mut Local<T>, dst_tail: UnsignedShort) -> UnsignedShort {
        let mut prev_packed = self.0.head.load(Acquire);
        let mut next_packed;

        let n = loop {
            let (src_head_steal, src_head_real) = unpack(prev_packed);
            let src_tail = self.0.tail.load(Acquire);

            // If these two do not match, another thread is concurrently
            // stealing from the queue.
            if src_head_steal != src_head_real {
                return 0;
            }

            // Number of available tasks to steal
            let n = src_tail.wrapping_sub(src_head_real);
            let n = n - n / 2;

            if n == 0 {
                // No tasks available to steal
                return 0;
            }

            // Update the real head index to acquire the tasks.
            let steal_to = src_head_real.wrapping_add(n);
            assert_ne!(src_head_steal, steal_to);
            next_packed = pack(src_head_steal, steal_to);

            // Claim all those tasks. This is done by incrementing the "real"
            // head but not the "steal" head: no other thread can steal from
            // this queue until the current thread completes.
            let res = self
                .0
                .head
                .compare_exchange(prev_packed, next_packed, AcqRel, Acquire);

            match res {
                Ok(_) => break n,
                Err(actual) => prev_packed = actual,
            }
        };

        debug_assert!(
            n <= (self.0.buffer.len() - self.0.buffer.len() / 2) as UnsignedShort,
            "actual = {}",
            n
        );

        let (first, _) = unpack(next_packed);

        // Take all the tasks
        for i in 0..n {
            // Compute the positions
            let src_pos = first.wrapping_add(i);
            let dst_pos = dst_tail.wrapping_add(i);

            // Map to slots
            let src_idx = src_pos as usize & self.0.mask;
            let dst_idx = dst_pos as usize & self.0.mask;

            // Read the task
            //
            // safety: We acquired the task with the atomic exchange above.
            let task = self.0.buffer[src_idx].with(|ptr| unsafe { ptr::read((*ptr).as_ptr()) });

            // Write the task to the new slot
            //
            // safety: `dst` queue is empty and we are the only producer to
            // this queue.
            dst.inner.buffer[dst_idx]
                .with_mut(|ptr| unsafe { ptr::write((*ptr).as_mut_ptr(), task) });
        }

        let mut prev_packed = next_packed;

        // Update `src_head_steal` to match `src_head_real`, signalling that
        // the stealing routine is complete.
        loop {
            let head = unpack(prev_packed).1;
            next_packed = pack(head, head);

            let res = self
                .0
                .head
                .compare_exchange(prev_packed, next_packed, AcqRel, Acquire);

            match res {
                Ok(_) => return n,
                Err(actual) => {
                    let (actual_steal, actual_real) = unpack(actual);

                    assert_ne!(actual_steal, actual_real);

                    prev_packed = actual;
                }
            }
        }
    }
}

cfg_unstable_metrics! {
    impl<T> Steal<T> {
        pub(crate) fn len(&self) -> usize {
            self.0.len() as _
        }
    }
}

impl<T> Clone for Steal<T> {
    fn clone(&self) -> Steal<T> {
        Steal(self.0.clone())
    }
}

impl<T> Drop for Local<T> {
    fn drop(&mut self) {
        if !std::thread::panicking() {
            assert!(self.pop().is_none(), "queue not empty");
        }
    }
}

impl<T> Inner<T> {
    fn remaining_slots(&self) -> usize {
        let (steal, _) = unpack(self.head.load(Acquire));
        let tail = self.tail.load(Acquire);

        self.buffer.len() - (tail.wrapping_sub(steal) as usize)
    }

    fn len(&self) -> UnsignedShort {
        let (_, head) = unpack(self.head.load(Acquire));
        let tail = self.tail.load(Acquire);

        tail.wrapping_sub(head)
    }

    fn is_empty(&self) -> bool {
        self.len() == 0
    }
}

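/// Splits the head value into the real head and the index signifying the last
/// position being stolen.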
fn unpack(n: UnsignedLong) -> (UnsignedShort, UnsignedShort) {
    let real = n & UnsignedShort::MAX as UnsignedLong;
    let steal = n >> (mem::size_of::<UnsignedShort>() * 8);

    (steal as UnsignedShort, real as UnsignedShort)
}

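/// Joins the two head values.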
fn pack(steal: UnsignedShort, real: UnsignedShort) -> UnsignedLong {
    (real as UnsignedLong) | ((steal as UnsignedLong) << (mem::size_of::<UnsignedShort>() * 8))
}