The Complexities of Rust Async Streams
I have been working on a use case recently where I wanted to do some streaming uploads and downloads, both on the client side and the server side.
And boy, do I need to tell you: it is way more confusing and complicated than it probably needs to be. The normal async ecosystem in Rust works quite well without too much hassle. But dealing with streams is a completely different story.
As I see it, there are four parts to this problem:
- There are two complementary, but also kind of competing, concepts/traits: `Stream` and `AsyncRead`.
- Neither of them is a stable part of `std`, though `Stream` is included as the nightly-only `AsyncIterator`.
- There are two competing versions of `AsyncRead`, within the `futures` and `tokio` crates, and neither is a good fit for newer IO primitives like `io_uring`.
- Last but not least, we have to deal with the annoyance of Rust error handling, and a large number of ecosystem crates.
# `AsyncRead`

Alright, let's get started. First off, we have the `AsyncRead` trait(s):
```rust
// the `futures` one:
trait AsyncRead {
    fn poll_read(
        self: Pin<&mut Self>,
        cx: &mut Context<'_>,
        buf: &mut [u8],
    ) -> Poll<Result<usize, Error>>;
}

// the `tokio` one:
trait AsyncRead {
    fn poll_read(
        self: Pin<&mut Self>,
        cx: &mut Context<'_>,
        buf: &mut ReadBuf<'_>,
    ) -> Poll<Result<(), Error>>;
}
```
Both are nearly identical; the only difference is that tokio uses a `&mut ReadBuf` instead of a `&mut [u8]`.
As far as I understand, this is just a smarter type that can deal better with uninitialized memory.
In order to get a `&mut [u8]`, you would either have to initialize (zero) the buffer first, or use unsafe code to tell the compiler “trust me bro”.
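The same trade-off already exists in synchronous code: to hand `std::io::Read::read` a plain `&mut [u8]`, you pay for zero-initializing the buffer up front. A minimal std-only sketch (the `read_zeroed` helper is made up for illustration):

```rust
use std::io::{Cursor, Read};

// Reading into a plain `&mut [u8]` means we must zero the buffer first;
// tokio's `ReadBuf` exists to avoid exactly this upfront cost.
fn read_zeroed(data: &[u8]) -> std::io::Result<Vec<u8>> {
    let mut source = Cursor::new(data);
    let mut buf = vec![0u8; 1024]; // zero-initialized: safe, but not free
    let n = source.read(&mut buf)?;
    buf.truncate(n);
    Ok(buf)
}

fn main() {
    let out = read_zeroed(b"hello world").unwrap();
    println!("read {} bytes", out.len());
}
```

With `ReadBuf`, an implementation could instead write into uninitialized capacity and only track how much of the buffer has been filled.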
You could think of this a little bit like an `async fn`:

```rust
trait AsyncRead {
    async fn read(&mut self, buf: &mut [u8]) -> Result<usize, Error>;
}
```
Though there is a subtle but very important difference. The `async fn` version returns a future that you `poll`, whereas with `poll_read`, you poll the trait implementor directly, which might simplify some internals.
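To make that difference concrete, here is a std-only sketch: a hypothetical `PollRead` trait (modeled on the `futures` signature, copied locally so this compiles without any crates) implemented for a type whose data is always ready, and polled by hand with a no-op `Waker`. All names here are made up for illustration:

```rust
use std::io;
use std::pin::Pin;
use std::task::{Context, Poll, RawWaker, RawWakerVTable, Waker};

// A local copy of the `futures`-style trait.
trait PollRead {
    fn poll_read(
        self: Pin<&mut Self>,
        cx: &mut Context<'_>,
        buf: &mut [u8],
    ) -> Poll<io::Result<usize>>;
}

// A reader whose data is always immediately available in memory.
struct SliceReader<'a>(&'a [u8]);

impl PollRead for SliceReader<'_> {
    fn poll_read(
        mut self: Pin<&mut Self>,
        _cx: &mut Context<'_>,
        buf: &mut [u8],
    ) -> Poll<io::Result<usize>> {
        let n = self.0.len().min(buf.len());
        buf[..n].copy_from_slice(&self.0[..n]);
        self.0 = &self.0[n..];
        Poll::Ready(Ok(n)) // never Pending: the data is already here
    }
}

// A waker that does nothing; just enough to call poll by hand.
fn noop_waker() -> Waker {
    fn clone(_: *const ()) -> RawWaker {
        RawWaker::new(std::ptr::null(), &VTABLE)
    }
    fn noop(_: *const ()) {}
    static VTABLE: RawWakerVTable = RawWakerVTable::new(clone, noop, noop, noop);
    unsafe { Waker::from_raw(RawWaker::new(std::ptr::null(), &VTABLE)) }
}

fn main() {
    let mut reader = SliceReader(b"hello");
    let mut buf = [0u8; 3];
    let waker = noop_waker();
    let mut cx = Context::from_waker(&waker);

    // We poll the reader itself; no intermediate future object is created.
    if let Poll::Ready(Ok(n)) = Pin::new(&mut reader).poll_read(&mut cx, &mut buf) {
        println!("got {n} bytes: {:?}", &buf[..n]);
    }
}
```

Note that the state (how far we have read) lives in the reader itself, not in a separate future.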
# `Stream`

Now, the `Stream` (or `AsyncIterator`) trait looks different, and is more generic than `AsyncRead`.
```rust
trait Stream {
    type Item;

    fn poll_next(
        self: Pin<&mut Self>,
        cx: &mut Context<'_>,
    ) -> Poll<Option<Self::Item>>;
}
```
Instead of just dealing with bytes and buffers, `Stream` is generic over any `Item` type.
As we are dealing with IO, bytes, and buffers, and we also have to consider possible errors, we typically have `Item = Result<Bytes, io::Error>`.
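Implementing the trait is mechanical once you have seen it done. This std-only sketch uses a local copy of the trait (the real one lives in `futures_core`, or in nightly `std` as `AsyncIterator`) and yields a byte slice in chunks, roughly the shape in which an HTTP body arrives; the `Chunks` type is made up for illustration:

```rust
use std::io;
use std::pin::Pin;
use std::task::{Context, Poll, RawWaker, RawWakerVTable, Waker};

// Local copy of the trait, so this compiles with std alone.
trait Stream {
    type Item;
    fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>>;
}

// A stream that yields a byte buffer in fixed-size chunks, the way an
// HTTP body often arrives as `Result<Bytes, io::Error>` items.
struct Chunks {
    data: Vec<u8>,
    pos: usize,
    chunk: usize,
}

impl Stream for Chunks {
    type Item = io::Result<Vec<u8>>;

    fn poll_next(mut self: Pin<&mut Self>, _cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
        if self.pos >= self.data.len() {
            return Poll::Ready(None); // stream exhausted
        }
        let end = (self.pos + self.chunk).min(self.data.len());
        let item = self.data[self.pos..end].to_vec();
        self.pos = end;
        Poll::Ready(Some(Ok(item)))
    }
}

// A waker that does nothing; just enough to call poll by hand.
fn noop_waker() -> Waker {
    fn clone(_: *const ()) -> RawWaker {
        RawWaker::new(std::ptr::null(), &VTABLE)
    }
    fn noop(_: *const ()) {}
    static VTABLE: RawWakerVTable = RawWakerVTable::new(clone, noop, noop, noop);
    unsafe { Waker::from_raw(RawWaker::new(std::ptr::null(), &VTABLE)) }
}

fn main() {
    let mut s = Chunks { data: b"hello world".to_vec(), pos: 0, chunk: 4 };
    let waker = noop_waker();
    let mut cx = Context::from_waker(&waker);
    while let Poll::Ready(Some(Ok(chunk))) = Pin::new(&mut s).poll_next(&mut cx) {
        println!("chunk: {:?}", chunk);
    }
}
```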
# `io_uring`

Now, both of these traits are not a good fit for `io_uring`. Why is that? Because of cancellation safety.
So what is cancellation safety, you might ask. Consider that we call `poll_read` with a `&mut [u8]`.
We will then get back either a `Poll::Ready`, which means the operation has been carried out immediately, or a `Poll::Pending`, which says “check back later” (once the `Context` has been woken, that is).
In the second case, we can decide to just never call `poll_read` again, and do something else with our `&mut [u8]`.
This just fundamentally does not work with `io_uring`. In that world, we submit a request to the kernel, and give it a pointer to write into. The kernel then does that write whenever it pleases, and notifies us when it is finished.
During that time frame, we are not allowed to touch, and in particular not to deallocate, that `&mut [u8]`.
The kernel essentially takes over ownership of that buffer.
For that reason, `io_uring`-based APIs look different yet again, similar to this:
```rust
trait UringRead {
    async fn read(&self, buf: Vec<u8>) -> (Result<usize, Error>, Vec<u8>);
}
```
I’m oversimplifying here, and there is no crate that actually looks like this. But the point I’m trying to make is this: We pass in ownership of a buffer, and at some later point we get that buffer back, along with a result.
This makes using these APIs slightly more inconvenient:
```rust
// instead of:
let mut buf = vec![0; 1024];
let bytes_read = reader.read(&mut buf).await?;

// we have to:
let (res, buf) = reader.read(vec![0; 1024]).await;
let bytes_read = res?;
```
Having to unwrap the result separately from the function call is a bit hard on my eyes, but I think I might get used to it after a while.
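A synchronous, std-only sketch of this ownership-passing shape; the `OwnedReader` type and `read_owned` method are made up for illustration, and real crates in this space differ in detail:

```rust
use std::io;

// Hypothetical owned-buffer reader: the buffer is moved in and handed back,
// mirroring how io_uring transfers buffer ownership to the kernel.
struct OwnedReader {
    data: Vec<u8>,
    pos: usize,
}

impl OwnedReader {
    fn read_owned(&mut self, mut buf: Vec<u8>) -> (io::Result<usize>, Vec<u8>) {
        let n = buf.len().min(self.data.len() - self.pos);
        buf[..n].copy_from_slice(&self.data[self.pos..self.pos + n]);
        self.pos += n;
        // The caller gets the buffer back together with the result.
        (Ok(n), buf)
    }
}

fn main() {
    let mut reader = OwnedReader { data: b"hello uring".to_vec(), pos: 0 };
    let (res, buf) = reader.read_owned(vec![0; 1024]);
    let n = res.unwrap();
    println!("read {n} bytes: {:?}", &buf[..n]);
}
```

While the reader "owns" the buffer, the caller simply cannot touch it; with a `&mut [u8]` API, nothing stops the caller from reusing the slice after cancelling.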
# How does IO actually work?
Beware, now I am moving into speculation territory. I am just a humble Rust developer, and no expert on how the (Linux) kernel actually works or how it interfaces with hardware, so I might get things wrong here. I'm not describing how things are, but how I imagine they could be.
Let's walk back a little bit, and look at the evolution of IO, from sync, over async, to `io_uring`.
For this example, we are considering a network interface. Networks usually work in packets.
Each packet is bounded by the maximum transmission unit (MTU), typically 1500 bytes, and arrives from the network interface at some arbitrary time.
Now, if we do a blocking call to `read` with a buffer that is large enough, the kernel will just wait until the packet arrives, and will write it directly to the provided buffer, in a zero-copy fashion (speculation on my end).
This is great, except for the waiting part. That's why we have `async` in the first place, right?
In the `async` world, using a more-or-less literal translation of the `read` call, things look a bit different.
First, we need to explicitly switch to non-blocking mode. In the case of networking, we can do that with `TcpListener::set_nonblocking`.
The docs have the following to say:

> If the IO operation could not be completed and needs to be retried, an error with kind `io::ErrorKind::WouldBlock` is returned.
So what happens is that we call `read` once, but most likely we will get a `WouldBlock` error, and the async call will return `Poll::Pending`.
The async runtime then does its thing so that it is notified when the network packet has arrived and can be read.
It signals that through the provided `Context`, and calls `poll_read` again.
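This dance can be reproduced with nothing but std and a loopback socket. The following is a toy illustration: a real runtime parks the task and waits for readiness notifications instead of busy-looping.

```rust
use std::io::{self, Read, Write};
use std::net::{TcpListener, TcpStream};

fn demo() -> io::Result<Vec<u8>> {
    // Loopback setup purely for demonstration.
    let listener = TcpListener::bind("127.0.0.1:0")?;
    let mut client = TcpStream::connect(listener.local_addr()?)?;
    let (mut server, _) = listener.accept()?;
    server.set_nonblocking(true)?;

    let mut buf = [0u8; 64];
    // Nothing has been sent yet, so the read fails with `WouldBlock` --
    // the error an async runtime turns into `Poll::Pending`.
    let err = server.read(&mut buf).unwrap_err();
    assert_eq!(err.kind(), io::ErrorKind::WouldBlock);

    client.write_all(b"ping")?;

    // A real runtime would now wait for readiness (epoll and friends);
    // we simply retry until the read succeeds.
    loop {
        match server.read(&mut buf) {
            Ok(n) => return Ok(buf[..n].to_vec()),
            Err(e) if e.kind() == io::ErrorKind::WouldBlock => continue,
            Err(e) => return Err(e),
        }
    }
}

fn main() {
    let data = demo().expect("demo failed");
    println!("received {:?}", String::from_utf8_lossy(&data));
}
```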
But because there is now a delay between the packet arriving and the subsequent call to `poll_read` and the underlying `read`, the kernel has to stash the packet in some kind of in-kernel buffer, and then copy the bytes into the buffer provided by `read` on the next call. I don't think that zero-copy IO is possible in such a case (again, my speculation).
Moving to `io_uring`, we are again in a situation where we provide the kernel with a buffer it can directly write into once the network packet arrives, so it should open us up again for zero-copy IO (speculation).
So what did we learn? That things are complicated, for sure :-)
But also that, at least in terms of networking, we have some bounded buffers being pushed from the outside (the network) into our system at some arbitrary time.
This does sound a bit more like `Stream`, right?
Though with a `Stream`, you are still pulling data out of it (pun intended), whereas the network is pushing packets in from the outside.
Because of this push/pull difference, we might have to end up with some kind of buffering and copying either way, and it also opens up the other can of worms called backpressure, which I don't want to go into.
# Conversions
Early on, I said that `Stream` and `AsyncRead` are very similar, and there are ways to convert between the two.
As we have learned, they differ primarily in the way they handle buffers, apart from the fact that `Stream`/`AsyncIterator` is a lot broader, and covers a lot more use cases than just IO based on bytes.
To convert between the two, you can use the `StreamReader` and `ReaderStream` from `tokio_util`.
The `StreamReader` converts a `Stream` of byte chunks (aka `Stream<Item = Result<Bytes, io::Error>>`, though it is more generic) into an `AsyncRead`.
The inverse, `ReaderStream`, converts an `AsyncRead` into a `Stream` of byte chunks.
But because both of these work with `io::Error`, we often have to convert other errors into it.
As an example, `reqwest` with the `stream` feature offers a `Response::bytes_stream` method, but it gives you a stream with a `reqwest::Error`. You would have to convert those to `io::Error` first.
You do that using `TryStreamExt::map_err` from the `futures_util` crate.
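The error-mapping step itself is an ordinary `map_err`; here is a std-only, synchronous sketch with a made-up `FakeHttpError` standing in for `reqwest::Error`. The async version with `TryStreamExt::map_err` applies the exact same per-item conversion:

```rust
use std::fmt;
use std::io;

// Stand-in for a crate-specific error such as `reqwest::Error`.
#[derive(Debug)]
struct FakeHttpError(String);

impl fmt::Display for FakeHttpError {
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        write!(f, "http error: {}", self.0)
    }
}

impl std::error::Error for FakeHttpError {}

// The per-item conversion that `TryStreamExt::map_err` would apply.
fn into_io(e: FakeHttpError) -> io::Error {
    io::Error::new(io::ErrorKind::Other, e)
}

fn main() {
    // A synchronous stand-in for a stream of byte chunks.
    let chunks: Vec<Result<Vec<u8>, FakeHttpError>> = vec![
        Ok(b"hello".to_vec()),
        Err(FakeHttpError("connection reset".into())),
    ];

    let mapped: Vec<Result<Vec<u8>, io::Error>> =
        chunks.into_iter().map(|res| res.map_err(into_io)).collect();

    for item in &mapped {
        println!("{item:?}");
    }
}
```

Wrapping the original error via `io::Error::new` keeps it available as the source, so nothing is lost in the conversion.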
When you want to send a `Stream`, `reqwest` offers `reqwest::Body::wrap_stream`, which accepts types convertible to `Bytes` and a `Box<dyn Error>` respectively. So depending on your types, you would need a manual `map_err` here as well.
On the other end of the spectrum, moving from the client to the server, `axum` allows you to get an input stream using `axum::Body::into_data_stream`, which gives you a `Stream<Item = Result<Bytes, axum::Error>>`.
In the other direction, you have `axum::Body::from_stream`, which is pretty much the same as `reqwest::Body::wrap_stream`, as it will take any types convertible to `Bytes` and `Box<dyn Error>` respectively.
So you will also need to use `map_err` if you want to echo the incoming stream right into the output.
Now, coming back to `AsyncRead` once more. Let's say we want to deal with compression. For this, we might want to use the `async_compression` crate. As we have two competing `AsyncRead` definitions, the crate has feature flags to offer either a `futures`-compatible `AsyncRead`, a `tokio`-compatible one, or both.
As we have talked about previously, networked services naturally deal with packets, and thus buffers, so in order to use `async_compression` with either `reqwest` or `axum`, we need to translate between `AsyncRead` and `Stream`.
I have outlined above that `tokio_util` offers conversions both ways. The `TryStreamExt` trait from `futures_util`, however, only offers a conversion in one direction, `TryStreamExt::into_async_read`. I haven't found any function/type to do the inverse.
So we pick the `tokio` feature, and combine that with the `tokio_util` crate to convert between the two. Plus, we have to pull in `futures_util` anyway, because we need `TryStreamExt::map_err` to convert between error types.
There are two more topics to cover here. One is making these things a bit more dynamic. By which I mean: what if we only want to use `async_compression` conditionally? As Rust is strictly typed, we can't just use, for example, `ZstdEncoder` (which implements `AsyncRead`) interchangeably with `axum::BodyDataStream` (glossing over error conversion), unless…
Well, unless we box both, and they both just become `Pin<Box<dyn Stream>>` (+ `Send`, of course).
For this we need `StreamExt::boxed` from `futures_util`.
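This is the same type erasure you may know from boxing synchronous iterators; here is a std-only analogy (with streams, `StreamExt::boxed` produces the `Pin<Box<dyn Stream + Send>>` equivalent). The "compression" here is obviously fake, and all names are made up:

```rust
// Two different concrete iterator types...
fn compressed(data: Vec<u8>) -> impl Iterator<Item = u8> + Send {
    // hypothetical "compression": keep every other byte
    data.into_iter().step_by(2)
}

fn passthrough(data: Vec<u8>) -> impl Iterator<Item = u8> + Send {
    data.into_iter()
}

// ...unified behind one boxed trait object, chosen at runtime.
fn pipeline(data: Vec<u8>, compress: bool) -> Box<dyn Iterator<Item = u8> + Send> {
    if compress {
        Box::new(compressed(data))
    } else {
        Box::new(passthrough(data))
    }
}

fn main() {
    let out: Vec<u8> = pipeline(vec![1, 2, 3, 4], true).collect();
    println!("{out:?}");
}
```

The price is one allocation and dynamic dispatch per item, which is usually negligible next to the IO itself.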
The last topic to cover is how to do proper testing. I have some APIs that expect streams, and I would love to add some tests where I can just provide some byte-stream literals, and compare the final output of a stream to a byte-string literal.
For the second part, there is `TryStreamExt::try_collect`, which piggybacks on the `Extend` trait. Which means it does not work with `Vec<u8>`, as that would use `u8` items, and not `Bytes`. However, `BytesMut` will do the job here; you just have to manually annotate the type.
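Here is a std-only sketch of the mechanism, with a made-up `ChunkBuf` standing in for `BytesMut` (which implements `Extend<Bytes>`):

```rust
use std::io;

// Minimal stand-in for `BytesMut`: a buffer that can be extended with
// whole chunks, which is the mechanism `try_collect` relies on.
#[derive(Debug, Default, PartialEq)]
struct ChunkBuf(Vec<u8>);

impl Extend<Vec<u8>> for ChunkBuf {
    fn extend<T: IntoIterator<Item = Vec<u8>>>(&mut self, iter: T) {
        for chunk in iter {
            self.0.extend_from_slice(&chunk);
        }
    }
}

// `TryStreamExt::try_collect` only needs `Default + Extend`; the sync
// `collect` additionally wants `FromIterator`, which we build on top.
impl FromIterator<Vec<u8>> for ChunkBuf {
    fn from_iter<T: IntoIterator<Item = Vec<u8>>>(iter: T) -> Self {
        let mut buf = ChunkBuf::default();
        buf.extend(iter);
        buf
    }
}

fn collect_chunks(
    chunks: Vec<Result<Vec<u8>, io::Error>>,
) -> Result<ChunkBuf, io::Error> {
    // Stops at the first `Err`, just like `try_collect` does.
    chunks.into_iter().collect()
}

fn main() {
    let chunks = vec![Ok(b"hello ".to_vec()), Ok(b"world".to_vec())];
    let buf = collect_chunks(chunks).unwrap();
    println!("{}", String::from_utf8_lossy(&buf.0));
}
```

Since `Vec<u8>` only implements `Extend<u8>`, there is no equivalent impl to hook into there, which is exactly why the type annotation has to point at `BytesMut` instead.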
Going the other direction, from a `&'static [u8]` to a `Stream`, is also surprisingly not easy. There is `futures_util::stream::once`, but it takes a future. Not a big deal, just wrap your value in an async block. For a more direct conversion, you could use `tokio_stream::once`.
Fun fact: `tokio_stream` also has a `StreamExt` extension trait. This also has a `collect` method, which works for `Result` just like `Iterator::collect`, but it does not work with `Bytes`, so it is not really usable for that purpose.
# Conclusion
So I went on quite a tour, explaining the similarities and differences between `AsyncRead` and `Stream`.
Then I tried to give some pointers on how to convert between those two, and which crates you need for those tasks.
Unfortunately, there is no really simple solution here. And we have to pull in quite a few crates, namely `tokio_util`, `futures_util`, of course `bytes`, and maybe even `tokio_stream`. We also have to constantly convert between error types with `TryStreamExt::map_err`.
And all of this for a fairly simple use case. There are way more complex use cases as well, I would imagine.