tokio/runtime/local_runtime/runtime.rs
1#![allow(irrefutable_let_patterns)]
2
3use crate::runtime::blocking::BlockingPool;
4use crate::runtime::scheduler::CurrentThread;
5use crate::runtime::{context, Builder, EnterGuard, Handle, BOX_FUTURE_THRESHOLD};
6use crate::task::JoinHandle;
7
8use crate::util::trace::SpawnMeta;
9use std::future::Future;
10use std::marker::PhantomData;
11use std::mem;
12use std::time::Duration;
13
14/// A local Tokio runtime.
15///
16/// This runtime is capable of driving tasks which are not `Send + Sync` without the use of a
17/// `LocalSet`, and thus supports `spawn_local` without the need for a `LocalSet` context.
18///
19/// This runtime cannot be moved between threads or driven from different threads.
20///
21/// This runtime is incompatible with `LocalSet`. You should not attempt to drive a `LocalSet` within a
22/// `LocalRuntime`.
23///
24/// Currently, this runtime supports one flavor, which is internally identical to `current_thread`,
25/// save for the aforementioned differences related to `spawn_local`.
26///
27/// For more general information on how to use runtimes, see the [module] docs.
28///
29/// [runtime]: crate::runtime::Runtime
30/// [module]: crate::runtime
31#[derive(Debug)]
32#[cfg_attr(docsrs, doc(cfg(tokio_unstable)))]
33pub struct LocalRuntime {
34 /// Task scheduler
35 scheduler: LocalRuntimeScheduler,
36
37 /// Handle to runtime, also contains driver handles
38 handle: Handle,
39
40 /// Blocking pool handle, used to signal shutdown
41 blocking_pool: BlockingPool,
42
43 /// Marker used to make this !Send and !Sync.
44 _phantom: PhantomData<*mut u8>,
45}
46
47/// The runtime scheduler is always a `current_thread` scheduler right now.
48#[derive(Debug)]
49pub(crate) enum LocalRuntimeScheduler {
50 /// Execute all tasks on the current-thread.
51 CurrentThread(CurrentThread),
52}
53
54impl LocalRuntime {
55 pub(crate) fn from_parts(
56 scheduler: LocalRuntimeScheduler,
57 handle: Handle,
58 blocking_pool: BlockingPool,
59 ) -> LocalRuntime {
60 LocalRuntime {
61 scheduler,
62 handle,
63 blocking_pool,
64 _phantom: Default::default(),
65 }
66 }
67
68 /// Creates a new local runtime instance with default configuration values.
69 ///
70 /// This results in the scheduler, I/O driver, and time driver being
71 /// initialized.
72 ///
73 /// When a more complex configuration is necessary, the [runtime builder] may be used.
74 ///
75 /// See [module level][mod] documentation for more details.
76 ///
77 /// # Examples
78 ///
79 /// Creating a new `LocalRuntime` with default configuration values.
80 ///
81 /// ```
82 /// use tokio::runtime::LocalRuntime;
83 ///
84 /// let rt = LocalRuntime::new()
85 /// .unwrap();
86 ///
87 /// // Use the runtime...
88 /// ```
89 ///
90 /// [mod]: crate::runtime
91 /// [runtime builder]: crate::runtime::Builder
92 pub fn new() -> std::io::Result<LocalRuntime> {
93 Builder::new_current_thread()
94 .enable_all()
95 .build_local(&Default::default())
96 }
97
98 /// Returns a handle to the runtime's spawner.
99 ///
100 /// The returned handle can be used to spawn tasks that run on this runtime, and can
101 /// be cloned to allow moving the `Handle` to other threads.
102 ///
103 /// As the handle can be sent to other threads, it can only be used to spawn tasks that are `Send`.
104 ///
105 /// Calling [`Handle::block_on`] on a handle to a `LocalRuntime` is error-prone.
106 /// Refer to the documentation of [`Handle::block_on`] for more.
107 ///
108 /// # Examples
109 ///
110 /// ```
111 /// use tokio::runtime::LocalRuntime;
112 ///
113 /// let rt = LocalRuntime::new()
114 /// .unwrap();
115 ///
116 /// let handle = rt.handle();
117 ///
118 /// // Use the handle...
119 /// ```
120 pub fn handle(&self) -> &Handle {
121 &self.handle
122 }
123
124 /// Spawns a task on the runtime.
125 ///
126 /// This is analogous to the [`spawn`] method on the standard [`Runtime`], but works even if the task is not thread-safe.
127 ///
128 /// [`spawn`]: crate::runtime::Runtime::spawn
129 /// [`Runtime`]: crate::runtime::Runtime
130 ///
131 /// # Examples
132 ///
133 /// ```
134 /// use tokio::runtime::LocalRuntime;
135 ///
136 /// # fn dox() {
137 /// // Create the runtime
138 /// let rt = LocalRuntime::new().unwrap();
139 ///
140 /// // Spawn a future onto the runtime
141 /// rt.spawn_local(async {
142 /// println!("now running on a worker thread");
143 /// });
144 /// # }
145 /// ```
146 #[track_caller]
147 pub fn spawn_local<F>(&self, future: F) -> JoinHandle<F::Output>
148 where
149 F: Future + 'static,
150 F::Output: 'static,
151 {
152 let fut_size = std::mem::size_of::<F>();
153 let meta = SpawnMeta::new_unnamed(fut_size);
154
155 // safety: spawn_local can only be called from `LocalRuntime`, which this is
156 unsafe {
157 if std::mem::size_of::<F>() > BOX_FUTURE_THRESHOLD {
158 self.handle.spawn_local_named(Box::pin(future), meta)
159 } else {
160 self.handle.spawn_local_named(future, meta)
161 }
162 }
163 }
164
165 /// Runs the provided function on a thread from a dedicated blocking thread pool.
166 ///
167 /// This function _will_ be run on another thread.
168 ///
169 /// See the [documentation in the non-local runtime][Runtime] for more
170 /// information.
171 ///
172 /// [Runtime]: crate::runtime::Runtime::spawn_blocking
173 ///
174 /// # Examples
175 ///
176 /// ```
177 /// use tokio::runtime::LocalRuntime;
178 ///
179 /// # fn dox() {
180 /// // Create the runtime
181 /// let rt = LocalRuntime::new().unwrap();
182 ///
183 /// // Spawn a blocking function onto the runtime
184 /// rt.spawn_blocking(|| {
185 /// println!("now running on a worker thread");
186 /// });
187 /// # }
188 /// ```
189 #[track_caller]
190 pub fn spawn_blocking<F, R>(&self, func: F) -> JoinHandle<R>
191 where
192 F: FnOnce() -> R + Send + 'static,
193 R: Send + 'static,
194 {
195 self.handle.spawn_blocking(func)
196 }
197
198 /// Runs a future to completion on the Tokio runtime. This is the
199 /// runtime's entry point.
200 ///
201 /// See the documentation for [the equivalent method on Runtime][Runtime]
202 /// for more information.
203 ///
204 /// [Runtime]: crate::runtime::Runtime::block_on
205 ///
206 /// # Examples
207 ///
208 /// ```no_run
209 /// use tokio::runtime::LocalRuntime;
210 ///
211 /// // Create the runtime
212 /// let rt = LocalRuntime::new().unwrap();
213 ///
214 /// // Execute the future, blocking the current thread until completion
215 /// rt.block_on(async {
216 /// println!("hello");
217 /// });
218 /// ```
219 #[track_caller]
220 pub fn block_on<F: Future>(&self, future: F) -> F::Output {
221 let fut_size = mem::size_of::<F>();
222 let meta = SpawnMeta::new_unnamed(fut_size);
223
224 if std::mem::size_of::<F>() > BOX_FUTURE_THRESHOLD {
225 self.block_on_inner(Box::pin(future), meta)
226 } else {
227 self.block_on_inner(future, meta)
228 }
229 }
230
231 #[track_caller]
232 fn block_on_inner<F: Future>(&self, future: F, _meta: SpawnMeta<'_>) -> F::Output {
233 #[cfg(all(
234 tokio_unstable,
235 tokio_taskdump,
236 feature = "rt",
237 target_os = "linux",
238 any(target_arch = "aarch64", target_arch = "x86", target_arch = "x86_64")
239 ))]
240 let future = crate::runtime::task::trace::Trace::root(future);
241
242 #[cfg(all(tokio_unstable, feature = "tracing"))]
243 let future = crate::util::trace::task(
244 future,
245 "block_on",
246 _meta,
247 crate::runtime::task::Id::next().as_u64(),
248 );
249
250 let _enter = self.enter();
251
252 if let LocalRuntimeScheduler::CurrentThread(exec) = &self.scheduler {
253 exec.block_on(&self.handle.inner, future)
254 } else {
255 unreachable!("LocalRuntime only supports current_thread")
256 }
257 }
258
259 /// Enters the runtime context.
260 ///
261 /// This allows you to construct types that must have an executor
262 /// available on creation such as [`Sleep`] or [`TcpStream`]. It will
263 /// also allow you to call methods such as [`tokio::spawn`].
264 ///
265 /// If this is a handle to a [`LocalRuntime`], and this function is being invoked from the same
266 /// thread that the runtime was created on, you will also be able to call
267 /// [`tokio::task::spawn_local`].
268 ///
269 /// [`Sleep`]: struct@crate::time::Sleep
270 /// [`TcpStream`]: struct@crate::net::TcpStream
271 /// [`tokio::spawn`]: fn@crate::spawn
272 /// [`LocalRuntime`]: struct@crate::runtime::LocalRuntime
273 /// [`tokio::task::spawn_local`]: fn@crate::task::spawn_local
274 ///
275 /// # Example
276 ///
277 /// ```
278 /// use tokio::runtime::LocalRuntime;
279 /// use tokio::task::JoinHandle;
280 ///
281 /// fn function_that_spawns(msg: String) -> JoinHandle<()> {
282 /// // Had we not used `rt.enter` below, this would panic.
283 /// tokio::spawn(async move {
284 /// println!("{}", msg);
285 /// })
286 /// }
287 ///
288 /// fn main() {
289 /// let rt = LocalRuntime::new().unwrap();
290 ///
291 /// let s = "Hello World!".to_string();
292 ///
293 /// // By entering the context, we tie `tokio::spawn` to this executor.
294 /// let _guard = rt.enter();
295 /// let handle = function_that_spawns(s);
296 ///
297 /// // Wait for the task before we end the test.
298 /// rt.block_on(handle).unwrap();
299 /// }
300 /// ```
301 pub fn enter(&self) -> EnterGuard<'_> {
302 self.handle.enter()
303 }
304
305 /// Shuts down the runtime, waiting for at most `duration` for all spawned
306 /// work to stop.
307 ///
308 /// Note that `spawn_blocking` tasks, and only `spawn_blocking` tasks, can get left behind if
309 /// the timeout expires.
310 ///
311 /// See the [struct level documentation](LocalRuntime#shutdown) for more details.
312 ///
313 /// # Examples
314 ///
315 /// ```
316 /// use tokio::runtime::LocalRuntime;
317 /// use tokio::task;
318 ///
319 /// use std::thread;
320 /// use std::time::Duration;
321 ///
322 /// fn main() {
323 /// let runtime = LocalRuntime::new().unwrap();
324 ///
325 /// runtime.block_on(async move {
326 /// task::spawn_blocking(move || {
327 /// thread::sleep(Duration::from_secs(10_000));
328 /// });
329 /// });
330 ///
331 /// runtime.shutdown_timeout(Duration::from_millis(100));
332 /// }
333 /// ```
334 pub fn shutdown_timeout(mut self, duration: Duration) {
335 // Wakeup and shutdown all the worker threads
336 self.handle.inner.shutdown();
337 self.blocking_pool.shutdown(Some(duration));
338 }
339
340 /// Shuts down the runtime, without waiting for any spawned work to stop.
341 ///
342 /// This can be useful if you want to drop a runtime from within another runtime.
343 /// Normally, dropping a runtime will block indefinitely for spawned blocking tasks
344 /// to complete, which would normally not be permitted within an asynchronous context.
345 /// By calling `shutdown_background()`, you can drop the runtime from such a context.
346 ///
347 /// Note however, that because we do not wait for any blocking tasks to complete, this
348 /// may result in a resource leak (in that any blocking tasks are still running until they
349 /// return. No other tasks will leak.
350 ///
351 /// See the [struct level documentation](LocalRuntime#shutdown) for more details.
352 ///
353 /// This function is equivalent to calling `shutdown_timeout(Duration::from_nanos(0))`.
354 ///
355 /// ```
356 /// use tokio::runtime::LocalRuntime;
357 ///
358 /// fn main() {
359 /// let runtime = LocalRuntime::new().unwrap();
360 ///
361 /// runtime.block_on(async move {
362 /// let inner_runtime = LocalRuntime::new().unwrap();
363 /// // ...
364 /// inner_runtime.shutdown_background();
365 /// });
366 /// }
367 /// ```
368 pub fn shutdown_background(self) {
369 self.shutdown_timeout(Duration::from_nanos(0));
370 }
371
372 /// Returns a view that lets you get information about how the runtime
373 /// is performing.
374 pub fn metrics(&self) -> crate::runtime::RuntimeMetrics {
375 self.handle.metrics()
376 }
377}
378
379#[allow(clippy::single_match)] // there are comments in the error branch, so we don't want if-let
380impl Drop for LocalRuntime {
381 fn drop(&mut self) {
382 if let LocalRuntimeScheduler::CurrentThread(current_thread) = &mut self.scheduler {
383 // This ensures that tasks spawned on the current-thread
384 // runtime are dropped inside the runtime's context.
385 let _guard = context::try_set_current(&self.handle.inner);
386 current_thread.shutdown(&self.handle.inner);
387 } else {
388 unreachable!("LocalRuntime only supports current-thread")
389 }
390 }
391}
392
393impl std::panic::UnwindSafe for LocalRuntime {}
394
395impl std::panic::RefUnwindSafe for LocalRuntime {}