Async Rust, Part Three: IO
2024 October 23
- Introduction
- Part One: Futures
- Part Two: Tasks
- Part Three: IO (you are here)
Of course async/await wasn't invented for sleeping. Our goal all along has been IO, especially network IO. Now that we have futures and tasks, we can start doing some real work.
Let's go back to ordinary, non-async Rust for a moment. We'll start with a toy server program and a client that talks to it. Then we'll use threads to combine the server and several clients into one example that we can run on the Playground. Once that combination is working, we'll translate it into async, building on the main loop we wrote in Part Two.
Here's our toy server:
fn main() -> io::Result<()> {
    let listener = TcpListener::bind("0.0.0.0:8000")?;
    let mut n = 1;
    loop {
        let (mut socket, _) = listener.accept()?;
        let start_msg = format!("start {n}\n");
        socket.write_all(start_msg.as_bytes())?;
        thread::sleep(Duration::from_secs(1));
        let end_msg = format!("end {n}\n");
        socket.write_all(end_msg.as_bytes())?;
        n += 1;
    }
}
It starts listening on port 8000.​0.0.0.0
is the special IP address that means "all IPv4 interfaces
on this host". It's the standard way to listen for connections coming from
anywhere, at least in examples that don't need IPv6 support. If we bound
the listener to localhost
instead, it would work when the client and the
server were on the same machine, but it would reject connections from the
network. For each connection it receives, it
writes a start message, sleeps for one second, and writes an end
message.​We could use write!
or writeln!
instead of format!
to avoid
allocating a String
here, but that results in three separate writes to
the TcpStream
, one for the prefix, one for the number, and one more for
the newline. That's probably slower than allocating. Separate writes also
tend to appear as separate reads on the client side, so we'd need to do
line buffering to avoid garbled output when we run multiple clients at once
below. It's not guaranteed that the format!
approach will come out as one
read, but in small examples like these it generally does. Here's a client for our toy server:
fn main() -> io::Result<()> {
    let mut socket = TcpStream::connect("localhost:8000")?;
    io::copy(&mut socket, &mut io::stdout())?;
    Ok(())
}
This client opens a connection to the server and copies all the bytes it
receives to standard output, as soon as they arrive. It doesn't explicitly
sleep, but it still takes a second, because the server takes a second to finish
responding. Under the covers, io::copy
is a convenience wrapper around the
standard Read::read
and Write::write
methods, and read
blocks until
input arrives.
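To make that concrete, here's a rough sketch of the loop hiding inside io::copy. This isn't the real implementation (which also retries ErrorKind::Interrupted and uses a larger buffer), just the shape of it:
use std::io::{self, Read, Write};

fn copy_sketch(
    reader: &mut impl Read,
    writer: &mut impl Write,
) -> io::Result<u64> {
    let mut buf = [0u8; 1024];
    let mut total = 0;
    loop {
        // This read blocks until some input arrives.
        let n = reader.read(&mut buf)?;
        if n == 0 {
            return Ok(total); // EOF
        }
        writer.write_all(&buf[..n])?;
        total += n as u64;
    }
}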
These programs can't talk to each other on the Playground. You might want to
take the time to run them on your computer, or even better on two different
computers on your WiFi network.​In that case you'll need to change localhost
in the client to
the IP address of your server. If you haven't done this before,
seeing it work on a real network is pretty cool. Reviewing the web server
project from Chapter 20 of The Book might be helpful too.
Threads
Let's get this working on the Playground by putting the client and server
together in one program. Since they're both blocking, we'll have to run them on
separate threads. We'll rename their main
functions to client_main
and
server_main
, and while we're at it we'll run ten clients at the same
time:​Note that the return type of handle.join()
in this example is
thread::Result<io::Result<()>>
, i.e. a Result
nested in another
Result
. IO errors from client threads wind up in the inner Result
and
are handled with ?
. The outer Result
represents whether the client
thread panicked, and we propagate those panics with .unwrap()
. The server
thread normally runs forever, so we can't join
it. If it does
short-circuit with an error, though, we don't want that error to be silent.
Unwrapping server thread IO errors prints to stderr in that case, which is
better than nothing.
fn main() -> io::Result<()> {
    // Avoid a race between bind and connect by binding before spawn.
    let listener = TcpListener::bind("0.0.0.0:8000")?;
    // Start the server on a background thread.
    thread::spawn(|| server_main(listener).unwrap());
    // Run ten clients on ten different threads.
    let mut client_handles = Vec::new();
    for _ in 1..=10 {
        client_handles.push(thread::spawn(client_main));
    }
    for handle in client_handles {
        handle.join().unwrap()?;
    }
    Ok(())
}
This works on the Playground, but it takes ten seconds. Even though the clients
are running in parallel, the server is only responding to one of them at a
time. Let's make the server spawn a new thread for each incoming
request:​The move
keyword is necessary here because otherwise the closure
would borrow n
, which violates the 'static
requirement of
thread::spawn
. Rust is right to complain about this, because if
server_main
returned while response threads were still running, pointers
to n
would become dangling.
fn one_response(mut socket: TcpStream, n: u64) -> io::Result<()> {
    let start_msg = format!("start {n}\n");
    socket.write_all(start_msg.as_bytes())?;
    thread::sleep(Duration::from_secs(1));
    let end_msg = format!("end {n}\n");
    socket.write_all(end_msg.as_bytes())?;
    Ok(())
}

fn server_main(listener: TcpListener) -> io::Result<()> {
    let mut n = 1;
    loop {
        let (socket, _) = listener.accept()?;
        thread::spawn(move || one_response(socket, n).unwrap());
        n += 1;
    }
}
It still works, and now it only takes one second. This is exactly the behavior we want. Now we're ready for our final project: expanding the main loop from Part Two and translating this example into async.
There are two big problems we need to solve. First, we need IO functions that
return immediately instead of blocking, even when there's no input yet, so that
we can use them in Future::poll
.​Remember that blocking in poll
holds up the entire main loop,
which in our single-threaded implementation will block all tasks. That's
always a performance issue, but in this case it's a correctness issue too.
Once we get this example working, we'll have ten client tasks waiting to
read input from the server task. If a client task blocks the server task,
then input will never arrive, and the program will deadlock. Second, when all our tasks are
waiting for input, we want to sleep instead of busy looping, and we need a way
to wake up when any input arrives.
Non-blocking
There's a solution for the first problem in the standard
library.​Well, there's three quarters of a solution. For the rest
we're gonna cheat… TcpListener
and TcpStream
both have
set_nonblocking
methods, which make accept
, read
, and write
return
ErrorKind::WouldBlock
instead of blocking.
Technically, set_nonblocking
by itself is enough to get async IO working.
Without solving the second problem, we'll burn 100% CPU busy looping until we
exit, but our output will still be correct, and we can lay a lot of groundwork
before we get to the more complicated part.
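Here's a tiny standalone sketch (not part of our example programs) of what a non-blocking read looks like before we wire it into a future:
use std::io::{self, Read};
use std::net::TcpStream;

fn try_read(stream: &mut TcpStream, buf: &mut [u8]) -> io::Result<Option<usize>> {
    stream.set_nonblocking(true)?;
    match stream.read(buf) {
        // Some bytes arrived (or Ok(0) for end-of-file).
        Ok(n) => Ok(Some(n)),
        // No input yet. A blocking read would've waited here.
        Err(e) if e.kind() == io::ErrorKind::WouldBlock => Ok(None),
        Err(e) => Err(e),
    }
}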
When we wrote Foo
, JoinAll
, and Sleep
in Part One, each of them required
a struct definition, a poll
function, and a constructor function. To cut down
on boilerplate this time around, we'll use std::future::poll_fn
, which
takes a standalone poll
function and generates the rest of the future.
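As a warm-up, here's about the smallest possible poll_fn future, a toy that's ready immediately (the function name here is made up):
use std::future::poll_fn;
use std::task::Poll;

async fn forty_two() -> i32 {
    // The closure is the poll function; poll_fn supplies the struct and the
    // Future impl around it.
    poll_fn(|_context| Poll::Ready(42)).await
}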
There are four potentially blocking operations that we need to async-ify.
There's accept
and write
on the server side, and there's connect
and
read
on the client side. Let's start with accept
:​We're writing this as an async function that creates a future
and then immediately awaits it, but we could also have written it as a
non-async function that returns that future. That would be cleaner, but
we'd need lifetimes in the function signature, and the "obvious" way to
write them turns out to be subtly incorrect. The 2024
Edition will fix this by changing the way that "return position impl
Trait
" types "capture" lifetime parameters.
async fn accept(
    listener: &mut TcpListener,
) -> io::Result<(TcpStream, SocketAddr)> {
    std::future::poll_fn(|context| match listener.accept() {
        Ok((stream, addr)) => {
            stream.set_nonblocking(true)?;
            Poll::Ready(Ok((stream, addr)))
        }
        Err(e) if e.kind() == io::ErrorKind::WouldBlock => {
            // TODO: This is a busy loop.
            context.waker().wake_by_ref();
            Poll::Pending
        }
        Err(e) => Poll::Ready(Err(e)),
    }).await
}
The key here is handling WouldBlock
errors by converting them to Pending
.
Calling wake_by_ref
whenever we return Pending
, like we did in the second
version of Sleep
from Part One, makes this a busy loop. We'll
fix that in the next section. We're assuming that the TcpListener
is already
in non-blocking mode,​And we're going to assume that non-blocking calls never return
ErrorKind::Interrupted
/EINTR
, so we don't need an extra line of
code in each example to retry that case. and we're putting the returned TcpStream
into
non-blocking mode too,​Eagle-eyed readers might spot that our poll_fn
closure is using
the ?
operator with set_nonblocking
, even though the closure itself
returns Poll
. This works because there's a Try
implementation for
Poll<Result<…>>
that uses the same associated
Residual
type as the Try
implementation for Result<…>
.
See RFC 3058 for the details of the Try
trait, which are still unstable
as of Rust 1.82. to get ready for async writes.
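As an aside, here's a toy standalone function (not part of our example code) showing that ?-inside-a-Poll-returning-function trick in isolation:
use std::io::{self, Read};
use std::task::Poll;

fn read_ready(reader: &mut impl Read) -> Poll<io::Result<String>> {
    let mut s = String::new();
    // read_to_string returns io::Result<usize>; the ? operator converts its
    // error into Poll::Ready(Err(…)) via the Try machinery described above.
    reader.read_to_string(&mut s)?;
    Poll::Ready(Ok(s))
}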
Next let's implement those writes. If we wanted to copy Tokio, we'd define an
AsyncWrite
trait and make everything generic, but that's a lot of code.
Instead, let's keep it short and hardcode that we're writing to a TcpStream
:
async fn write_all(
    mut buf: &[u8],
    stream: &mut TcpStream,
) -> io::Result<()> {
    std::future::poll_fn(|context| {
        while !buf.is_empty() {
            match stream.write(&buf) {
                Ok(0) => {
                    let e = io::Error::from(io::ErrorKind::WriteZero);
                    return Poll::Ready(Err(e));
                }
                Ok(n) => buf = &buf[n..],
                Err(e) if e.kind() == io::ErrorKind::WouldBlock => {
                    // TODO: This is a busy loop.
                    context.waker().wake_by_ref();
                    return Poll::Pending;
                }
                Err(e) => return Poll::Ready(Err(e)),
            }
        }
        Poll::Ready(Ok(()))
    }).await
}
TcpStream::write
isn't guaranteed to consume all of buf
, so we need to call
it in a loop, bumping buf
forward each time. It's unlikely that we'll see
Ok(0)
from TcpStream
,​Ok(0)
from a write means that either the input buf
was empty,
which is ruled out by our while
condition, or that the writer can't
accept more bytes. The latter mostly applies to not-real-IO writers like
&mut
[u8]
. When real-IO writers like TcpStream
or
File
can't accept more bytes (because the other end is closed or the disk
is full) they usually indicate that with Err
rather than Ok(0)
. but if we do it's better for that to be an
error than an infinite loop. The loop condition also means that we won't make
any calls to write
if buf
is initially empty, which matches the default
behavior of Write::write_all
.​It would be nice to use Write::write_all
directly here and get
the loop and the WriteZero
handling for free. But unfortunately, when
Write::write_all
returns WouldBlock
, it doesn't tell us how many bytes
it wrote before that, and we need that number to update buf
. In contrast,
if Write::write
needs to block after it's already written some bytes, it
returns Ok(n)
first, and then the next call returns WouldBlock
.
Those are the async building blocks we needed for the server, and now we can
write the async version of server_main
:​I'm pretty sure this is the first time we've implicitly relied
on Pin
guarantees for soundness. The compiler-generated one_response
future owns a TcpStream
, but it also passes references to that stream
into write_all
futures, and it owns those too. That would be unsound if
the one_response
future could move (thus moving the TcpStream
) after
those borrows were established.
async fn one_response(mut socket: TcpStream, n: u64) -> io::Result<()> {
    let start_msg = format!("start {n}\n");
    write_all(start_msg.as_bytes(), &mut socket).await?;
    sleep(Duration::from_secs(1)).await;
    let end_msg = format!("end {n}\n");
    write_all(end_msg.as_bytes(), &mut socket).await?;
    Ok(())
}

async fn server_main(mut listener: TcpListener) -> io::Result<()> {
    let mut n = 1;
    loop {
        let (socket, _) = accept(&mut listener).await?;
        spawn(async move { one_response(socket, n).await.unwrap() });
        n += 1;
    }
}
Similar to the threads example we started with, we never join
server tasks, so we use unwrap
to print to stderr if they fail.​In our case panicking in any task will print and then take down
the whole process, because we're not using background threads, and we're
not catching panics. But as we noted with JoinHandle
in Part Two, Tokio
does catch panics, even in single-threaded mode.
Previously we did that inside a closure, and here we do it inside an async
block, which works like an anonymous async
fn
that takes no arguments.
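Here's a minimal, self-contained sketch of that equivalence (both function names here are made up):
async fn three() -> i32 {
    1 + 2
}

async fn demo() {
    let from_fn = three(); // calling an async fn gives us a future
    let from_block = async { 1 + 2 }; // an async block is a future directly
    assert_eq!(from_fn.await, from_block.await);
}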
Hopefully that works, but we need to translate the client before we can test it.
We just did async writes, so let's do async reads. The counterpart of
Write::write_all
is Read::read_to_end
, but that's not quite what we want
here. We want to print output as soon as it arrives, rather than collecting it
in a Vec
and printing it all at the end. Let's keep things short again and
hardcode the printing. We'll call it print_all
:​In Tokio we'd use tokio::io::copy
for this, the same way we used
std::io::copy
in the non-async client. Writing a generic, async copy
function would mean we'd need AsyncRead
and AsyncWrite
traits and
implementations too, though, and that's a lot more code.
async fn print_all(stream: &mut TcpStream) -> io::Result<()> {
    std::future::poll_fn(|context| {
        loop {
            let mut buf = [0; 1024];
            match stream.read(&mut buf) {
                Ok(0) => return Poll::Ready(Ok(())), // EOF
                // Assume that printing doesn't block.
                Ok(n) => io::stdout().write_all(&buf[..n])?,
                Err(e) if e.kind() == io::ErrorKind::WouldBlock => {
                    // TODO: This is a busy loop.
                    context.waker().wake_by_ref();
                    return Poll::Pending;
                }
                Err(e) => return Poll::Ready(Err(e)),
            }
        }
    }).await
}
Ok(0)
from a read means end-of-file, but otherwise this is similar to
write_all
above.​We're cheating a little bit by assuming that printing doesn't
block, but that's not really any different from using println!
in an
async function, which we've been doing the whole time. Realistically,
programs that write enough to stdout to fill their pipe buffer (tar
or
gzip
for example) can't make progress when their output is blocked
anyway, and it's common to ignore this.
The other async building block we need for our client is connect
, but there
are a couple of problems with that. First, TcpStream::connect
creates a new
stream, and there's no way for us to call set_nonblocking
on that stream
before connect
talks to the network.​We would need to solve this with the socket2
crate, which
separates Socket::new
from Socket::connect
. Second, connect
can include
a DNS lookup, and async DNS is a whole can of worms.​DNS needs to read config files like /etc/resolv.conf
, so the OS
implementation is in libc rather than in the kernel, and libc only exposes
blocking interfaces like getaddrinfo
. Those configs are unstandardized
and platform-specific, and reading them is a pain. Even Tokio punts on this
and makes a blocking call to getaddrinfo
on a thread pool.
For comparison, the net
module in the Golang standard library contains
two DNS implementations, an async resolver for simple
cases, and a fallback resolver that also calls getaddrinfo
on a thread
pool. Solving those
problems here would be a lot of trouble without much benefit…so we're
going to cheat and just assume that connect
doesn't block.​This is big-time cheating, but our example only connects to
itself, so we'll get away with it. In the real world we'd use a proper
async implementation like tokio::net::TcpStream::connect
.
With one real async building block and one blatant lie, we can write
client_main
:
async fn client_main() -> io::Result<()> {
    // XXX: Assume that connect() returns quickly.
    let mut socket = TcpStream::connect("localhost:8000")?;
    socket.set_nonblocking(true)?;
    print_all(&mut socket).await?;
    Ok(())
}
And finally async_main
:
async fn async_main() -> io::Result<()> {
    // Avoid a race between bind and connect by binding before spawn.
    let listener = TcpListener::bind("0.0.0.0:8000")?;
    listener.set_nonblocking(true)?;
    // Start the server on a background task.
    spawn(async { server_main(listener).await.unwrap() });
    // Run ten clients as ten different tasks.
    let mut task_handles = Vec::new();
    for _ in 1..=10 {
        task_handles.push(spawn(client_main()));
    }
    for handle in task_handles {
        handle.await?;
    }
    Ok(())
}
It works! It busy loops and burns 100% CPU, but it really does work. That's a lot of groundwork laid.
Poll
The second big problem we need to solve is sleeping the main loop until input
arrives. This isn't something we can do on our own, and we need help from the
OS. We're going to use the poll
"system call" for this,​It's no coincidence that Rust's Future::poll
interface shares its
name with the poll
system call and the C standard library function that
wraps it. They solve different layers of the same problem, managing many IO
operations at the same time without a busy loop. which is
available on all Unix-like OSs, including Linux and macOS.​We use "syscalls" all the time under the covers, but we don't often
call them directly. Basic OS features like files and threads work roughly
the same way across common OSs, so standard library abstractions like
File
and Thread
are usually all we need. But async IO is a different
story: The interfaces provided by different OSs vary widely, and the world
hasn't yet settled on one right way to do it. We'll use poll
in these
examples because it's relatively simple and widely supported, but there are
many other options. The oldest is select
, which is similar to poll
but kind of deprecated. Modern, higher-performance options include
epoll
and io_uring
on Linux, kqueue
on macOS and BSD, and
IOCP on Windows. For a medium-level, cross-platform Rust library that
abstracts over several of these, see mio
. We'll
invoke it using the C standard library function libc::poll
, which looks
like this in Rust:
pub unsafe extern "C" fn poll(
    fds: *mut pollfd,
    nfds: nfds_t,
    timeout: c_int,
) -> c_int
libc::poll
takes a list​As usual with C functions, the list is split into two arguments, a
raw pointer to the first element and a count of elements. of "poll file descriptors" and a timeout in
milliseconds. The timeout will let us wake up for sleeps in addition to IO,
replacing thread::sleep
in our main loop. Each pollfd
looks like this:
struct pollfd {
    fd: c_int,
    events: c_short,
    revents: c_short,
}
The fd
field is a "file descriptor", or in Rust terms a "raw" file
descriptor. It's an identifier that Unix-like OSs use to track open resources
like files and sockets. We can get a descriptor from a TcpListener
or a
TcpStream
by calling .as_raw_fd()
, which returns a RawFd
,
a type alias for c_int
.​Unfortunately, none of these raw file descriptor operations will
compile on Windows. The Windows counterpart of as_raw_fd
is
as_raw_handle
. This is a low enough level of detail that the Rust
standard library doesn't try to abstract over platform differences. The
Unix function isn't defined on Windows targets, and the Windows function
isn't defined on Unix targets. To make code like this portable, we have to
write it at least twice, using #[cfg(unix)]
and #[cfg(windows)]
to gate
each implementation to its target platform.
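For instance, a portable helper might look something like this sketch, using File since it implements AsRawFd on Unix and AsRawHandle on Windows (raw_id is a made-up name):
use std::fs::File;

#[cfg(unix)]
fn raw_id(file: &File) -> u64 {
    use std::os::fd::AsRawFd;
    file.as_raw_fd() as u64
}

#[cfg(windows)]
fn raw_id(file: &File) -> u64 {
    use std::os::windows::io::AsRawHandle;
    file.as_raw_handle() as u64
}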
The events
field is a collection of bitflags indicating what we're waiting
for. The most common events are POLLIN
, meaning input is available, and
POLLOUT
, meaning space is available in output buffers. We'll wait for
POLLIN
when we get WouldBlock
from a read, and we'll wait for POLLOUT
when we get WouldBlock
from a write.
The revents
field ("returned events") is similar but used for output rather
than input. After poll
returns, the bits in this field indicate whether the
corresponding descriptor was one of the ones that caused the wakeup. We could
use this to poll only the specific tasks that the wakeup is for, but for
simplicity we'll ignore this field and poll every task every time we wake up.
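Before wiring poll into our main loop, here's a standalone, Unix-only sketch of a single libc::poll call, waiting up to one second for input on stdin (the function name is made up):
use std::io;
use std::os::fd::AsRawFd;

fn wait_for_stdin() -> io::Result<bool> {
    let mut fds = [libc::pollfd {
        fd: io::stdin().as_raw_fd(),
        events: libc::POLLIN,
        revents: 0,
    }];
    let n = unsafe {
        libc::poll(fds.as_mut_ptr(), fds.len() as libc::nfds_t, 1000)
    };
    if n < 0 {
        return Err(io::Error::last_os_error());
    }
    // n is the number of descriptors with nonzero revents; 0 means we timed out.
    Ok((fds[0].revents & libc::POLLIN) != 0)
}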
Our async IO functions, accept
, write_all
, and print_all
, need a way to
send pollfd
s and Waker
s back to main
, so that main
can call
libc::poll
. We'll add a couple more global Vec
s for this, plus a helper
function to populate them:​Whenever we hold more than one lock at a time, we need to make
sure that all callers lock them in the same order, to avoid deadlocks.
We're locking POLL_FDS
before POLL_WAKERS
here, so we'll do the same in
main
.
static POLL_FDS: Mutex<Vec<libc::pollfd>> = Mutex::new(Vec::new());
static POLL_WAKERS: Mutex<Vec<Waker>> = Mutex::new(Vec::new());

fn register_pollfd(
    context: &mut Context,
    fd: &impl AsRawFd,
    events: libc::c_short,
) {
    let mut poll_fds = POLL_FDS.lock().unwrap();
    let mut poll_wakers = POLL_WAKERS.lock().unwrap();
    poll_fds.push(libc::pollfd {
        fd: fd.as_raw_fd(),
        events,
        revents: 0,
    });
    poll_wakers.push(context.waker().clone());
}
Now our async IO functions can call register_pollfd
instead of wake_by_ref
.
accept
and print_all
are reads, so they handle WouldBlock
by setting
POLLIN
:
Err(e) if e.kind() == io::ErrorKind::WouldBlock => {
    register_pollfd(context, listener, libc::POLLIN);
    Poll::Pending
}
write_all
handles WouldBlock
by setting POLLOUT
:
Err(e) if e.kind() == io::ErrorKind::WouldBlock => {
    register_pollfd(context, stream, libc::POLLOUT);
    return Poll::Pending;
}
Finally, main
. We'll start by preparing the timeout
argument for
libc::poll
. This is similar to how we've been computing the next wake time
all along, except now we're not guaranteed to have one,​Previously, sleeping forever could only be a bug, but now that we
can wait on IO in addition to sleeping, waiting forever is valid. and we need
to convert it to milliseconds:
// Some tasks might wake other tasks. Re-poll if the AwakeFlag has been
// set. Polling futures that aren't ready yet is inefficient but allowed.
if awake_flag.check_and_clear() {
    continue;
}

// All tasks are either sleeping or blocked on IO. Use libc::poll to wait
// for IO on any of the POLL_FDS. If there are any WAKE_TIMES, use the
// earliest as a timeout.
let mut wake_times = WAKE_TIMES.lock().unwrap();
let timeout_ms = if let Some(time) = wake_times.keys().next() {
    let duration = time.saturating_duration_since(Instant::now());
    duration.as_millis() as libc::c_int
} else {
    -1 // infinite timeout
};
After all that preparation, we can replace thread::sleep
with libc::poll
in
the main loop. It's a "foreign" function, so calling it is unsafe
:​We know that the raw pointer is valid, and that libc::poll
won't
retain that pointer after returning. We might also worry about what happens
if one of the descriptors in POLL_FDS
came from a socket that's since
been closed. In that case the descriptor might refer to nothing, or it
might've been reused by the kernel to refer to an unrelated file or socket.
Since libc::poll
doesn't modify any of its arguments (including for
example reading from a file, which would advance the cursor), the worst
that can happen here is a "spurious wakeup", where some event for an
unrelated file wakes us up early. Our code already handles busy loop
polling, so spurious wakeups are no problem.
let mut poll_fds = POLL_FDS.lock().unwrap();
let mut poll_wakers = POLL_WAKERS.lock().unwrap();
let poll_error_code = unsafe {
    libc::poll(
        poll_fds.as_mut_ptr(),
        poll_fds.len() as libc::nfds_t,
        timeout_ms,
    )
};
if poll_error_code < 0 {
    return Err(io::Error::last_os_error());
}
Last of all, when we wake up and libc::poll
returns, we need to clear
POLL_FDS
and invoke all the POLL_WAKERS
. The main loop still polls every task
every time, and tasks that aren't Ready
will re-register themselves in
POLL_FDS
before the next sleep.
poll_fds.clear();
poll_wakers.drain(..).for_each(Waker::wake);
// Invoke Wakers from WAKE_TIMES if their time has come.
while let Some(entry) = wake_times.first_entry() {
…
It works!
And that's it. We did it. Our main loop is finally an event loop.
Hopefully this little adventure has made async Rust and async IO in general
seem less magical. There's lots more to explore and look forward to, like
future language features and all the gory details of
Pin
. Good luck out there :)