
The Complexities of Rust Async Streams

— 10 min

I have been working on a use case recently where I wanted to do some streaming uploads and downloads, both on the client- and the server-side.

And boy do I need to tell you, it's way more confusing and complicated than it probably needs to be. The normal async ecosystem in Rust works quite well without too much hassle, but dealing with streams is a completely different story.

As I see it, there are four parts to this problem: the AsyncRead traits, the Stream trait, io_uring itself, and the conversions between all of these.

# AsyncRead

Alright, let's get started. First off, we have the AsyncRead traits:

// the `futures` one:
trait AsyncRead {
    fn poll_read(
        self: Pin<&mut Self>,
        cx: &mut Context<'_>,
        buf: &mut [u8],
    ) -> Poll<Result<usize, Error>>;
}
// the `tokio` one:
trait AsyncRead {
    fn poll_read(
        self: Pin<&mut Self>,
        cx: &mut Context<'_>,
        buf: &mut ReadBuf<'_>,
    ) -> Poll<io::Result<()>>;
}

Both are nearly identical; the only difference is that tokio uses a &mut ReadBuf instead of a &mut [u8]. As far as I understand, this is just a smarter type that can deal better with uninitialized memory. In order to get a &mut [u8], you would either have to initialize (zero) the buffer first, or use unsafe code to tell the compiler "trust me bro".
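To make the shape of this trait concrete, here is a toy implementation of the futures-style signature. The trait is declared locally so the snippet compiles without any crates, SliceReader is a made-up example type, and the hand-rolled no-op waker is only there so we can poll manually; a real program would let the runtime do the polling:

```rust
use std::io::Error;
use std::pin::Pin;
use std::task::{Context, Poll, RawWaker, RawWakerVTable, Waker};

// The futures-style trait, declared locally so this snippet is self-contained.
trait AsyncRead {
    fn poll_read(
        self: Pin<&mut Self>,
        cx: &mut Context<'_>,
        buf: &mut [u8],
    ) -> Poll<Result<usize, Error>>;
}

// A toy reader over an in-memory buffer: it is always immediately ready.
struct SliceReader {
    data: Vec<u8>,
    pos: usize,
}

impl AsyncRead for SliceReader {
    fn poll_read(
        self: Pin<&mut Self>,
        _cx: &mut Context<'_>,
        buf: &mut [u8],
    ) -> Poll<Result<usize, Error>> {
        let this = self.get_mut(); // fine, SliceReader is Unpin
        let remaining = &this.data[this.pos..];
        let n = remaining.len().min(buf.len());
        buf[..n].copy_from_slice(&remaining[..n]);
        this.pos += n;
        // A reader backed by real IO would return Poll::Pending here
        // (after arranging a wakeup) when no data is available yet.
        Poll::Ready(Ok(n))
    }
}

// A no-op waker, just enough to build a Context for polling by hand.
fn noop_waker() -> Waker {
    fn clone(_: *const ()) -> RawWaker {
        RawWaker::new(std::ptr::null(), &VTABLE)
    }
    fn noop(_: *const ()) {}
    static VTABLE: RawWakerVTable = RawWakerVTable::new(clone, noop, noop, noop);
    unsafe { Waker::from_raw(RawWaker::new(std::ptr::null(), &VTABLE)) }
}

fn main() {
    let waker = noop_waker();
    let mut cx = Context::from_waker(&waker);
    let mut reader = SliceReader { data: b"hello".to_vec(), pos: 0 };
    // Note that the `&mut [u8]` has to be initialized (zeroed) up front.
    let mut buf = vec![0u8; 3];
    assert!(matches!(
        Pin::new(&mut reader).poll_read(&mut cx, &mut buf),
        Poll::Ready(Ok(3))
    ));
    assert_eq!(buf, b"hel");
}
```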

You could think of this a little bit like an async fn:

trait AsyncRead {
    async fn read(&mut self, buf: &mut [u8]) -> Result<usize, Error>;
}

Though there is a subtle but very important difference: the async fn version returns a future that you can poll, whereas with poll_read you poll the trait implementor directly, which might simplify some internals.

# Stream

Now, the Stream (or AsyncIterator) trait looks different, and is more generic than AsyncRead.

trait Stream {
    type Item;

    fn poll_next(
        self: Pin<&mut Self>,
        cx: &mut Context<'_>,
    ) -> Poll<Option<Self::Item>>;
}

Instead of just dealing with bytes and buffers, the Stream is generic over an arbitrary Item type.

As we are dealing with IO, bytes and buffers, and also have to consider possible errors, we typically have Item = Result<Bytes, io::Error>.
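To make that concrete, here is a hand-rolled stream of byte chunks. This is just a sketch: the trait is again declared locally so the snippet is self-contained, ChunkStream is a made-up type, and Vec<u8> stands in for Bytes:

```rust
use std::collections::VecDeque;
use std::io::Error;
use std::pin::Pin;
use std::task::{Context, Poll, RawWaker, RawWakerVTable, Waker};

// The Stream trait, declared locally so this snippet is self-contained.
trait Stream {
    type Item;
    fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>>;
}

// A stream of byte chunks; Vec<u8> stands in for `bytes::Bytes` here.
struct ChunkStream {
    chunks: VecDeque<Vec<u8>>,
}

impl Stream for ChunkStream {
    type Item = Result<Vec<u8>, Error>;

    fn poll_next(self: Pin<&mut Self>, _cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
        // All chunks are ready up front; a network-backed stream would
        // return Poll::Pending while waiting for the next packet.
        match self.get_mut().chunks.pop_front() {
            Some(chunk) => Poll::Ready(Some(Ok(chunk))),
            None => Poll::Ready(None), // the stream is exhausted
        }
    }
}

// A no-op waker, just enough to build a Context for polling by hand.
fn noop_waker() -> Waker {
    fn clone(_: *const ()) -> RawWaker {
        RawWaker::new(std::ptr::null(), &VTABLE)
    }
    fn noop(_: *const ()) {}
    static VTABLE: RawWakerVTable = RawWakerVTable::new(clone, noop, noop, noop);
    unsafe { Waker::from_raw(RawWaker::new(std::ptr::null(), &VTABLE)) }
}

fn main() {
    let waker = noop_waker();
    let mut cx = Context::from_waker(&waker);
    let mut stream = ChunkStream {
        chunks: vec![b"hello".to_vec(), b"world".to_vec()].into(),
    };
    let mut collected = Vec::new();
    while let Poll::Ready(Some(Ok(chunk))) = Pin::new(&mut stream).poll_next(&mut cx) {
        collected.push(chunk);
    }
    assert_eq!(collected, vec![b"hello".to_vec(), b"world".to_vec()]);
}
```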

# io_uring

Now, both of these traits are not a good fit for io_uring. Why is that? Because of cancellation safety.

So what is cancellation safety, you might ask. Consider that we call poll_read with a &mut [u8]. We will then get back either a Poll::Ready, which means the operation has been carried out immediately, or a Poll::Pending, which says “check back later” (once the Context has been woken, that is).

In the second case, we can decide to just not call poll_read ever again, and do something else with our &mut [u8].

This just fundamentally does not work with io_uring. In that world, we submit a request to the kernel, and give it a pointer to write into. The kernel then does that write whenever, and notifies us when it is finished. During that time frame, we are not allowed to touch, and in particular not to deallocate, that &mut [u8]. The kernel essentially takes over ownership of that buffer.

For that reason, io_uring-based APIs look different yet again, similar to this:

trait UringRead {
    async fn read(&self, buf: Vec<u8>) -> (Result<usize, Error>, Vec<u8>);
}

I’m oversimplifying here, and there is no crate that actually looks like this. But the point I’m trying to make is this: We pass in ownership of a buffer, and at some later point we get that buffer back, along with a result.

This makes using these APIs slightly more inconvenient:

// instead of:
let mut buf = vec![0; 1024];
let bytes_read = reader.read(&mut buf).await?;
// we have to:
let (res, buf) = reader.read(vec![0; 1024]).await;
let bytes_read = res?;

Having to unwrap the result separately from the function call is a bit of an eyesore, but I think I might get used to it after a while.
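The ownership round-trip can be sketched with a blocking stand-in. OwnedReader is entirely made up here (real crates like tokio-uring are async and differ in their details); the point is only that the buffer moves into the callee by value and comes back alongside the result:

```rust
use std::io::Error;

// A made-up reader demonstrating the owned-buffer contract: the callee
// takes ownership of the buffer for the duration of the operation and
// hands it back together with the result, mirroring how io_uring lets
// the kernel own the buffer while the request is in flight.
struct OwnedReader {
    data: Vec<u8>,
    pos: usize,
}

impl OwnedReader {
    fn read(&mut self, mut buf: Vec<u8>) -> (Result<usize, Error>, Vec<u8>) {
        let remaining = &self.data[self.pos..];
        let n = remaining.len().min(buf.len());
        buf[..n].copy_from_slice(&remaining[..n]);
        self.pos += n;
        // The buffer is returned even on error, so the caller can reuse it.
        (Ok(n), buf)
    }
}

fn main() {
    let mut reader = OwnedReader { data: b"hello world".to_vec(), pos: 0 };
    // We give up ownership of the buffer, and get it back afterwards.
    let (res, buf) = reader.read(vec![0; 5]);
    let bytes_read = res.unwrap();
    assert_eq!(&buf[..bytes_read], &b"hello"[..]);
}
```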

# How does IO actually work?

Beware, now I am moving into speculation territory. I am just a humble Rust developer, and no expert on how the (Linux) kernel actually works or how it interfaces with hardware, so I might get things wrong here. I am not describing how things are, but how I imagine they could be.

Let's step back a little and look at the evolution of IO: from sync, over async, to io_uring.

For this example, we are considering a network interface. Networks usually work in packets. Each packet is limited by the maximum transmission unit (MTU), typically 1500 bytes, and is received from the network interface whenever.

Now, if we are doing a blocking call to read with a buffer that is large enough, the kernel will just wait until the packet arrives, and will write it directly to the provided buffer, in a zero-copy fashion (speculation on my end).

This is great, except for the waiting part. That's why we have async in the first place, right? In the async world, using a more-or-less literal translation of the read call, things look a bit different.

First, we need to explicitly switch to non-blocking mode. In the case of networking, we can do that with TcpListener::set_nonblocking. The docs have the following to say:

If the IO operation could not be completed and needs to be retried, an error with kind io::ErrorKind::WouldBlock is returned.

So what happens is that we call read once, but most likely we will get a WouldBlock error, and the async call will return Poll::Pending. The async runtime then does its thing so that it is notified when the network packet has arrived and can be read. It signals that through the provided Context, and calls poll_read again.
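The WouldBlock retry dance can be sketched with a mock source. FlakySource is made up here, mimicking a socket in non-blocking mode; and where a real runtime parks the task and re-polls once the waker fires, this sketch just loops:

```rust
use std::io::{Error, ErrorKind, Read, Result};

// A made-up source mimicking a socket in non-blocking mode: it reports
// WouldBlock a few times before the "packet" is finally available.
struct FlakySource {
    pending_polls: u32,
    data: Vec<u8>,
}

impl Read for FlakySource {
    fn read(&mut self, buf: &mut [u8]) -> Result<usize> {
        if self.pending_polls > 0 {
            self.pending_polls -= 1;
            return Err(Error::new(ErrorKind::WouldBlock, "no packet yet"));
        }
        let n = self.data.len().min(buf.len());
        buf[..n].copy_from_slice(&self.data[..n]);
        Ok(n)
    }
}

// The retry loop an async runtime performs for us: on WouldBlock it would
// suspend the task and retry once the waker fires; here we just spin.
fn read_retrying(source: &mut impl Read, buf: &mut [u8]) -> Result<usize> {
    loop {
        match source.read(buf) {
            Err(e) if e.kind() == ErrorKind::WouldBlock => continue,
            other => return other,
        }
    }
}

fn main() {
    let mut source = FlakySource { pending_polls: 3, data: b"packet".to_vec() };
    let mut buf = [0u8; 16];
    let n = read_retrying(&mut source, &mut buf).unwrap();
    assert_eq!(&buf[..n], &b"packet"[..]);
}
```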

But because there is now a delay between the packet arriving and the subsequent call to poll_read and the underlying read, the kernel has to stash the packet in some kind of in-kernel buffer, and then copy the bytes into the buffer provided by read on the next call. I don't think that zero-copy IO is possible in such a case (again, my speculation).

Moving to io_uring, we again are in a situation where we provide the kernel with a buffer it can directly write into once the network packet arrives, so it should open us up again for zero-copy IO (speculation).


So what did we learn? That things are complicated for sure :-) But also that at least in terms of networking, we have some bounded buffers being pushed from the outside (the network) into our system at some arbitrary time. This does sound a bit more like Stream, right? Though with a Stream, you are still pulling data out of it (pun intended), whereas the network is pushing packets in from the outside.

Because of this push/poll difference, we might have to end up with some kind of buffering and copying either way, and it also opens up the other can of worms called backpressure that I don’t want to go into.

# Conversions

Early on, I said that Stream and AsyncRead are very similar, and there are ways to convert between the two. As we have learned, they differ primarily in the way they handle buffers, apart from the fact that Stream/AsyncIterator is a lot broader and covers a lot more use cases than just byte-based IO.

To convert between the two, you can use the StreamReader and ReaderStream from tokio_util.

The StreamReader converts a Stream of byte chunks (aka Stream<Item = Result<Bytes, io::Error>>, though it is more generic) into an AsyncRead. The inverse ReaderStream converts an AsyncRead into a Stream of byte chunks.

But because both of these work with io::Error, we often have to convert some other errors into them.

As an example, reqwest with the stream feature offers a Response::bytes_stream method, but it gives you a stream with a reqwest::Error. You would have to convert those to io::Error first. You do that using TryStreamExt::map_err from the futures_util crate.

When you want to send a Stream, reqwest offers reqwest::Body::wrap_stream, which accepts streams whose item and error types are convertible into Bytes and Box<dyn Error> respectively. So depending on your types, you might need a manual map_err here as well.


On the other end of the spectrum, moving from the client to the server, axum allows you to get an input stream using axum::Body::into_data_stream, which gives you a Stream<Item = Result<Bytes, axum::Error>>. In the other direction, you have axum::Body::from_stream, which is pretty much the same as reqwest::Body::wrap_stream, as it will take any types convertible to Bytes and Box<dyn Error> respectively.

So you will also need to use map_err if you want to echo the incoming stream right into the output.


Now, coming back to AsyncRead once more. Let's say we want to deal with compression. For this, we might want to use the async_compression crate. As we have two competing AsyncRead traits, the crate has feature flags to offer either a futures-compatible AsyncRead, a tokio-compatible one, or both.

As we have talked about previously, networked services deal with packets, and thus buffers, naturally, so in order to use async_compression with either reqwest or axum, we would need to translate between AsyncRead and Stream. I have outlined above that tokio_util offers conversions in both directions. The TryStreamExt from futures_util, however, only offers a conversion in one direction, TryStreamExt::into_async_read. I haven't found any function/type to do the inverse.

So we pick the tokio feature, and combine that with the tokio_util crate to convert between the two. Plus we have to pull in futures_util anyway, because we need the TryStreamExt::map_err to convert between error types.


There are two more topics to cover here. One is making these things a bit more dynamic: what if we only want to conditionally use async_compression? As Rust is strictly typed, we can't just use, for example, ZstdEncoder (which implements AsyncRead) interchangeably with axum::BodyDataStream (glossing over error conversion), unless…

Well, unless we box both, and they both just become Pin<Box<dyn Stream>> (+ Send of course). For this we need StreamExt::boxed from futures_util.
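The type-erasure trick itself can be sketched without any crates by boxing two different implementors of a locally-declared Stream trait into one common type. Once and Empty are made-up stand-ins for, say, a ZstdEncoder-backed stream and a passthrough stream, and the no-op waker is only there so we can poll by hand:

```rust
use std::pin::Pin;
use std::task::{Context, Poll, RawWaker, RawWakerVTable, Waker};

// The Stream trait, declared locally so this snippet is self-contained.
trait Stream {
    type Item;
    fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>>;
}

// Two different concrete stream types...
struct Once(Option<Vec<u8>>);
struct Empty;

impl Stream for Once {
    type Item = Vec<u8>;
    fn poll_next(self: Pin<&mut Self>, _cx: &mut Context<'_>) -> Poll<Option<Vec<u8>>> {
        Poll::Ready(self.get_mut().0.take())
    }
}

impl Stream for Empty {
    type Item = Vec<u8>;
    fn poll_next(self: Pin<&mut Self>, _cx: &mut Context<'_>) -> Poll<Option<Vec<u8>>> {
        Poll::Ready(None)
    }
}

// ...erased into one common type, so both can flow through the same code path.
type BoxStream = Pin<Box<dyn Stream<Item = Vec<u8>> + Send>>;

fn pick(compress: bool) -> BoxStream {
    if compress {
        Box::pin(Once(Some(b"data".to_vec())))
    } else {
        Box::pin(Empty)
    }
}

// A no-op waker, just enough to build a Context for polling by hand.
fn noop_waker() -> Waker {
    fn clone(_: *const ()) -> RawWaker {
        RawWaker::new(std::ptr::null(), &VTABLE)
    }
    fn noop(_: *const ()) {}
    static VTABLE: RawWakerVTable = RawWakerVTable::new(clone, noop, noop, noop);
    unsafe { Waker::from_raw(RawWaker::new(std::ptr::null(), &VTABLE)) }
}

fn main() {
    let waker = noop_waker();
    let mut cx = Context::from_waker(&waker);
    let mut s = pick(true);
    assert!(matches!(s.as_mut().poll_next(&mut cx), Poll::Ready(Some(_))));
    assert!(matches!(s.as_mut().poll_next(&mut cx), Poll::Ready(None)));
}
```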


The last topic to cover is how to do proper testing. I have some APIs that expect streams, and I would love to add tests where I just provide some byte-string literals as the input stream, and compare the final output of a stream against a byte-string literal.

For the second part, there is TryStreamExt::try_collect, which piggybacks on the Extend trait. That means it does not work with Vec<u8>, as that would expect u8 items, not Bytes. However, BytesMut will do the job here; you just have to annotate the type manually.

Going the other direction, from a &'static [u8] to a Stream, is surprisingly not easy either. There is futures_util::stream::once, but it takes a future. Not a big deal, just wrap your value in an async block. For a more direct conversion, you could use tokio_stream::once.

Fun fact: tokio_stream also has a StreamExt extension trait. This also has a collect method, which works for Result just like Iterator::collect, but it does not work with Bytes, so it is not really usable for that purpose.

# Conclusion

So I went on quite a tour, explaining the similarities and differences between AsyncRead and Stream. Then I tried to give some pointers of how to convert between those two, and which crates you need for those tasks.

Unfortunately, there is no really simple solution here, and we have to pull in quite a few crates: tokio_util, futures_util, of course bytes, and maybe even tokio_stream. We also have to constantly convert between error types with TryStreamExt::map_err.

And all of this for a fairly simple use case. There are way more complex use cases out there as well, I would imagine.