tokio_stream/wrappers/
interval.rs

1use crate::Stream;
2use std::pin::Pin;
3use std::task::{Context, Poll};
4use tokio::time::{Instant, Interval};
5
6/// A wrapper around [`Interval`] that implements [`Stream`].
7///
8/// # Example
9///
10/// ```
11/// use tokio::time::{Duration, Instant, interval};
12/// use tokio_stream::wrappers::IntervalStream;
13/// use tokio_stream::StreamExt;
14///
15/// # #[tokio::main(flavor = "current_thread")]
16/// # async fn main() {
17/// let start = Instant::now();
18/// let interval = interval(Duration::from_millis(10));
19/// let mut stream = IntervalStream::new(interval);
20/// for _ in 0..3 {
21///     if let Some(instant) = stream.next().await {
22///         println!("elapsed: {:.1?}", instant.duration_since(start));
23///     }
24/// }
25/// # }
26/// ```
27///
28/// [`Interval`]: struct@tokio::time::Interval
29/// [`Stream`]: trait@crate::Stream
30#[derive(Debug)]
31#[cfg_attr(docsrs, doc(cfg(feature = "time")))]
32pub struct IntervalStream {
33    inner: Interval,
34}
35
36impl IntervalStream {
37    /// Create a new `IntervalStream`.
38    pub fn new(interval: Interval) -> Self {
39        Self { inner: interval }
40    }
41
42    /// Get back the inner `Interval`.
43    pub fn into_inner(self) -> Interval {
44        self.inner
45    }
46}
47
48impl Stream for IntervalStream {
49    type Item = Instant;
50
51    fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Instant>> {
52        self.inner.poll_tick(cx).map(Some)
53    }
54
55    fn size_hint(&self) -> (usize, Option<usize>) {
56        (usize::MAX, None)
57    }
58}
59
60impl AsRef<Interval> for IntervalStream {
61    fn as_ref(&self) -> &Interval {
62        &self.inner
63    }
64}
65
66impl AsMut<Interval> for IntervalStream {
67    fn as_mut(&mut self) -> &mut Interval {
68        &mut self.inner
69    }
70}