tokio/runtime/local_runtime/
runtime.rs

1#![allow(irrefutable_let_patterns)]
2
3use crate::runtime::blocking::BlockingPool;
4use crate::runtime::scheduler::CurrentThread;
5use crate::runtime::{context, Builder, EnterGuard, Handle, BOX_FUTURE_THRESHOLD};
6use crate::task::JoinHandle;
7
8use crate::util::trace::SpawnMeta;
9use std::future::Future;
10use std::marker::PhantomData;
11use std::mem;
12use std::time::Duration;
13
14/// A local Tokio runtime.
15///
16/// This runtime is capable of driving tasks which are not `Send + Sync` without the use of a
17/// `LocalSet`, and thus supports `spawn_local` without the need for a `LocalSet` context.
18///
19/// This runtime cannot be moved between threads or driven from different threads.
20///
21/// This runtime is incompatible with `LocalSet`. You should not attempt to drive a `LocalSet` within a
22/// `LocalRuntime`.
23///
24/// Currently, this runtime supports one flavor, which is internally identical to `current_thread`,
25/// save for the aforementioned differences related to `spawn_local`.
26///
27/// For more general information on how to use runtimes, see the [module] docs.
28///
29/// [runtime]: crate::runtime::Runtime
30/// [module]: crate::runtime
31#[derive(Debug)]
32#[cfg_attr(docsrs, doc(cfg(tokio_unstable)))]
33pub struct LocalRuntime {
34    /// Task scheduler
35    scheduler: LocalRuntimeScheduler,
36
37    /// Handle to runtime, also contains driver handles
38    handle: Handle,
39
40    /// Blocking pool handle, used to signal shutdown
41    blocking_pool: BlockingPool,
42
43    /// Marker used to make this !Send and !Sync.
44    _phantom: PhantomData<*mut u8>,
45}
46
47/// The runtime scheduler is always a `current_thread` scheduler right now.
48#[derive(Debug)]
49pub(crate) enum LocalRuntimeScheduler {
50    /// Execute all tasks on the current-thread.
51    CurrentThread(CurrentThread),
52}
53
54impl LocalRuntime {
55    pub(crate) fn from_parts(
56        scheduler: LocalRuntimeScheduler,
57        handle: Handle,
58        blocking_pool: BlockingPool,
59    ) -> LocalRuntime {
60        LocalRuntime {
61            scheduler,
62            handle,
63            blocking_pool,
64            _phantom: Default::default(),
65        }
66    }
67
68    /// Creates a new local runtime instance with default configuration values.
69    ///
70    /// This results in the scheduler, I/O driver, and time driver being
71    /// initialized.
72    ///
73    /// When a more complex configuration is necessary, the [runtime builder] may be used.
74    ///
75    /// See [module level][mod] documentation for more details.
76    ///
77    /// # Examples
78    ///
79    /// Creating a new `LocalRuntime` with default configuration values.
80    ///
81    /// ```
82    /// use tokio::runtime::LocalRuntime;
83    ///
84    /// let rt = LocalRuntime::new()
85    ///     .unwrap();
86    ///
87    /// // Use the runtime...
88    /// ```
89    ///
90    /// [mod]: crate::runtime
91    /// [runtime builder]: crate::runtime::Builder
92    pub fn new() -> std::io::Result<LocalRuntime> {
93        Builder::new_current_thread()
94            .enable_all()
95            .build_local(&Default::default())
96    }
97
98    /// Returns a handle to the runtime's spawner.
99    ///
100    /// The returned handle can be used to spawn tasks that run on this runtime, and can
101    /// be cloned to allow moving the `Handle` to other threads.
102    ///
103    /// As the handle can be sent to other threads, it can only be used to spawn tasks that are `Send`.
104    ///
105    /// Calling [`Handle::block_on`] on a handle to a `LocalRuntime` is error-prone.
106    /// Refer to the documentation of [`Handle::block_on`] for more.
107    ///
108    /// # Examples
109    ///
110    /// ```
111    /// use tokio::runtime::LocalRuntime;
112    ///
113    /// let rt = LocalRuntime::new()
114    ///     .unwrap();
115    ///
116    /// let handle = rt.handle();
117    ///
118    /// // Use the handle...
119    /// ```
120    pub fn handle(&self) -> &Handle {
121        &self.handle
122    }
123
124    /// Spawns a task on the runtime.
125    ///
126    /// This is analogous to the [`spawn`] method on the standard [`Runtime`], but works even if the task is not thread-safe.
127    ///
128    /// [`spawn`]: crate::runtime::Runtime::spawn
129    /// [`Runtime`]: crate::runtime::Runtime
130    ///
131    /// # Examples
132    ///
133    /// ```
134    /// use tokio::runtime::LocalRuntime;
135    ///
136    /// # fn dox() {
137    /// // Create the runtime
138    /// let rt = LocalRuntime::new().unwrap();
139    ///
140    /// // Spawn a future onto the runtime
141    /// rt.spawn_local(async {
142    ///     println!("now running on a worker thread");
143    /// });
144    /// # }
145    /// ```
146    #[track_caller]
147    pub fn spawn_local<F>(&self, future: F) -> JoinHandle<F::Output>
148    where
149        F: Future + 'static,
150        F::Output: 'static,
151    {
152        let fut_size = std::mem::size_of::<F>();
153        let meta = SpawnMeta::new_unnamed(fut_size);
154
155        // safety: spawn_local can only be called from `LocalRuntime`, which this is
156        unsafe {
157            if std::mem::size_of::<F>() > BOX_FUTURE_THRESHOLD {
158                self.handle.spawn_local_named(Box::pin(future), meta)
159            } else {
160                self.handle.spawn_local_named(future, meta)
161            }
162        }
163    }
164
165    /// Runs the provided function on a thread from a dedicated blocking thread pool.
166    ///
167    /// This function _will_ be run on another thread.
168    ///
169    /// See the [documentation in the non-local runtime][Runtime] for more
170    /// information.
171    ///
172    /// [Runtime]: crate::runtime::Runtime::spawn_blocking
173    ///
174    /// # Examples
175    ///
176    /// ```
177    /// use tokio::runtime::LocalRuntime;
178    ///
179    /// # fn dox() {
180    /// // Create the runtime
181    /// let rt = LocalRuntime::new().unwrap();
182    ///
183    /// // Spawn a blocking function onto the runtime
184    /// rt.spawn_blocking(|| {
185    ///     println!("now running on a worker thread");
186    /// });
187    /// # }
188    /// ```
189    #[track_caller]
190    pub fn spawn_blocking<F, R>(&self, func: F) -> JoinHandle<R>
191    where
192        F: FnOnce() -> R + Send + 'static,
193        R: Send + 'static,
194    {
195        self.handle.spawn_blocking(func)
196    }
197
198    /// Runs a future to completion on the Tokio runtime. This is the
199    /// runtime's entry point.
200    ///
201    /// See the documentation for [the equivalent method on Runtime][Runtime]
202    /// for more information.
203    ///
204    /// [Runtime]: crate::runtime::Runtime::block_on
205    ///
206    /// # Examples
207    ///
208    /// ```no_run
209    /// use tokio::runtime::LocalRuntime;
210    ///
211    /// // Create the runtime
212    /// let rt  = LocalRuntime::new().unwrap();
213    ///
214    /// // Execute the future, blocking the current thread until completion
215    /// rt.block_on(async {
216    ///     println!("hello");
217    /// });
218    /// ```
219    #[track_caller]
220    pub fn block_on<F: Future>(&self, future: F) -> F::Output {
221        let fut_size = mem::size_of::<F>();
222        let meta = SpawnMeta::new_unnamed(fut_size);
223
224        if std::mem::size_of::<F>() > BOX_FUTURE_THRESHOLD {
225            self.block_on_inner(Box::pin(future), meta)
226        } else {
227            self.block_on_inner(future, meta)
228        }
229    }
230
231    #[track_caller]
232    fn block_on_inner<F: Future>(&self, future: F, _meta: SpawnMeta<'_>) -> F::Output {
233        #[cfg(all(
234            tokio_unstable,
235            tokio_taskdump,
236            feature = "rt",
237            target_os = "linux",
238            any(target_arch = "aarch64", target_arch = "x86", target_arch = "x86_64")
239        ))]
240        let future = crate::runtime::task::trace::Trace::root(future);
241
242        #[cfg(all(tokio_unstable, feature = "tracing"))]
243        let future = crate::util::trace::task(
244            future,
245            "block_on",
246            _meta,
247            crate::runtime::task::Id::next().as_u64(),
248        );
249
250        let _enter = self.enter();
251
252        if let LocalRuntimeScheduler::CurrentThread(exec) = &self.scheduler {
253            exec.block_on(&self.handle.inner, future)
254        } else {
255            unreachable!("LocalRuntime only supports current_thread")
256        }
257    }
258
259    /// Enters the runtime context.
260    ///
261    /// This allows you to construct types that must have an executor
262    /// available on creation such as [`Sleep`] or [`TcpStream`]. It will
263    /// also allow you to call methods such as [`tokio::spawn`].
264    ///
265    /// If this is a handle to a [`LocalRuntime`], and this function is being invoked from the same
266    /// thread that the runtime was created on, you will also be able to call
267    /// [`tokio::task::spawn_local`].
268    ///
269    /// [`Sleep`]: struct@crate::time::Sleep
270    /// [`TcpStream`]: struct@crate::net::TcpStream
271    /// [`tokio::spawn`]: fn@crate::spawn
272    /// [`LocalRuntime`]: struct@crate::runtime::LocalRuntime
273    /// [`tokio::task::spawn_local`]: fn@crate::task::spawn_local
274    ///
275    /// # Example
276    ///
277    /// ```
278    /// use tokio::runtime::LocalRuntime;
279    /// use tokio::task::JoinHandle;
280    ///
281    /// fn function_that_spawns(msg: String) -> JoinHandle<()> {
282    ///     // Had we not used `rt.enter` below, this would panic.
283    ///     tokio::spawn(async move {
284    ///         println!("{}", msg);
285    ///     })
286    /// }
287    ///
288    /// fn main() {
289    ///     let rt = LocalRuntime::new().unwrap();
290    ///
291    ///     let s = "Hello World!".to_string();
292    ///
293    ///     // By entering the context, we tie `tokio::spawn` to this executor.
294    ///     let _guard = rt.enter();
295    ///     let handle = function_that_spawns(s);
296    ///
297    ///     // Wait for the task before we end the test.
298    ///     rt.block_on(handle).unwrap();
299    /// }
300    /// ```
301    pub fn enter(&self) -> EnterGuard<'_> {
302        self.handle.enter()
303    }
304
305    /// Shuts down the runtime, waiting for at most `duration` for all spawned
306    /// work to stop.
307    ///
308    /// Note that `spawn_blocking` tasks, and only `spawn_blocking` tasks, can get left behind if
309    /// the timeout expires.
310    ///
311    /// See the [struct level documentation](LocalRuntime#shutdown) for more details.
312    ///
313    /// # Examples
314    ///
315    /// ```
316    /// use tokio::runtime::LocalRuntime;
317    /// use tokio::task;
318    ///
319    /// use std::thread;
320    /// use std::time::Duration;
321    ///
322    /// fn main() {
323    ///    let runtime = LocalRuntime::new().unwrap();
324    ///
325    ///    runtime.block_on(async move {
326    ///        task::spawn_blocking(move || {
327    ///            thread::sleep(Duration::from_secs(10_000));
328    ///        });
329    ///    });
330    ///
331    ///    runtime.shutdown_timeout(Duration::from_millis(100));
332    /// }
333    /// ```
334    pub fn shutdown_timeout(mut self, duration: Duration) {
335        // Wakeup and shutdown all the worker threads
336        self.handle.inner.shutdown();
337        self.blocking_pool.shutdown(Some(duration));
338    }
339
340    /// Shuts down the runtime, without waiting for any spawned work to stop.
341    ///
342    /// This can be useful if you want to drop a runtime from within another runtime.
343    /// Normally, dropping a runtime will block indefinitely for spawned blocking tasks
344    /// to complete, which would normally not be permitted within an asynchronous context.
345    /// By calling `shutdown_background()`, you can drop the runtime from such a context.
346    ///
347    /// Note however, that because we do not wait for any blocking tasks to complete, this
348    /// may result in a resource leak (in that any blocking tasks are still running until they
349    /// return. No other tasks will leak.
350    ///
351    /// See the [struct level documentation](LocalRuntime#shutdown) for more details.
352    ///
353    /// This function is equivalent to calling `shutdown_timeout(Duration::from_nanos(0))`.
354    ///
355    /// ```
356    /// use tokio::runtime::LocalRuntime;
357    ///
358    /// fn main() {
359    ///    let runtime = LocalRuntime::new().unwrap();
360    ///
361    ///    runtime.block_on(async move {
362    ///        let inner_runtime = LocalRuntime::new().unwrap();
363    ///        // ...
364    ///        inner_runtime.shutdown_background();
365    ///    });
366    /// }
367    /// ```
368    pub fn shutdown_background(self) {
369        self.shutdown_timeout(Duration::from_nanos(0));
370    }
371
372    /// Returns a view that lets you get information about how the runtime
373    /// is performing.
374    pub fn metrics(&self) -> crate::runtime::RuntimeMetrics {
375        self.handle.metrics()
376    }
377}
378
379#[allow(clippy::single_match)] // there are comments in the error branch, so we don't want if-let
380impl Drop for LocalRuntime {
381    fn drop(&mut self) {
382        if let LocalRuntimeScheduler::CurrentThread(current_thread) = &mut self.scheduler {
383            // This ensures that tasks spawned on the current-thread
384            // runtime are dropped inside the runtime's context.
385            let _guard = context::try_set_current(&self.handle.inner);
386            current_thread.shutdown(&self.handle.inner);
387        } else {
388            unreachable!("LocalRuntime only supports current-thread")
389        }
390    }
391}
392
393impl std::panic::UnwindSafe for LocalRuntime {}
394
395impl std::panic::RefUnwindSafe for LocalRuntime {}