Swatinem Blog

Rust Futures and Tasks

A tale of concurrency and parallelism

— 10 min

With the recent talk about “Contexts” in the Rust community, and some other thoughts I had recently, I want to explore in a bit more detail the difference between Futures and Tasks in Rust.

The difference between Futures and Tasks is like the difference between concurrency and parallelism.

The difference is quite subtle, even when considering just the words. I’m not even sure my native German language has different words for these concepts.

Let’s look at a short excerpt from the Wikipedia entry on Concurrency:

Concurrency is not parallelism: concurrency is about dealing with lots of things at once but parallelism is about doing lots of things at once. Concurrency is about structure, parallelism is about execution.

Well, that is not all that helpful. Let’s approach the question from a different perspective, then.

If we think of everyday life, we are sometimes doing actual work, and sometimes just waiting for something to happen. Waiting would be boring, though. So what do we humans do in such a case? We start working on something else in the meantime. We call this “multitasking”.

NOTE: While explaining this to my wife, we figured out that humans use the word multitasking for both concepts. Oh well.

The basic capability that Futures give us, together with their conveniently named await keyword, is the ability to say:

Hey! I’m not doing any useful work right now, I’m just waiting.

This waiting can be anything. Waiting for time to pass, for things to arrive on the network, or for a computation to finish.
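To make “I’m just waiting” a bit more concrete, here is a std-only sketch of what .await does underneath. A future’s poll method returns Poll::Pending while it is waiting, and whoever polls it (in this article, tokio’s runtime) is free to do something else until the future’s waker is called. WaitOnce, ThreadWaker and block_on are hypothetical names for illustration, not tokio’s API, and a real I/O future would hand its waker to the operating system instead of waking itself immediately.

```rust
use std::future::Future;
use std::pin::Pin;
use std::sync::Arc;
use std::task::{Context, Poll, Wake, Waker};
use std::thread::{self, Thread};

/// Hypothetical future: says "not ready, I'm just waiting" once, then completes.
/// It stands in for a real timer or network read.
struct WaitOnce {
    waited: bool,
}

impl Future for WaitOnce {
    type Output = &'static str;

    fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
        if self.waited {
            Poll::Ready("done waiting")
        } else {
            self.waited = true;
            // A real I/O future would register the waker with a reactor;
            // this sketch asks to be polled again right away.
            cx.waker().wake_by_ref();
            Poll::Pending
        }
    }
}

/// Waker that unparks the executor thread.
struct ThreadWaker(Thread);

impl Wake for ThreadWaker {
    fn wake(self: Arc<Self>) {
        self.0.unpark();
    }
}

/// Minimal single-threaded executor: poll, and park the thread while pending.
fn block_on<F: Future>(fut: F) -> F::Output {
    let mut fut = Box::pin(fut);
    let waker = Waker::from(Arc::new(ThreadWaker(thread::current())));
    let mut cx = Context::from_waker(&waker);
    loop {
        match fut.as_mut().poll(&mut cx) {
            Poll::Ready(out) => return out,
            // Parking is where "doing something else" could happen.
            Poll::Pending => thread::park(),
        }
    }
}

async fn uses_await() -> &'static str {
    // On Pending, `.await` suspends this async fn and returns control to block_on.
    WaitOnce { waited: false }.await
}

fn main() {
    println!("{}", block_on(uses_await()));
}
```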

We can create a simple future that simulates this:

use std::{thread, time::Duration};

async fn future(prefix: &str, num: usize) {
    // simulates some IO wait
    tokio::time::sleep(Duration::from_millis(400)).await;

    // simulates some CPU workload (deliberately blocks the thread)
    thread::sleep(Duration::from_millis(100));

    println!("{}-{} finished", prefix, num);
}

If we want to execute a couple of these, we can first try to do so one after the other:

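// runs inside an async context, e.g. a #[tokio::main] async fn main
use std::time::Instant;

let n = 8; // assumed: eight futures, matching the output below
let non_static_str = String::from("serial");
let start = Instant::now();
let futures = (0..n).map(|i| future(&non_static_str, i));
for future in futures {
    future.await;
}
println!("{}: {:?}", non_static_str, start.elapsed());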

With this output:

serial-0 finished
serial-1 finished
serial-2 finished
serial-3 finished
serial-4 finished
serial-5 finished
serial-6 finished
serial-7 finished
serial: 4.1303916s

Or showing this visually:

┌─────────────┬─────────┬─────────────┬─────────┬───┐
│ … waiting … │ working │ … waiting … │ working │ … │
└─────────────┴─────────┴─────────────┴─────────┴───┘

Not too efficient; we haven’t gained anything. But we can do better if we know that the individual pieces of work are sufficiently independent of each other.

# Concurrency

I think this is what the Wikipedia article meant by “Concurrency is about structure”:

use futures::future::join_all;

let non_static_str = String::from("futures");
let start = Instant::now();
let futures = (0..n).map(|i| future(&non_static_str, i));
join_all(futures).await;
println!("{}: {:?}", non_static_str, start.elapsed());

And let’s run this:

futures-0 finished
futures-1 finished
futures-2 finished
futures-3 finished
futures-4 finished
futures-5 finished
futures-6 finished
futures-7 finished
futures: 1.2765159s

Or showing this visually:

┌─────────────┬─────────┬─────────┬───┐
│ … waiting … │ working │ working │ … │
└─────────────┴─────────┴─────────┴───┘

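Adding concurrency here means that we were able to compress the overlapping wait times: all futures wait at the same time, so the total is roughly 400ms of shared waiting plus 8 × 100ms of work. This is good, but not quite optimal, as the combined future still uses only a single thread of execution, so the CPU-bound parts still run one after the other. So if we only ever use futures::future::join_all, Rust is no different from other programming languages that have a single-threaded event loop.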
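Conceptually, this single-threaded concurrency can be sketched with the standard library alone: one thread keeps polling every unfinished future, so while one future is “just waiting”, the others make progress. join_all_on_one_thread, YieldNow and step are hypothetical helpers, not the futures crate’s implementation. Unlike this busy loop, the real JoinAll returns Pending to its executor and is only polled again once a waker fires.

```rust
use std::cell::RefCell;
use std::future::Future;
use std::pin::Pin;
use std::rc::Rc;
use std::sync::Arc;
use std::task::{Context, Poll, Wake, Waker};

/// Yields once ("I'm just waiting"), then completes on the next poll.
struct YieldNow(bool);

impl Future for YieldNow {
    type Output = ();

    fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<()> {
        if self.0 {
            Poll::Ready(())
        } else {
            self.0 = true;
            cx.waker().wake_by_ref();
            Poll::Pending
        }
    }
}

/// A waker that does nothing; the loop below simply re-polls everything.
struct NoopWake;

impl Wake for NoopWake {
    fn wake(self: Arc<Self>) {}
}

type LocalFuture = Pin<Box<dyn Future<Output = ()>>>;

/// Hypothetical stand-in for join_all: drive all futures on the current thread.
fn join_all_on_one_thread(mut pending: Vec<LocalFuture>) {
    let waker = Waker::from(Arc::new(NoopWake));
    let mut cx = Context::from_waker(&waker);
    while !pending.is_empty() {
        // Poll each unfinished future once; keep only those still pending.
        pending.retain_mut(|fut| fut.as_mut().poll(&mut cx).is_pending());
    }
}

async fn step(id: usize, log: Rc<RefCell<Vec<String>>>) {
    // Each borrow_mut() guard is dropped at the end of its statement,
    // so no borrow is held across the .await below.
    log.borrow_mut().push(format!("{}: start", id));
    YieldNow(false).await;
    log.borrow_mut().push(format!("{}: end", id));
}

fn run() -> Vec<String> {
    let log = Rc::new(RefCell::new(Vec::<String>::new()));
    let futures: Vec<LocalFuture> = (0..3)
        .map(|id| Box::pin(step(id, Rc::clone(&log))) as LocalFuture)
        .collect();
    join_all_on_one_thread(futures);
    Rc::try_unwrap(log).unwrap().into_inner()
}

fn main() {
    // All starts come before any end: the futures overlap on one thread.
    println!("{:?}", run());
}
```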

# Parallelism

Let’s try to add the “Parallelism is about execution” part by using tokio::spawn to turn each future into an independent task.

let non_static_str = String::from("tasks");
let start = Instant::now();
let futures = (0..n).map(|i| tokio::spawn(future(&non_static_str, i)));
join_all(futures).await;
println!("{}: {:?}", non_static_str, start.elapsed());

Well, it turns out that this is not quite that easy:

error[E0597]: `non_static_str` does not live long enough
  --> playground\futuresntasks\src\main.rs:XX:55
   |
XX |     let futures = (0..n).map(|i| tokio::spawn(future(&non_static_str, i)));
   |                              ---              --------^^^^^^^^^^^^^^----
   |                              |                |       |
   |                              |                |       borrowed value does not live long enough
   |                              |                argument requires that `non_static_str` is borrowed for `'static`
   |                              value captured here
...
XX | }
   | - `non_static_str` dropped here while still borrowed

It turns out our futures are not sufficiently independent; in the case of tasks, they need to be fully independent. If we look at the signature of the tokio::spawn function, the 'static lifetime bound expresses this requirement. The Send bound is the part that makes this truly parallel (or rather, that makes this parallelism safe), as it allows the future to be moved to and polled on another thread.

pub fn spawn<T>(future: T) -> JoinHandle<T::Output>
where
    T: Future + Send + 'static,
    T::Output: Send + 'static,
{ /* ... */ }
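The standard library’s std::thread::spawn places the same kind of bound on its argument (F: FnOnce() -> T + Send + 'static, with T: Send + 'static), so the requirement can be illustrated without a runtime. In this sketch, spawn_like and run_all are hypothetical helpers built on OS threads rather than tokio tasks: a closure that borrows prefix would be rejected, while one that owns its own copy is accepted.

```rust
use std::thread;

/// Hypothetical helper with the same shape of bounds that tokio::spawn puts on its
/// future, but running a closure on an OS thread instead of a task on a runtime.
fn spawn_like<F, T>(f: F) -> thread::JoinHandle<T>
where
    F: FnOnce() -> T + Send + 'static,
    T: Send + 'static,
{
    thread::spawn(f)
}

fn run_all(prefix: &str, n: usize) -> Vec<String> {
    let handles: Vec<_> = (0..n)
        .map(|i| {
            // `prefix` is only borrowed, so capturing it would violate 'static;
            // an owned copy per thread satisfies both Send and 'static.
            let owned = prefix.to_string();
            spawn_like(move || format!("{}-{} finished", owned, i))
        })
        .collect();
    // Threads may finish in any order; joining in spawn order keeps the result stable.
    handles.into_iter().map(|h| h.join().unwrap()).collect()
}

fn main() {
    for line in run_all("tasks", 3) {
        println!("{}", line);
    }
}
```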

So in order to make the future fully independent, we can give it its own copy of all the data it needs to access:

let non_static_str = String::from("tasks");
let start = Instant::now();
let futures = (0..n).map(|i| {
    let cloned_str = non_static_str.clone(); // <-
    tokio::spawn(async move { future(&cloned_str, i).await })
});
join_all(futures).await;
println!("{}: {:?}", non_static_str, start.elapsed());

With each task now being completely independent, we can run our example again:

tasks-5 finished
tasks-4 finished
tasks-7 finished
tasks-0 finished
tasks-2 finished
tasks-1 finished
tasks-6 finished
tasks-3 finished
tasks: 513.6456ms

Or showing this visually:

          ┌─────────────┬─────────┐
Thread 1: │ … waiting … │ working │
          ├─────────────┼─────────┤
Thread 2: │ … waiting … │ working │
          └─────────────┴─────────┘
Thread n: …

This is nice! We have parallelized our tasks across multiple threads, so the whole run takes about as long as a single future (400ms + 100ms). However, this comes at a price: in this case, cloning our data.
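One way to keep that price down, sketched here with std threads instead of tokio tasks (run_shared is a hypothetical helper): put read-only shared data behind an Arc, so that each “copy” is just a reference-count increment instead of a new allocation. Whether this actually beats cloning a short String is something you would have to measure.

```rust
use std::sync::Arc;
use std::thread;

/// Hypothetical helper: like the task example, but sharing one Arc<str>
/// instead of giving every thread its own String clone.
fn run_shared(prefix: &str, n: usize) -> (Vec<String>, usize) {
    // One allocation for the text; each Arc::clone only bumps a reference count.
    let shared: Arc<str> = Arc::from(prefix);
    let handles: Vec<_> = (0..n)
        .map(|i| {
            let shared = Arc::clone(&shared);
            thread::spawn(move || format!("{}-{} finished", shared, i))
        })
        .collect();
    let lines: Vec<String> = handles.into_iter().map(|h| h.join().unwrap()).collect();
    // All threads have exited, so their clones are gone and only `shared` remains.
    (lines, Arc::strong_count(&shared))
}

fn main() {
    let (lines, owners) = run_shared("tasks", 3);
    println!("{:?}, remaining owners: {}", lines, owners);
}
```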

# Mutability

We have learned that futures-level concurrency can use shared data without the need for cloning. So how does this work with mutability?

async fn mut_future(num: usize, finished: &mut Vec<usize>) {
    finished.push(num);
}

let mut finished = vec![];
mut_future(0, &mut finished).await;

let futures = (0..n).map(|i| mut_future(i, &mut finished));
join_all(futures).await;

println!("finished tasks: {:?}", finished);

Well, we are out of luck here, as Rust enforces the same borrowing rules as for any other mutable borrow: we cannot hand out several &mut references to finished at the same time.

error: captured variable cannot escape `FnMut` closure body
  --> playground\futuresntasks\src\main.rs:XX:34
   |
XX |     let mut finished = vec![];
   |         ------------ variable defined here
XX |     let futures = (0..n).map(|i| mut_future(i, &mut finished));
   |                                - ^^^^^^^^^^^^^^^^^^^--------^
   |                                | |                  |
   |                                | |                  variable captured here
   |                                | returns a reference to a captured variable which escapes the closure body
   |                                inferred to be a `FnMut` closure
   |
   = note: `FnMut` closures only have access to their captured variables while they are executing...
   = note: ...therefore, they cannot allow references to captured variables to escape

Rust thus forces us to use one of the shared mutability primitives. Let’s give that a try:

use std::{cell::RefCell, rc::Rc};

let finished = Rc::new(RefCell::new(vec![]));
let futures = (0..n).map(|i| {
    let finished = finished.clone();
    async move {
        // the RefMut guard lives until the end of this async block
        let mut finished = finished.borrow_mut();
        mut_future(i, &mut finished).await
    }
});
join_all(futures).await;
let finished = Rc::try_unwrap(finished).unwrap().into_inner();
println!("finished tasks: {:?}", finished);

NOTE: The example also works without Rc; I just wanted to highlight the use of lightweight shared ownership/mutability primitives. The fact that the compiler accepts the non-thread-safe Rc here also confirms our observation from above that we are single-threaded.

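We are also using RefCell here, whose borrow_mut “Panics if the value is currently borrowed”. That can very well happen with concurrency: if one future held the borrow across an .await, another future polled in the meantime would try to borrow the same RefCell and panic. We are just lucky that our mut_future does not actually await internally. The RefMut guard is thus a prime candidate for the proposed must_not_suspend lint.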
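That failure mode can be reproduced without any async code at all. In this sketch (second_borrow_while_held is a hypothetical helper), the first guard plays the role of a future holding borrow_mut() across an .await, and the second borrow plays another future polled in the meantime; try_borrow_mut() reports the conflict that borrow_mut() would turn into a panic.

```rust
use std::cell::RefCell;

/// Hypothetical helper: does a second mutable borrow succeed while the first is alive?
fn second_borrow_while_held(finished: &RefCell<Vec<usize>>) -> bool {
    // Stands in for a future that holds `borrow_mut()` across an `.await`.
    let mut held = finished.borrow_mut();
    held.push(0);
    // Another future polled in the meantime: `borrow_mut()` would panic here,
    // while `try_borrow_mut()` reports the conflict as an Err instead.
    let ok = finished.try_borrow_mut().is_ok();
    drop(held);
    ok
}

fn main() {
    let finished = RefCell::new(vec![]);
    assert!(!second_borrow_while_held(&finished));
    // Once the first guard is dropped, borrowing works again.
    finished.borrow_mut().push(1);
    println!("{:?}", finished.borrow()); // prints [0, 1]
}
```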

The safety guarantees of Rust are about memory safety and avoiding data races. You can still do dumb things like the above in safe Rust, as well as leak memory, introduce deadlocks, and write logic bugs.

If we go further and try to introduce tokio::spawn like before, Rust will dutifully remind us that we need the thread-safe counterparts of Rc/RefCell in that case:

error: future cannot be sent between threads safely
--> playground\futuresntasks\src\main.rs:XXX:9
    |
XXX |         tokio::spawn(async move {
    |         ^^^^^^^^^^^^ future created by async block is not `Send`
    |
    = help: within `impl futures::Future`, the trait `std::marker::Send` is not implemented for `Rc<RefCell<Vec<usize>>>`
note: captured value is not `Send`
--> playground\futuresntasks\src\main.rs:XXX:32
    |
XXX |             let mut finished = finished.borrow_mut();
    |                                ^^^^^^^^ has type `Rc<RefCell<Vec<usize>>>` which is not `Send`
note: required by a bound in `tokio::spawn`
--> tokio-1.15.0\src\task\spawn.rs:127:21
    |
127 |         T: Future + Send + 'static,
    |                     ^^^^ required by this bound in `tokio::spawn`
error: future cannot be sent between threads safely
--> playground\futuresntasks\src\main.rs:XXX:9
    |
XXX |         tokio::spawn(async move {
    |         ^^^^^^^^^^^^ future created by async block is not `Send`
    |
    = help: the trait `Sync` is not implemented for `Cell<isize>`
note: future is not `Send` as this value is used across an await
--> playground\futuresntasks\src\main.rs:XXX:13
    |
XXX |             let mut finished = finished.borrow_mut();
    |                 ------------ has type `RefMut<'_, Vec<usize>>` which is not `Send`
XXX |             mut_future(i, &mut finished).await
    |             ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^ await occurs here, with `mut finished` maybe used later
XXX |         })
    |         - `mut finished` is later dropped here
note: required by a bound in `tokio::spawn`
--> tokio-1.15.0\src\task\spawn.rs:127:21
    |
127 |         T: Future + Send + 'static,
    |                     ^^^^ required by this bound in `tokio::spawn`

Going one step further, we can use tokio::sync::Mutex, a locking primitive designed for async contexts: it allows us to literally “wait for the lock” (lock().await), and to hold the lock across await points. If we change our type to Arc<tokio::sync::Mutex<Vec<_>>>, along with the necessary code changes, we arrive at a working solution:

use std::sync::Arc;

let finished = Arc::new(tokio::sync::Mutex::new(vec![]));
let futures = (0..n).map(|i| {
    let finished = finished.clone();
    tokio::spawn(async move {
        // waits asynchronously for the lock; the guard may be held across .await
        let mut finished = finished.lock().await;
        mut_future(i, &mut finished).await
    })
});
join_all(futures).await;
let finished = Arc::try_unwrap(finished).unwrap().into_inner();
println!("finished tasks: {:?}", finished);
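For comparison, here is the same shape with plain OS threads and std::sync::Mutex (mut_work and collect_finished are hypothetical names). It works because each guard is only held for a synchronous call. The std guard is generally not something to hold across an .await in a spawned task; being able to do that is exactly what tokio::sync::Mutex adds.

```rust
use std::sync::{Arc, Mutex};
use std::thread;

/// Hypothetical synchronous counterpart of the article's mut_future.
fn mut_work(num: usize, finished: &mut Vec<usize>) {
    finished.push(num);
}

fn collect_finished(n: usize) -> Vec<usize> {
    let finished = Arc::new(Mutex::new(Vec::<usize>::new()));
    let handles: Vec<_> = (0..n)
        .map(|i| {
            let finished = Arc::clone(&finished);
            thread::spawn(move || {
                // The guard is held only for this synchronous call, then dropped.
                let mut guard = finished.lock().unwrap();
                mut_work(i, &mut guard);
            })
        })
        .collect();
    for handle in handles {
        handle.join().unwrap();
    }
    // Every thread has exited and dropped its clone, so we are the sole owner again.
    let mut finished = Arc::try_unwrap(finished).unwrap().into_inner().unwrap();
    finished.sort(); // threads may push in any order
    finished
}

fn main() {
    println!("finished tasks: {:?}", collect_finished(8));
}
```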

# Conclusion

Well, there we have it. There are pros and cons to both futures (well, futures::future::join_all in that sense) and tasks.

So which one should you choose? That is totally up to you! What I haven’t done here is actually measure things, such as: What is the cost of the additional cloning? How much more work is put on the runtime/executor? How does this change the throughput of the complete system, the average latency, and the latency distribution? In the end, this very much depends on the system.

But what I can tell you from experience is that refactoring from Futures to Tasks in particular can sometimes be painful. We have seen an example of a “future is not Send” error. We were lucky in this case because the compiler told us exactly why. I have struggled with other cases in which the compiler presented me with a list of completely undecipherable types that was not at all helpful.

# Threads and Cores

While writing this, it also occurred to me that this kind of multiplexing happens on several layers.

Future-level concurrency schedules/multiplexes multiple computations on a single thread. Tasks and executors schedule M tasks on N operating system threads. Similarly, the operating system schedules M threads onto N processor cores.

The idea here is that the higher up the stack we go, the less overhead we have. Scheduling tasks on an executor, or threads on the operating system, is not free.

And at the very bottom, we have a completely different beast altogether. Modern CPUs have a feature called simultaneous multithreading, or hardware multithreading, where the CPU offers more logical cores than it has physical cores. This way a core can do actual work for one thread while another thread is waiting for data to be copied from main memory into the CPU caches, along with other neat tricks to increase the total throughput of the system.
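A program can ask the operating system how much parallelism is available to it. A minimal sketch using std::thread::available_parallelism (stable since Rust 1.59); logical_parallelism is a hypothetical wrapper. On a machine with simultaneous multithreading the value usually counts logical rather than physical cores, though it is only an estimate and can be lowered by things like CPU affinity or container quotas.

```rust
use std::thread;

/// Hypothetical wrapper: how many threads the OS suggests this process can run at once.
/// Falls back to 1 if the value cannot be determined.
fn logical_parallelism() -> usize {
    thread::available_parallelism().map(|n| n.get()).unwrap_or(1)
}

fn main() {
    println!("available parallelism: {}", logical_parallelism());
}
```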