Rust Futures and Tasks
A tale of concurrency and parallelism
With the recent talk about “Contexts” in the Rust community, and some other thoughts I had recently, I want to explore in a bit more detail what the difference between Futures and Tasks is in Rust.
The difference between Futures and Tasks is like the difference between concurrency and parallelism.
The difference is quite subtle, even considering just the words. I don’t even know if my native German language has different words for these concepts?
Let’s look at a small snippet of the Wikipedia entry on Concurrency:
Concurrency is not parallelism: concurrency is about dealing with lots of things at once but parallelism is about doing lots of things at once. Concurrency is about structure, parallelism is about execution.
Well, that is not all too helpful. Let’s approach this question from a different perspective then.
If we think of everyday life, we sometimes do actual work, and sometimes we are just waiting for something to happen. Waiting would be boring though. So what do we humans do in such a case? We start working on something else in the meantime. We call this “multitasking”.
NOTE: While explaining this to my wife, we figured out that humans use the word multitasking for both concepts. Oh well.
The basic functionality that Futures provide us with their conveniently named `await` keyword is the ability to say:
Hey! I’m not doing any useful work right now, I’m just waiting.
This waiting can be anything. Waiting for time to pass, for things to arrive on the network, or for a computation to finish.
We can create a simple future that simulates this:
```rust
async fn future(prefix: &str, num: usize) {
    // simulates some IO wait
    tokio::time::sleep(Duration::from_millis(400)).await;
    // simulates some CPU workload
    thread::sleep(Duration::from_millis(100));
    println!("{}-{} finished", prefix, num);
}
```
If we want to execute a couple of these, we can first try to do so one after the other:
```rust
let non_static_str = String::from("serial");
let start = Instant::now();

let futures = (0..n).map(|i| future(&non_static_str, i));
for future in futures {
    future.await;
}

println!("{}: {:?}", non_static_str, start.elapsed());
```
With this output:
```text
serial-0 finished
serial-1 finished
serial-2 finished
serial-3 finished
serial-4 finished
serial-5 finished
serial-6 finished
serial-7 finished
serial: 4.1303916s
```
Or showing this visually:
```text
┌─────────────┬─────────┬─────────────┬─────────┬───┐
│ … waiting … │ working │ … waiting … │ working │ … │
└─────────────┴─────────┴─────────────┴─────────┴───┘
```
Not too efficient. We haven’t gained anything. But we can improve this if we know that the work we want to do is sufficiently independent of each other.
# Concurrency
I think this is what the Wikipedia article meant by “Concurrency is about structure”:
```rust
let non_static_str = String::from("futures");
let start = Instant::now();

let futures = (0..n).map(|i| future(&non_static_str, i));
join_all(futures).await;

println!("{}: {:?}", non_static_str, start.elapsed());
```
And let’s run this:
```text
futures-0 finished
futures-1 finished
futures-2 finished
futures-3 finished
futures-4 finished
futures-5 finished
futures-6 finished
futures-7 finished
futures: 1.2765159s
```
Or showing this visually:
```text
┌─────────────┬─────────┬─────────┬───┐
│ … waiting … │ working │ working │ … │
└─────────────┴─────────┴─────────┴───┘
```
Adding concurrency here means that we were able to compress overlapping wait times.
This is good, but not quite optimal, as the future itself only uses a single thread of execution. So if we only ever use `futures::future::join_all`, Rust is no different from other programming languages that have a single-threaded event loop.
# Parallelism
Let’s try to add the “Parallelism is about execution” part by using `tokio::spawn` to turn the future into an independent task.
```rust
let non_static_str = String::from("tasks");
let start = Instant::now();

let futures = (0..n).map(|i| tokio::spawn(future(&non_static_str, i)));
join_all(futures).await;

println!("{}: {:?}", non_static_str, start.elapsed());
```
Well, it turns out that this is not possible quite that easily:
```text
error[E0597]: `non_static_str` does not live long enough
  --> playground\futuresntasks\src\main.rs:XX:55
   |
XX |     let futures = (0..n).map(|i| tokio::spawn(future(&non_static_str, i)));
   |                              --- --------^^^^^^^^^^^^^^----
   |                              |   |       |
   |                              |   |       borrowed value does not live long enough
   |                              |   argument requires that `non_static_str` is borrowed for `'static`
   |                              value captured here
...
XX | }
   |   - `non_static_str` dropped here while still borrowed
```
Turns out our futures are not sufficiently independent, which in the case of tasks means they need to be fully independent. If we look at the signature of the `tokio::spawn` function, the `'static` lifetime signifies this requirement. And the `Send` bound is the part that makes this truly parallel (or rather, which makes this parallelism safe).
```rust
pub fn spawn<T>(future: T) -> JoinHandle<T::Output>
where
    T: Future + Send + 'static,
    T::Output: Send + 'static,
{}
```
So in order to make the future fully independent, we can give it its own copy of all the data it needs to access:
```rust
let non_static_str = String::from("tasks");
let start = Instant::now();

let futures = (0..n).map(|i| {
    let cloned_str = non_static_str.clone(); // <-
    tokio::spawn(async move { future(&cloned_str, i).await })
});
join_all(futures).await;

println!("{}: {:?}", non_static_str, start.elapsed());
```
With each task being completely independent now, we can run our example again:
```text
tasks-5 finished
tasks-4 finished
tasks-7 finished
tasks-0 finished
tasks-2 finished
tasks-1 finished
tasks-6 finished
tasks-3 finished
tasks: 513.6456ms
```
Or showing this visually:
```text
          ┌─────────────┬─────────┐
Thread 1: │ … waiting … │ working │
          ├─────────────┼─────────┤
Thread 2: │ … waiting … │ working │
          └─────────────┴─────────┘
Thread n: …
```
This is nice! We have parallelized our tasks across multiple threads. However, this comes at a price, in this case the cloning of our data.
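One way to soften that price (my sketch, not part of the original example): instead of deep-cloning the `String` for every task, wrap it in an `Arc` so that each task only clones a reference-counted pointer. To keep the example self-contained and std-only it uses plain threads, but with tokio the `Arc::clone` would go exactly where the `String::clone` was.

```rust
use std::sync::Arc;
use std::thread;

// Each thread needs 'static data, but cloning an Arc only bumps a
// reference count instead of copying the underlying string bytes.
fn run_tasks(n: usize) -> Vec<String> {
    let shared: Arc<String> = Arc::new(String::from("tasks"));
    let handles: Vec<_> = (0..n)
        .map(|i| {
            let shared = Arc::clone(&shared); // cheap pointer copy, satisfies 'static
            thread::spawn(move || format!("{}-{}", shared, i))
        })
        .collect();
    // Joining in spawn order keeps the returned Vec deterministic.
    handles.into_iter().map(|h| h.join().unwrap()).collect()
}

fn main() {
    for line in run_tasks(4) {
        println!("{} finished", line);
    }
}
```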
# Mutability
We have learned that futures-level concurrency can use shared data without the need for cloning. So how does this work with mutability?
```rust
async fn mut_future(num: usize, finished: &mut Vec<usize>) {
    finished.push(num);
}
```

```rust
let mut finished = vec![];
mut_future(0, &mut finished).await;

let futures = (0..n).map(|i| mut_future(i, &mut finished));
join_all(futures).await;

println!("finished tasks: {:?}", finished);
```
Well, we are out of luck here, as Rust enforces the same borrowing rules as for other mutable borrows:
```text
error: captured variable cannot escape `FnMut` closure body
  --> playground\futuresntasks\src\main.rs:XX:34
   |
XX |     let mut finished = vec![];
   |         ------------ variable defined here
XX |     let futures = (0..n).map(|i| mut_future(i, &mut finished));
   |                              -   ^^^^^^^^^^^^^^^^^^^--------^
   |                              |   |                  |
   |                              |   |                  variable captured here
   |                              |   returns a reference to a captured variable which escapes the closure body
   |                              inferred to be a `FnMut` closure
   |
   = note: `FnMut` closures only have access to their captured variables while they are executing...
   = note: ...therefore, they cannot allow references to captured variables to escape
```
Rust thus forces us to use one of the shared mutability primitives. Let’s give that a try:
```rust
let finished = Rc::new(std::cell::RefCell::new(vec![]));

let futures = (0..n).map(|i| {
    let finished = finished.clone();
    async move {
        let mut finished = finished.borrow_mut();
        mut_future(i, &mut finished).await
    }
});
join_all(futures).await;

let finished = Rc::try_unwrap(finished).unwrap().into_inner();
println!("finished tasks: {:?}", finished);
```
NOTE: The example also works without `Rc`, I just wanted to highlight the usage of lightweight shared mutability/ownership primitives. This also proves our observation from above that we are single threaded.
We are also using `RefCell` here, which “Panics if the value is currently borrowed”. That can very well happen in case of concurrency. We are just lucky that our `mut_future` does not actually `await` internally. This type is thus a prime candidate for the proposed `must_not_suspend` lint.
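To make the hazard concrete, here is a std-only sketch (my example, no async involved, but the rule is the same) of the runtime borrow check that `RefCell` performs. A second mutable borrow while the first is still alive is exactly the situation that holding a `RefMut` across an `await` point can produce, and that `borrow_mut` turns into a panic:

```rust
use std::cell::RefCell;

fn main() {
    let cell = RefCell::new(vec![0usize]);

    let first = cell.borrow_mut();
    // While `first` is alive, a second mutable borrow is rejected.
    // `borrow_mut` would panic here; `try_borrow_mut` returns an Err instead.
    assert!(cell.try_borrow_mut().is_err());

    drop(first);
    // Once the first borrow is gone, borrowing succeeds again.
    assert!(cell.try_borrow_mut().is_ok());
}
```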
The safety guarantees of Rust are about memory safety and avoiding data races. You can still do dumb stuff like the above in safe Rust, along with leaking memory, introducing deadlocks, and having logic bugs.
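For instance, leaking memory is trivially possible in safe Rust. This sketch (my example, not from the article) uses `Box::leak`, which is explicitly designed to hand out a `'static` reference and never free the allocation:

```rust
fn main() {
    // Box::leak consumes the Box and never runs its destructor: the
    // allocation lives for the rest of the program, entirely in safe code.
    let leaked: &'static mut String = Box::leak(Box::new(String::from("still here")));
    leaked.push_str(", forever");
    assert_eq!(leaked.as_str(), "still here, forever");
    println!("{}", leaked);
}
```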
If we go further and try to introduce a `tokio::spawn` like before, Rust will dutifully remind us that we need to use the thread safe companions of `Rc`/`RefCell` in that case:
```text
error: future cannot be sent between threads safely
   --> playground\futuresntasks\src\main.rs:XXX:9
    |
XXX |     tokio::spawn(async move {
    |     ^^^^^^^^^^^^ future created by async block is not `Send`
    |
    = help: within `impl futures::Future`, the trait `std::marker::Send` is not implemented for `Rc<RefCell<Vec<usize>>>`
note: captured value is not `Send`
   --> playground\futuresntasks\src\main.rs:XXX:32
    |
XXX |         let mut finished = finished.borrow_mut();
    |                            ^^^^^^^^ has type `Rc<RefCell<Vec<usize>>>` which is not `Send`
note: required by a bound in `tokio::spawn`
   --> tokio-1.15.0\src\task\spawn.rs:127:21
    |
127 |         T: Future + Send + 'static,
    |                     ^^^^ required by this bound in `tokio::spawn`

error: future cannot be sent between threads safely
   --> playground\futuresntasks\src\main.rs:XXX:9
    |
XXX |     tokio::spawn(async move {
    |     ^^^^^^^^^^^^ future created by async block is not `Send`
    |
    = help: the trait `Sync` is not implemented for `Cell<isize>`
note: future is not `Send` as this value is used across an await
   --> playground\futuresntasks\src\main.rs:XXX:13
    |
XXX |         let mut finished = finished.borrow_mut();
    |             ------------ has type `RefMut<'_, Vec<usize>>` which is not `Send`
XXX |         mut_future(i, &mut finished).await
    |         ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^ await occurs here, with `mut finished` maybe used later
XXX |     })
    |     - `mut finished` is later dropped here
note: required by a bound in `tokio::spawn`
   --> tokio-1.15.0\src\task\spawn.rs:127:21
    |
127 |         T: Future + Send + 'static,
    |                     ^^^^ required by this bound in `tokio::spawn`
```
Going one step further, we switch to `tokio::sync::Mutex`, which is a locking primitive better optimized for async contexts, as it allows us to literally “wait for the lock”, and to hold it across `await` points. If we change our type to `Arc<tokio::sync::Mutex<Vec<_>>>`, along with the necessary code changes, we come up with a working solution:
```rust
let finished = Arc::new(tokio::sync::Mutex::new(vec![]));

let futures = (0..n).map(|i| {
    let finished = finished.clone();
    tokio::spawn(async move {
        let mut finished = finished.lock().await;
        mut_future(i, &mut finished).await
    })
});
join_all(futures).await;

let finished = Arc::try_unwrap(finished).unwrap().into_inner();
println!("finished tasks: {:?}", finished);
```
# Conclusion
Well, there we have it. There are pros and cons to both futures (well, `futures::join` in that sense) and tasks.

- Futures can work with non-`'static` references.
- Tasks on the other hand need to own all their data, leading to more `Clone`-ing or `Arc`-ing.
- Futures are essentially single-threaded.
- While Tasks can spread CPU intensive work across more cores.
- UPDATE: almost forgot the most important difference:
- Futures need to be actively polled.
- While Tasks are fire and forget. Cancellation works using the `JoinHandle::abort` API.
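To illustrate the “actively polled” point with nothing but the standard library (this is my sketch, not from the article): a hand-written future does exactly nothing between calls to `poll`, so if nobody polls it, no work ever happens.

```rust
use std::future::Future;
use std::pin::Pin;
use std::task::{Context, Poll, RawWaker, RawWakerVTable, Waker};

// A future that stays Pending until it has been polled three times.
// Nothing runs between polls: no polling, no progress.
struct CountPolls(u32);

impl Future for CountPolls {
    type Output = u32;
    fn poll(mut self: Pin<&mut Self>, _cx: &mut Context<'_>) -> Poll<u32> {
        self.0 += 1;
        if self.0 >= 3 { Poll::Ready(self.0) } else { Poll::Pending }
    }
}

// Minimal waker that does nothing; just enough to drive poll() by hand.
fn noop_waker() -> Waker {
    fn raw() -> RawWaker {
        fn clone(_: *const ()) -> RawWaker { raw() }
        fn noop(_: *const ()) {}
        static VTABLE: RawWakerVTable = RawWakerVTable::new(clone, noop, noop, noop);
        RawWaker::new(std::ptr::null(), &VTABLE)
    }
    unsafe { Waker::from_raw(raw()) }
}

// Polls the future in a loop and returns how many polls it took to finish.
fn polls_until_ready() -> u32 {
    let waker = noop_waker();
    let mut cx = Context::from_waker(&waker);
    let mut fut = CountPolls(0);
    let mut fut = Pin::new(&mut fut);
    loop {
        if let Poll::Ready(n) = fut.as_mut().poll(&mut cx) {
            return n;
        }
    }
}

fn main() {
    println!("future completed after {} polls", polls_until_ready());
}
```

This is essentially what an executor does for us: it keeps polling (more cleverly, via wakers) until the future is ready.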
So which one should you choose? Well, that is totally up to you! What I haven’t done here is actually measure things. Such as: What is the cost of the additional `Clone`-ing? How much more work is put on the runtime/executor? How does this change the throughput of the complete system? The average latency and the latency distribution? In the end this very much depends on the system.
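As a starting point for such a measurement (my sketch; the sizes and iteration counts are arbitrary and the numbers will vary wildly per machine), one can time the deep copies directly:

```rust
use std::time::Instant;

fn main() {
    // One megabyte of heap data, standing in for per-task state.
    let data = "x".repeat(1_000_000);
    assert_eq!(data.len(), 1_000_000);

    let start = Instant::now();
    for _ in 0..100 {
        // Each clone deep-copies the full megabyte.
        let copy = data.clone();
        std::hint::black_box(copy); // keep the optimizer from removing the clone
    }
    println!("100 deep clones took {:?}", start.elapsed());
}
```

A proper benchmark would of course use a harness such as criterion and compare against the `Arc::clone` alternative.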
But what I can tell you from experience is that especially refactoring from Futures to Tasks can be painful sometimes. We have seen an example of a “future is not `Send`” error. We were lucky in this case because the compiler told us exactly why. I had to struggle with other cases in which the compiler presented me with a list of completely undecipherable types that was not at all helpful.
# Threads and Cores
While writing this, it also came to me that we have this multiplexing on different layers.
Futures concurrency is scheduling/multiplexing multiple computations on a single thread. Tasks and executors schedule M tasks on N operating system threads. Similarly the operating system also schedules M threads to N processor cores.
The idea here is that the higher up this stack we go, the less overhead we have. Scheduling tasks on an executor or threads on the operating system is not free.
And in the end, we have a completely different beast altogether. Modern CPUs have this thing called simultaneous multithreading, or hardware multithreading. This is when the CPU offers more logical cores than it has physical cores. So it can do actual work while other threads are waiting for data to be copied from main memory into CPU caches. Or other neat tricks to speed up the total throughput of the system.
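You can query the logical-core count that the OS exposes from std (a small sketch of mine; the two-way SMT ratio mentioned in the comment is typical but hardware dependent):

```rust
use std::thread;

fn main() {
    // Number of logical cores the OS reports; on a CPU with two-way SMT
    // this is typically twice the number of physical cores.
    let logical = thread::available_parallelism().map(|n| n.get()).unwrap_or(1);
    println!("logical cores available: {}", logical);
    assert!(logical >= 1);
}
```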