Async Rust, Part Two: Tasks

2024 October 23

In the introduction we said that async/await was about futures and tasks. Part One was all about futures, and now we can talk about tasks. Luckily, we've already seen one, though we didn't call it that. The last version of our main loop in Part One looked like this:

let mut joined_future = Box::pin(future::join_all(futures));
let waker = futures::task::noop_waker();
let mut context = Context::from_waker(&waker);
while joined_future.as_mut().poll(&mut context).is_pending() {

}

That joined_future is the simplest possible example of a task. It's a top-level future that's owned and polled by the main loop. Here we only have one task, but there's nothing stopping us from having more than one. And if we had a collection of tasks, we could even add to that collection at runtime.

That's what tokio::task::spawn does. We can rewrite our original Tokio example using spawn instead of join_all:

#[tokio::main]
async fn main() {
    let mut task_handles = Vec::new();
    for n in 1..=10 {
        task_handles.push(tokio::task::spawn(foo(n)));
    }
    for handle in task_handles {
        handle.await.unwrap();
    }
}

foo is still an async fn, but otherwise this is very similar to our original thread::spawn example. Like threads, but unlike ordinary futures, tasks start running in the background as soon as we spawn them, so .awaiting a task handle works like joining a thread handle. It's common in network services to have a main loop that listens for new connections and spawns a thread to handle each connection. Async tasks let us use this same pattern without the performance overhead of threads. That's exactly what we'll do in Part Three. (It's possible to do this with future combinators too, but common ones like join_all and select! assume a static set of futures. If you want to add futures dynamically, you need a fancier collection like FuturesUnordered. Runtimes like Tokio can also execute different tasks on different threads ("M:N threading"), but joined futures run on the same thread.)

So, building on the main loop we wrote in Part One, we're going to write our own spawn. We'll do that in three steps: First we'll make space for multiple tasks in the main loop, then we'll write the spawn function to add new tasks, and finally we'll implement JoinHandle.

Dyn

We already know how to poll many futures at once, because that's what we did when we implemented JoinAll. How much of that code can we copy/paste?

One thing we need to change is the type of the futures Vec. Our JoinAll used Vec<Pin<Box<F>>>, where F was a generic type parameter, but our main function doesn't have any type parameters. (We're still not paying much attention to Pin, but Box is about to do some important work.) We also want the new Vec to be able to hold futures of different types at the same time. (In other words we want a "heterogeneous" collection. JoinAll can do this too, if you set F to the same type we're about to use.) The Rust feature we need here is "dynamic trait objects", dyn Trait. (dyn Trait isn't specific to async. You might have seen it before in error handling, where Box<dyn Error> is a catch-all type for the ? operator. If you're coming from C++, dyn Trait is the closest thing Rust has to "virtual inheritance". If this is your first time seeing it, you might want to play with the Rust by Example page for dyn.) This is also where we start to care about the difference between T and Box<T>. Because dyn Trait is a "dynamically sized type", we can't hold an object of that type directly in a local variable or a Vec element. We have to Box it. Let's start with a type alias so we don't have to write this more than once:

type DynFuture = Pin<Box<dyn Future<Output = ()>>>;

Note that DynFuture doesn't have type parameters. We can fit any boxed future into this one type, as long as its Output is (). (In the code that follows, Box::pin(foo(n)) is still a concrete future type, but pushing it into the Vec<DynFuture> "coerces" the concrete type to dyn Future. Specifically, it's an "unsized coercion".) Now instead of building a joined_future in our main function, we'll build a Vec<DynFuture>, and we'll start calling these futures "tasks":

fn main() {
    let mut tasks: Vec<DynFuture> = Vec::new();
    for n in 1..=10 {
        tasks.push(Box::pin(foo(n)));
    }
    let waker = futures::task::noop_waker();
    let mut context = Context::from_waker(&waker);

We can manage the Vec<DynFuture> using retain_mut like JoinAll did, removing futures from the Vec as soon as they're Ready. We need to rearrange the while loop into a loop/break so that we can do all the polling, then check whether we're done, then handle Wakers. (We're still getting the next wake time with .next().expect(…), which panics if WAKE_TIMES is empty. If we forgot to register a wake time somehow, panicking instead of busy looping would help us catch that mistake, but it also means we need to be careful with the order of operations in our main loop.) Now it looks like this:

    let waker = futures::task::noop_waker();
    let mut context = Context::from_waker(&waker);
    loop {
        // Poll each task and remove any that are Ready.
        let is_pending = |task: &mut DynFuture| {
            task.as_mut().poll(&mut context).is_pending()
        };
        tasks.retain_mut(is_pending);

        // If there are no tasks left, we're done.
        if tasks.is_empty() {
            break;
        }

        // Otherwise handle WAKE_TIMES and sleep as in Part One...

This works fine, though it might not feel like we've accomplished much. Mostly we just copy/pasted from JoinAll and tweaked the types. But we've laid some important groundwork.
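
To see the polling loop in isolation, here is a minimal sketch that uses only the standard library. Several names are assumptions made for illustration and are not from this series: Countdown is a hypothetical stand-in for Sleep that returns Pending a fixed number of times, NoopWaker replaces futures::task::noop_waker, and run_to_completion busy-polls instead of sleeping on WAKE_TIMES.

```rust
use std::future::Future;
use std::pin::Pin;
use std::sync::Arc;
use std::task::{Context, Poll, Wake, Waker};

type DynFuture = Pin<Box<dyn Future<Output = ()>>>;

// Hypothetical stand-in for Sleep: Pending `remaining` times, then Ready.
struct Countdown {
    remaining: u32,
}

impl Future for Countdown {
    type Output = ();

    fn poll(mut self: Pin<&mut Self>, _context: &mut Context) -> Poll<()> {
        if self.remaining == 0 {
            Poll::Ready(())
        } else {
            self.remaining -= 1;
            Poll::Pending
        }
    }
}

// A do-nothing Waker built with std only (an assumption, replacing noop_waker).
struct NoopWaker;

impl Wake for NoopWaker {
    fn wake(self: Arc<Self>) {}
}

// Poll every task until all are Ready. Returns the number of passes over the Vec.
// This sketch busy-polls; the real loop sleeps between passes.
fn run_to_completion(mut tasks: Vec<DynFuture>) -> u32 {
    let waker = Waker::from(Arc::new(NoopWaker));
    let mut context = Context::from_waker(&waker);
    let mut passes = 0;
    loop {
        passes += 1;
        tasks.retain_mut(|task| task.as_mut().poll(&mut context).is_pending());
        if tasks.is_empty() {
            return passes;
        }
    }
}

// Three futures of three different concrete types in one Vec.
fn example_tasks() -> Vec<DynFuture> {
    let mut tasks: Vec<DynFuture> = Vec::new();
    tasks.push(Box::pin(Countdown { remaining: 2 }));
    tasks.push(Box::pin(async { Countdown { remaining: 1 }.await }));
    tasks.push(Box::pin(async {}));
    tasks
}

fn main() {
    println!("passes: {}", run_to_completion(example_tasks()));
}
```

The three pushes each coerce a different concrete type to DynFuture, which is the heterogeneous collection described above.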

Note that the behavior of this loop is somewhat different from how tasks work in Tokio. Rust normally exits when the main thread is done without waiting for background threads, and Tokio also exits when the main task is done without waiting for background tasks. However, this version of our main loop continues until all tasks are done. It also assumes that tasks have no return value. We'll fix both of these things when we get to JoinHandle below, but let's do spawn first.

Spawn

The spawn function is supposed to insert another future into the tasks Vec. How should it access the Vec? It would be convenient if we could do the same thing we did with WAKE_TIMES and make TASKS a global variable protected by a Mutex, but that's not going to work this time. Our main loop only locks WAKE_TIMES after it's finished polling, but if we made TASKS global, then the main loop would lock it during polling, and any task that called spawn would deadlock.

We'll work around that by maintaining two separate lists. We'll keep the tasks list where it is, local to the main loop, and we'll add a global list called NEW_TASKS. (We could use a VecDeque instead of a Vec if we wanted to poll tasks in FIFO order instead of LIFO order. We could also use a channel, which as an added benefit would get rid of the while let footgun below. Opening a channel isn't const, however, so we'd need a OnceLock or similar to initialize the static.) The spawn function can append to NEW_TASKS:

static NEW_TASKS: Mutex<Vec<DynFuture>> = Mutex::new(Vec::new());

fn spawn<F: Future<Output = ()>>(future: F) {
    NEW_TASKS.lock().unwrap().push(Box::pin(future));
}

Now the main loop can…wait that doesn't build:

error[E0277]: `(dyn Future<Output = ()> + 'static)` cannot be sent between threads safely
  --> tasks_no_send_no_static.rs:43:19
   |
43 | static NEW_TASKS: Mutex<Vec<DynFuture>> = Mutex::new(Vec::new());
   |                   ^^^^^^^^^^^^^^^^^^^^^ `(dyn Future<Output = ()> + 'static)` cannot be sent between threads
   |
   = help: the trait `Send` is not implemented for `(dyn Future<Output = ()> + 'static)`, which is required by
           `Mutex<Vec<Pin<Box<(dyn Future<Output = ()> + 'static)>>>>: Sync`

Global variables in Rust have to be Sync, and Mutex<T> is only Sync when T is Send. (Send and Sync are the thread safety marker traits in Rust. Another way of putting this requirement is that a Mutex is only safe to share with other threads if the object inside of it is safe to move to other threads.) DynFuture has to promise that it's Send:

type DynFuture = Pin<Box<dyn Future<Output = ()> + Send>>;
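
The rule that Mutex<T> is Sync only when T is Send can be checked at compile time. This is a minimal sketch, assuming two hypothetical helper functions, assert_send and assert_sync, and a second alias, SendDynFuture, none of which are part of this article's code:

```rust
use std::future::Future;
use std::pin::Pin;
use std::sync::Mutex;

// The original alias, without a Send promise.
type DynFuture = Pin<Box<dyn Future<Output = ()>>>;
// The fixed alias (named differently here so both can coexist).
type SendDynFuture = Pin<Box<dyn Future<Output = ()> + Send>>;

// Hypothetical helpers: a call compiles only if T satisfies the bound.
fn assert_send<T: Send>() {}
fn assert_sync<T: Sync>() {}

fn main() {
    // With the Send bound, the Mutex is Sync and may live in a static.
    assert_send::<SendDynFuture>();
    assert_sync::<Mutex<Vec<SendDynFuture>>>();

    // Without the Send bound, the same check fails with E0277:
    // assert_sync::<Mutex<Vec<DynFuture>>>();

    // The non-Send alias is still fine as a local variable.
    let _local_only: Vec<DynFuture> = Vec::new();
}
```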

Ok, now the…nope it still doesn't build:

error[E0277]: `F` cannot be sent between threads safely
  --> src/main.rs:46:36
   |
46 |     NEW_TASKS.lock().unwrap().push(Box::pin(future));
   |                                    ^^^^^^^^^^^^^^^^ `F` cannot be sent between threads safely
   |
   = note: required for the cast from `Pin<Box<F>>` to
           `Pin<Box<(dyn futures::Future<Output = ()> + std::marker::Send + 'static)>>`

Fair enough, spawn has to make the same promise:

fn spawn<F: Future<Output = ()> + Send>(future: F) { … }

Happy yet? Nope:

error[E0310]: the parameter type `F` may not live long enough
  --> src/main.rs:46:36
   |
46 |     NEW_TASKS.lock().unwrap().push(Box::pin(future));
   |                                    ^^^^^^^^^^^^^^^^
   |                                    |
   |                                    the parameter type `F` must be valid for the static lifetime...
   |                                    ...so that the type `F` will meet its required lifetime bounds

Global variables have the 'static lifetime, meaning they don't hold pointers to anything that could go away. Trait objects like DynFuture are 'static by default, but type parameters like F are not. (Note that join_all in Part One didn't have any 'static requirement. We can have multiple concurrent futures borrowing local variables, but we can't do the same with tasks. On the other hand, it's possible to run different tasks on different threads, as Tokio does by default, but we can't do that with non-'static futures. It would be nice if there was some task equivalent of thread::scope, but that turns out to be an open research question.) If spawn wants to put F in a global, it also has to promise that F is 'static:

fn spawn<F: Future<Output = ()> + Send + 'static>(future: F) { … }

Finally it builds. That was a lot of ceremony just to make a global Vec, but let's think about what we wrote: Instead of a "Vec of futures", NEW_TASKS is a "Vec of thread-safe futures which don't hold any pointers that might become dangling." Rust doesn't have a garbage collector, so dangling pointers would lead to memory corruption bugs, and it's nice that we can just say we don't want those. (The thread-safety requirement is arguably too strict, since we're not spawning any threads in this example. Rust doesn't have a way to say "I promise my program is single-threaded," but we could avoid the requirement by using a thread_local! instead of a static. In contrast, Tokio does use threads internally, so the Send requirement on tokio::task::spawn is unavoidable.)
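
For comparison, here is a rough sketch of what the thread_local! alternative might look like. It is not the approach this article takes, and the names LocalDynFuture, LOCAL_NEW_TASKS, spawn_local, and queued_task_count are hypothetical. The Send bound disappears, so a task can hold an Rc, but the 'static bound stays:

```rust
use std::cell::{Cell, RefCell};
use std::future::Future;
use std::pin::Pin;
use std::rc::Rc;

// No Send bound: these futures never leave the thread that created them.
type LocalDynFuture = Pin<Box<dyn Future<Output = ()>>>;

thread_local! {
    // Each thread gets its own list, so RefCell can replace Mutex.
    static LOCAL_NEW_TASKS: RefCell<Vec<LocalDynFuture>> = RefCell::new(Vec::new());
}

// Hypothetical single-threaded spawn. 'static is still required, Send is not.
fn spawn_local<F: Future<Output = ()> + 'static>(future: F) {
    LOCAL_NEW_TASKS.with(|tasks| tasks.borrow_mut().push(Box::pin(future)));
}

// Number of tasks queued on the current thread.
fn queued_task_count() -> usize {
    LOCAL_NEW_TASKS.with(|tasks| tasks.borrow().len())
}

fn main() {
    // Rc is not Send, so this future would be rejected by the static version.
    let counter = Rc::new(Cell::new(0));
    let counter_clone = Rc::clone(&counter);
    spawn_local(async move {
        counter_clone.set(counter_clone.get() + 1);
    });
    println!("queued tasks: {}", queued_task_count());
}
```

The trade-off is that tasks queued this way are only visible to the thread that spawned them.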

Ok…now the main loop can pop from NEW_TASKS and push into tasks. It's not much extra code, but there are a couple pitfalls to watch out for, and this time they're runtime bugs instead of compiler errors. First, we have to poll new tasks as we collect them, rather than waiting until the next iteration of the main loop, so they get a chance to register wakeups before we sleep. (We'd notice this mistake immediately below, after we added the async_main function that calls spawn. If our main loop didn't poll those new tasks before it tried to read the next wakeup time, then there wouldn't be a wakeup time, and it would panic.) Second, we have to make sure NEW_TASKS is unlocked before we poll, or else we'll recreate the same deadlock we were trying to avoid.

(Unfortunately this is an easy mistake to make. A method chain like .lock().unwrap().pop() creates a MutexGuard that lasts until the end of the current "temporary scope". In this example as written, that's the semicolon after the let else. But if we combined the inner loop and the let else into a while let, or if we replaced the let else with an if let, the guard would last until the end of the following block, and we'd still be holding the lock when we called poll. If we made this mistake, and if we also made foo call spawn before its first .await, this example would deadlock. This is an unfortunate footgun with Mutex. There's a Clippy lint for it, but as of Rust 1.82 it's still disabled by default.

The formal rule for this behavior is that the first part of an if or while expression (the "condition") is a temporary scope, but the first part of a match, if let, or while let expression (the "scrutinee") is not. This rule is necessary for matching on borrowing methods like Vec::first or String::trim, but it's unnecessary and counterintuitive with methods like Vec::pop or String::len that return owned values. It might be nice if Rust dropped temporaries as soon as possible, but then drop timing would depend on borrow checker analysis, which isn't generally stable. Some Rust compilers have even skipped borrow checking entirely, since correct programs can be compiled without it.)

Here's the expanded main loop:

    loop {
        // Poll each task, removing any that are Ready.
        let is_pending = |task: &mut DynFuture| {
            task.as_mut().poll(&mut context).is_pending()
        };
        tasks.retain_mut(is_pending);

        // Collect new tasks, poll them, and keep the ones that are Pending.
        loop {
            let Some(mut task) = NEW_TASKS.lock().unwrap().pop() else {
                break;
            };
            // Polling this task could spawn more tasks, so it's important that
            // NEW_TASKS isn't locked here.
            if task.as_mut().poll(&mut context).is_pending() {
                tasks.push(task);
            }
        }

        // If there are no tasks left, we're done.
        if tasks.is_empty() {
            break;
        }

        // Otherwise handle WAKE_TIMES and sleep as in Part One...
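
The difference between let else and while let in that inner loop can be observed without actually deadlocking. The following is a minimal sketch, assuming a hypothetical QUEUE of integers standing in for NEW_TASKS, and using try_lock (which never blocks) to report whether the lock is still held inside the loop body:

```rust
use std::sync::Mutex;

// Hypothetical stand-in for NEW_TASKS, holding numbers instead of futures.
static QUEUE: Mutex<Vec<u32>> = Mutex::new(Vec::new());

// Returns true if QUEUE is currently unlocked. try_lock never blocks.
fn queue_is_unlocked() -> bool {
    QUEUE.try_lock().is_ok()
}

fn refill() {
    QUEUE.lock().unwrap().extend([1, 2, 3]);
}

// The pattern from the main loop: the guard is dropped at the end of the
// `let else` statement, before the rest of the loop body runs.
fn drain_with_let_else() -> Vec<bool> {
    refill();
    let mut observations = Vec::new();
    loop {
        let Some(_item) = QUEUE.lock().unwrap().pop() else {
            break;
        };
        observations.push(queue_is_unlocked());
    }
    observations
}

// The footgun: the guard is a temporary in the scrutinee, so it stays alive
// for the whole loop body. A blocking lock() here would deadlock.
fn drain_with_while_let() -> Vec<bool> {
    refill();
    let mut observations = Vec::new();
    while let Some(_item) = QUEUE.lock().unwrap().pop() {
        observations.push(queue_is_unlocked());
    }
    observations
}

fn main() {
    println!("let else:  {:?}", drain_with_let_else());
    println!("while let: {:?}", drain_with_while_let());
}
```

In the while let version, any code in the loop body that called spawn would block forever on the lock that the loop itself is holding.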

With all that in place, instead of hardcoding the whole task list in main, we can define an async_main function and let it do the spawning:

async fn async_main() {
    // The main() loop currently waits for all tasks to finish.
    for n in 1..=10 {
        spawn(foo(n));
    }
}

fn main() {
    let waker = futures::task::noop_waker();
    let mut context = Context::from_waker(&waker);
    let mut tasks: Vec<DynFuture> = vec![Box::pin(async_main())];

It works! Because of how we push and pop NEW_TASKS, the order of prints is different now. We could fix that, but let's keep it this way. It's a good reminder that, like threads, tasks running at the same time can run in any order.
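
The ordering change comes from Vec::pop taking from the back. As a small sketch with plain integers standing in for tasks (drain_lifo and drain_fifo are hypothetical helpers, not part of our executor), here is the difference between the Vec we used and a VecDeque drained from the front:

```rust
use std::collections::VecDeque;

// Vec::pop removes the most recently pushed item first (LIFO).
fn drain_lifo(mut queue: Vec<u32>) -> Vec<u32> {
    let mut order = Vec::new();
    while let Some(n) = queue.pop() {
        order.push(n);
    }
    order
}

// VecDeque::pop_front removes the oldest item first (FIFO).
fn drain_fifo(mut queue: VecDeque<u32>) -> Vec<u32> {
    let mut order = Vec::new();
    while let Some(n) = queue.pop_front() {
        order.push(n);
    }
    order
}

fn main() {
    println!("LIFO: {:?}", drain_lifo(vec![1, 2, 3]));
    println!("FIFO: {:?}", drain_fifo(VecDeque::from([1, 2, 3])));
}
```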

JoinHandle

As we noted above, Tokio supports tasks that run in the background without blocking program exit, and it also supports tasks with return values. Both of those things require tokio::task::spawn to return a tokio::task::JoinHandle, very similar to how thread::spawn returns a thread::JoinHandle. We'll implement our own JoinHandle to get the same features. (We won't need task return values ourselves, but once we implement blocking, we'll see that carrying a value doesn't add any extra lines of code. We will need non-blocking background tasks when we get to IO, so that our example can exit after "client" tasks are finished, without taking extra steps to shut down the "server" task.) Also, the only way for our tasks to block so far has been sleep, and introducing a second form of blocking will lead to an interesting bug.

JoinHandle needs to communicate between a task that's finishing and another task that's waiting for it to finish. The waiting side needs somewhere to put its Waker so that the finishing side can invoke it, and the finishing side needs somewhere to put its return value, T, so that the waiting side can receive it. (Note that we only need space for one Waker. It's possible that different calls to poll could supply different Wakers, but the contract of Future::poll is that "only the Waker from the Context passed to the most recent call should be scheduled to receive a wakeup.") We don't need both of those things at the same time, so we can use an enum. This enum needs to be shared and mutable, so we'll wrap it in an Arc and a Mutex:

enum JoinState<T> {
    Unawaited,
    Awaited(Waker),
    Ready(T),
    Done,
}

struct JoinHandle<T> {
    state: Arc<Mutex<JoinState<T>>>,
}

(Arc is an atomic reference-counted smart pointer, similar to std::shared_ptr in C++. It behaves like a shared reference, but it's not bound to the lifetime of any particular scope. Arc is the standard way to share objects that don't have a fixed scope, so you can't use a shared reference, but also aren't global, so you can't use a static. If we had used thread_local! instead of static to implement NEW_TASKS above, and avoided the Send requirements that came with that, then we could use Rc and RefCell here instead of Arc and Mutex.)

Awaiting the JoinHandle is how we wait for a task to finish, so JoinHandle needs to implement Future. One tricky detail here is that the waiting task wants to take ownership of the T from JoinState::Ready(T), but Arc<Mutex<JoinState>> only lets us access the JoinState through a reference, so we can't move out the T and "leave a hole" behind that reference. Instead, we need to swap out the whole JoinState with mem::replace:

impl<T> Future for JoinHandle<T> {
    type Output = T;

    fn poll(self: Pin<&mut Self>, context: &mut Context) -> Poll<T> {
        let mut guard = self.state.lock().unwrap();
        // Use JoinState::Done as a placeholder, to take ownership of T.
        match mem::replace(&mut *guard, JoinState::Done) {
            JoinState::Ready(value) => Poll::Ready(value),
            JoinState::Unawaited | JoinState::Awaited(_) => {
                // Replace the previous Waker, if any.
                *guard = JoinState::Awaited(context.waker().clone());
                Poll::Pending
            }
            JoinState::Done => unreachable!("polled again after Ready"),
        }
    }
}

(It would be more convenient if we could use mem::take directly on &mut T, but that only works if T implements Default, and we don't want our spawn function to require that. Another option is a library called replace_with, which lets us "leave a hole" behind any &mut T temporarily, but it's not entirely clear whether that approach is sound.)
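
The mem::replace trick is easier to see outside of a Future impl. Here's a minimal sketch with hypothetical types (Payload, Slot, and take_payload are illustrations, not part of our executor). Payload deliberately has no Default impl, which is the situation where mem::take isn't available:

```rust
use std::mem;
use std::sync::{Arc, Mutex};

// No Default impl, so mem::take can't be used on a &mut Payload.
struct Payload(String);

enum Slot {
    Full(Payload),
    Taken,
}

// Move the Payload out from behind a shared Mutex, leaving a placeholder.
fn take_payload(shared: &Arc<Mutex<Slot>>) -> Option<Payload> {
    let mut guard = shared.lock().unwrap();
    // Matching on `*guard` by value would be a "cannot move out of" error.
    // Swapping in Slot::Taken gives us an owned Slot to match on instead.
    match mem::replace(&mut *guard, Slot::Taken) {
        Slot::Full(payload) => Some(payload),
        Slot::Taken => None,
    }
}

fn main() {
    let shared = Arc::new(Mutex::new(Slot::Full(Payload(String::from("hello")))));
    let first = take_payload(&shared).map(|payload| payload.0);
    let second = take_payload(&shared).map(|payload| payload.0);
    println!("{first:?} {second:?}");
}
```

The second call finds only the placeholder, which plays the same role as JoinState::Done.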

Futures passed to spawn don't know anything about JoinState, so we'll also need a wrapper function to handle their return values and invoke the Waker if there is one:

async fn wrap_with_join_state<F: Future>(
    future: F,
    join_state: Arc<Mutex<JoinState<F::Output>>>,
) {
    let value = future.await;
    let mut guard = join_state.lock().unwrap();
    if let JoinState::Awaited(waker) = &*guard {
        waker.wake_by_ref();
    }
    *guard = JoinState::Ready(value)
}
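
To check the handshake between the two sides, we can poll them by hand. The sketch below repeats the JoinState, JoinHandle, and wrap_with_join_state definitions so that it's self-contained, and adds two things that are assumptions for illustration only: a CountingWaker that counts wakeups, and a run_handshake function that drives one waiting side and one finishing side without any main loop.

```rust
use std::future::Future;
use std::mem;
use std::pin::Pin;
use std::sync::atomic::{AtomicUsize, Ordering};
use std::sync::{Arc, Mutex};
use std::task::{Context, Poll, Wake, Waker};

enum JoinState<T> {
    Unawaited,
    Awaited(Waker),
    Ready(T),
    Done,
}

struct JoinHandle<T> {
    state: Arc<Mutex<JoinState<T>>>,
}

impl<T> Future for JoinHandle<T> {
    type Output = T;

    fn poll(self: Pin<&mut Self>, context: &mut Context) -> Poll<T> {
        let mut guard = self.state.lock().unwrap();
        match mem::replace(&mut *guard, JoinState::Done) {
            JoinState::Ready(value) => Poll::Ready(value),
            JoinState::Unawaited | JoinState::Awaited(_) => {
                *guard = JoinState::Awaited(context.waker().clone());
                Poll::Pending
            }
            JoinState::Done => unreachable!("polled again after Ready"),
        }
    }
}

async fn wrap_with_join_state<F: Future>(
    future: F,
    join_state: Arc<Mutex<JoinState<F::Output>>>,
) {
    let value = future.await;
    let mut guard = join_state.lock().unwrap();
    if let JoinState::Awaited(waker) = &*guard {
        waker.wake_by_ref();
    }
    *guard = JoinState::Ready(value)
}

// Hypothetical test Waker: counts how many times it has been invoked.
struct CountingWaker(AtomicUsize);

impl Wake for CountingWaker {
    fn wake(self: Arc<Self>) {
        self.0.fetch_add(1, Ordering::SeqCst);
    }
}

// Returns (was the first poll Pending?, wakeups seen, value received).
fn run_handshake() -> (bool, usize, u32) {
    let counter = Arc::new(CountingWaker(AtomicUsize::new(0)));
    let waker = Waker::from(Arc::clone(&counter));
    let mut context = Context::from_waker(&waker);

    let state = Arc::new(Mutex::new(JoinState::Unawaited));
    let mut handle = JoinHandle { state: Arc::clone(&state) };
    let mut task = Box::pin(wrap_with_join_state(async { 42u32 }, state));

    // The waiting side polls first and registers its Waker.
    let first_poll_pending = Pin::new(&mut handle).poll(&mut context).is_pending();
    // The finishing side runs to completion and invokes that Waker.
    assert!(task.as_mut().poll(&mut context).is_ready());
    let wakeups = counter.0.load(Ordering::SeqCst);
    // The waiting side polls again and takes the value.
    let Poll::Ready(value) = Pin::new(&mut handle).poll(&mut context) else {
        panic!("the handle should be Ready after the task finishes");
    };
    (first_poll_pending, wakeups, value)
}

fn main() {
    println!("{:?}", run_handshake());
}
```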

Now we can build a JoinState and apply that wrapper function in spawn, so that it accepts any Output type and returns a JoinHandle:

fn spawn<F, T>(future: F) -> JoinHandle<T>
where
    F: Future<Output = T> + Send + 'static,
    T: Send + 'static,
{
    let join_state = Arc::new(Mutex::new(JoinState::Unawaited));
    let join_handle = JoinHandle {
        state: Arc::clone(&join_state),
    };
    let task = Box::pin(wrap_with_join_state(future, join_state));
    NEW_TASKS.lock().unwrap().push(task);
    join_handle
}

(T needs to be Send and 'static just like F. The future returned by wrap_with_join_state needs to be coercible to DynFuture, which means the JoinState<T> that it contains needs to be Send and 'static, which means T needs to be Send and 'static. This time around I'll skip the "discovery" phase and just write the bounds correctly the first time.)

We'll collect and .await those JoinHandles in async_main, similar to how we managed Tokio tasks above:

async fn async_main() {
    let mut task_handles = Vec::new();
    for n in 1..=10 {
        task_handles.push(spawn(foo(n)));
    }
    for handle in task_handles {
        handle.await;
    }
}

(The Tokio version had an extra .unwrap() after handle.await, because Tokio catches panics and converts them to Results, like .join() does with thread handles. If we wanted to do the same thing, then inside of wrap_with_join_state above we'd use FutureExt::catch_unwind, the async-adapted version of std::panic::catch_unwind. JoinHandle::Output would become the corresponding Result.)

Now that we can explicitly wait on tasks, we want our main loop to exit after the main task is finished. Let's split the main task out from the tasks list and call the list other_tasks:

fn main() {
    let waker = futures::task::noop_waker();
    let mut context = Context::from_waker(&waker);
    let mut main_task = Box::pin(async_main());
    let mut other_tasks: Vec<DynFuture> = Vec::new();
    loop {
        // Poll the main task and exit immediately if it's done.
        if main_task.as_mut().poll(&mut context).is_ready() {
            return;
        }
        // Poll other tasks and remove any that are Ready.
        let is_pending = |task: &mut DynFuture| {
            task.as_mut().poll(&mut context).is_pending()
        };
        other_tasks.retain_mut(is_pending);
        // Handle NEW_TASKS and WAKE_TIMES...

(Eagle-eyed readers might spot that we're no longer coercing main_task to a DynFuture. That means that async_main doesn't have to return (). We'll take advantage of that in Part Three to return io::Result<()>. Technically async_main doesn't have to be Send anymore either, but we won't mess with that.)

Done! That was a lot of changes all at once. Fortunately, it all builds. It even almost works. Our program prints the correct output, but then it panics:


end 3
end 2
end 1
thread 'main' panicked at src/main.rs:143:50:
sleep forever?

This is the interesting bug we were looking forward to.

Waker

The panic is coming from this line, which has been in our main loop since the end of Part One:

let mut wake_times = WAKE_TIMES.lock().unwrap();
let next_wake = wake_times.keys().next().expect("sleep forever?");
thread::sleep(next_wake.saturating_duration_since(Instant::now()));

The loop is about to sleep, so it asks for the next wake time, but the WAKE_TIMES tree is empty. Previously we could assume that if any task returned Pending, there must be at least one wake time registered, because the only source of blocking was Sleep. But now we have a second source of blocking: JoinHandle. If a JoinHandle is Pending, that could be because another task is sleeping and has registered a wake time. However, it could also be that the other task is about to return Ready as soon as we poll it, but we haven't polled it yet. This is sensitive to the order of our tasks list. If a task at the front is waiting on a task at the back, we might end up with Pending tasks and yet no wakeups scheduled.

That's exactly what's happened to us. Our main task is probably blocking on the first JoinHandle. The main loop wakes up and polls the main task, and that JoinHandle is still Pending. Then it polls all the other_tasks, and each of them prints an "end" message, signals its JoinHandle, and returns Ready. At that point, we need to poll the main task again instead of trying to sleep. How can we communicate that to the main loop? (It might be tempting to remove the .expect(…) and instead continue the main loop when WAKE_TIMES is empty. That would fix this example without any new communication, but more complicated examples would still have timing bugs. If some tasks sleep longer than others, we might need to re-poll immediately even when WAKE_TIMES isn't empty.) We could make another static flag, but this time we have a better option. We'll use our Waker.

We've been using futures::task::noop_waker to supply a dummy Waker since Part One. When Sleep was the only source of blocking, there was no way for one task to unblock another, and all we needed from Waker was a placeholder to satisfy the compiler. But now things have changed. Our wrap_with_join_state function is already invoking Wakers correctly when tasks finish, and we want to hear about it when that happens. How do we write our own Waker?

Waker implements From<Arc<W>>, where W is anything with the Wake trait, which requires a wake method. (There's also a fancy unsafe way to build a Waker from something called a RawWaker. That's what Tokio does, and it's what we'd have to do if we were targeting a no_std environment without Arc.) The wake method takes Arc<Self>, which is a little funny, but apart from that it can do whatever we like. (Arc is there because Waker is Clone. It would be nice if we could address that more directly with a bound like W: Wake + Clone on the From impl, but that turns out not to work because of a requirement of dyn Trait objects called "object safety".) The simplest option is to build what's effectively an Arc<Mutex<bool>> and to set it to true when any task has received a wakeup. (AtomicBool would be more efficient, but again Mutex is more familiar and good enough. If you want a three hour deep dive on atomics, listen to "atomic<> Weapons" by Herb Sutter. That talk is focused on C++, but C and Rust both copied the C++ atomic memory model. If we wanted to do more bookkeeping, we could also construct a unique Waker for each task and then only poll the tasks that received wakeups. We saw that futures::future::JoinAll does something like this in Part One. We could get this "for free" by replacing our tasks Vec with a FuturesUnordered.) That's not so different from a static flag, but it lets other people's futures invoke our Waker without needing to know the private implementation details of our main loop. Here's our glorified bool:

struct AwakeFlag(Mutex<bool>);

impl AwakeFlag {
    fn check_and_clear(&self) -> bool {
        let mut guard = self.0.lock().unwrap();
        let check = *guard;
        *guard = false;
        check
    }
}

impl Wake for AwakeFlag {
    fn wake(self: Arc<Self>) {
        *self.0.lock().unwrap() = true;
    }
}
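
For readers curious about the AtomicBool alternative, here's a rough sketch of how it might look. This is not what the rest of this article uses. AtomicAwakeFlag and wake_then_check_twice are hypothetical names, and SeqCst is chosen only because it's the simplest ordering to reason about:

```rust
use std::sync::atomic::{AtomicBool, Ordering};
use std::sync::Arc;
use std::task::{Wake, Waker};

// Hypothetical variant of AwakeFlag with an AtomicBool instead of a Mutex<bool>.
struct AtomicAwakeFlag(AtomicBool);

impl AtomicAwakeFlag {
    // swap reads the old value and writes false in one atomic step, so there
    // is no guard to hold and nothing to deadlock on.
    fn check_and_clear(&self) -> bool {
        self.0.swap(false, Ordering::SeqCst)
    }
}

impl Wake for AtomicAwakeFlag {
    fn wake(self: Arc<Self>) {
        self.0.store(true, Ordering::SeqCst);
    }
}

// Build a Waker from the flag, invoke it once, and check the flag twice.
fn wake_then_check_twice() -> (bool, bool) {
    let flag = Arc::new(AtomicAwakeFlag(AtomicBool::new(false)));
    let waker = Waker::from(Arc::clone(&flag));
    waker.wake_by_ref();
    (flag.check_and_clear(), flag.check_and_clear())
}

fn main() {
    println!("{:?}", wake_then_check_twice());
}
```

The first check sees the wakeup and clears it, so the second check reports false.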

We can create an AwakeFlag and make a Waker from it at the start of main:

fn main() {
    let awake_flag = Arc::new(AwakeFlag(Mutex::new(false)));
    let waker = Waker::from(Arc::clone(&awake_flag));
    let mut context = Context::from_waker(&waker);

And if that AwakeFlag is set, the main loop should re-poll:

        // Collect new tasks, poll them, and keep the ones that are Pending.
        loop {
            let Some(mut task) = NEW_TASKS.lock().unwrap().pop() else {
                break;
            };
            if task.as_mut().poll(&mut context).is_pending() {
                other_tasks.push(task);
            }
        }
        // Some tasks might wake other tasks. Re-poll if the AwakeFlag has been
        // set. Polling futures that aren't ready yet is inefficient but allowed.
        if awake_flag.check_and_clear() {
            continue;
        }
        // Otherwise handle WAKE_TIMES and sleep as in Part One...

(The reason I defined check_and_clear above is that we can create another deadlock if we lock awake_flag here but don't drop the MutexGuard as soon as we're done with it. The last thing the main loop does is invoke Wakers, which ends up calling AwakeFlag::wake and taking the same lock.)

It works! We've implemented tasks.

We're about to move beyond sleeping and printing to look at real IO, and we'll use spawn to handle network connections.