Future's liveness problem

  • 18th Jun 2024
  • 10 min read
  • Tags: rust, async

Introduction

Rust's futures work differently from how most other languages support asynchronous computation; they aren't guaranteed to make progress unless you poll them regularly. Specifically, Future has a poll function that takes a Waker; when the future can make progress, it calls wake on the Waker, and "something" should then call poll again. This is mostly handled by the executor (tokio, embassy, etc.), which polls the top-level future (the task), which then calls poll on the future it's holding, and so on down to the bottom-level future that scheduled the call to wake the Waker. This works brilliantly unless something interrupts that chain, leaving the bottom-level future starved of poll calls and therefore unable to make progress.

I think Future needs an additional rule that bans this possibility, by requiring that every Future that was given the Waker has its poll function called when the task is polled. This rule is annoyingly hard to enforce, but it has some interesting consequences, especially around async iterators, where I think it all but enforces a specific design (poll_progress, for those that have been following that thread).

First I should show you the kind of things that can go wrong.

Problems caused by starvation

AsyncMutex deadlocks

Consider the following code (admittedly simplified to the point of being weird):

#[tokio::main]
async fn main() {
    // Create an asynchronous mutex.
    // (what's inside it doesn't matter)
    let mutex = tokio::sync::Mutex::new(());
    let lock = mutex.lock().await;

    // This `a` future:
    // 1. Prints "A1"
    // 2. Locks the mutex for itself
    // 3. Immediately unlocks the mutex
    // 4. Prints "A2"
    let a = Box::pin(async {
        println!("A1");
        let _ = mutex.lock().await;
        println!("A2");
    });
    // This `b` future:
    // 1. Prints "B1"
    // 2. Drops the lock that was acquired
    //    at the top.
    let b = Box::pin(async {
        println!("B1");
        drop(lock);
        println!("B2");
    });

    // This select waits for whichever of
    // `a` or `b` completes first and then
    // returns the future that didn't complete.
    // We don't drop that future until the end
    // of this function.
    let _keep = futures::future::select(a, b).await;

    println!("Main 1");
    // Get a second lock on the mutex,
    // the first should have been dropped
    // by b.
    let _ = mutex.lock().await;
    println!("Main 2");
}

This code reliably deadlocks when run, after outputting:

A1
B1
B2
Main 1

The reason for the deadlock is that:

  1. main obtains a lock on the mutex.
  2. a tries to obtain a second lock and gets added to the mutex's wait queue.
  3. b drops the lock that was acquired in main; this wakes a, which will acquire the lock the next time it's polled.
  4. b, and therefore the select, completes, but a is still alive and runnable, and now we'll never call poll on it again.
  5. main tries to acquire the lock again, but it's blocked by a, which is ahead of it in the mutex's queue, so it returns Pending and will never be woken up again.

If select dropped a then it would free up its slot in the mutex's queue and would allow the code to continue, but because a is still alive it deadlocks.
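Under the proposed rule, the fix is exactly that: drop the loser as soon as select resolves, rather than binding it to a long-lived variable. A minimal sketch of the same program with that one change (tokio and futures crates assumed, as above):

```rust
#[tokio::main]
async fn main() {
    let mutex = tokio::sync::Mutex::new(());
    let lock = mutex.lock().await;

    let a = Box::pin(async {
        println!("A1");
        let _ = mutex.lock().await;
        println!("A2");
    });
    let b = Box::pin(async {
        println!("B1");
        drop(lock);
        println!("B2");
    });

    // The only change: drop the `Either` returned by `select`
    // immediately. Dropping the losing future (`a`) releases its
    // slot in the mutex's wait queue instead of squatting on it.
    drop(futures::future::select(a, b).await);

    println!("Main 1");
    // With `a` gone, this lock can now be acquired.
    let _ = mutex.lock().await;
    println!("Main 2");
}
```

With the loser dropped, the program runs to completion and prints "Main 2".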

Unexpected timeouts

A slightly more realistic scenario occurs when you have a buffered stream of futures that you iterate over:

use futures::stream::StreamExt;

#[tokio::main]
async fn main() {
    let mut stream = futures::stream::iter(0..num_items())
        .map(|i| async move { get_work_item(i).await })
        .buffered(3);
    while let Some(item) = stream.next().await {
        process_work_item(item).await;
    }
}

This looks quite neat, but while process_work_item is being awaited the stream isn't, so the futures in the buffer are not making progress. If, for example, get_work_item is retrieving data over a network connection that requires keep-alive messages, then the connection could time out while the stream is processing an item.
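One way to mitigate this today is to spawn each work item onto its own task, so the executor drives it directly even while the loop body is awaited. A sketch, with the same placeholder functions as above, assuming the work futures are Send + 'static so they can be spawned:

```rust
use futures::stream::StreamExt;

#[tokio::main]
async fn main() {
    let mut stream = futures::stream::iter(0..num_items())
        // Spawned tasks are polled by the executor itself, so they
        // keep making progress (and keep-alives keep flowing) even
        // while the stream isn't being polled. `buffered(3)` still
        // limits how many are in flight at once, because `map` is
        // lazy and only spawns when `buffered` pulls the next item.
        .map(|i| tokio::spawn(async move { get_work_item(i).await }))
        .buffered(3);
    while let Some(item) = stream.next().await {
        // `buffered` now yields each task's JoinHandle result.
        process_work_item(item.expect("worker task panicked")).await;
    }
}
```

This trades the starvation problem for the usual costs of spawning: 'static bounds and losing borrowed data in the work futures.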

The proposed rule

To fix this we need to ensure that when a future requests that it be polled (by calling wake on its Waker), it is polled again "in a timely manner". The "in a timely manner" part is the executor's job: it should poll the top-level future (the task) promptly after wake has been called. When writing a future, it's our job to make sure that poll actually reaches the bottom-level future that requested the wake.

For example, say we're writing a future that has two child futures (A and B) that we want to run concurrently. When we're polled the first time, we call poll on both A and B with the waker that was passed to us, and both of them return Pending. Some time later the waker is triggered and we get polled again; when this happens we must (this is the proposed rule) call poll on both A and B again, even if we're only interested in A right now (but will be interested in B later, otherwise we'd just drop B). This is because we don't know which of them triggered the wake, and if we don't poll B then it won't get to do the thing it wanted to do "in a timely manner".
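The rule above can be sketched as a hand-written combinator. This is a self-contained illustration using only the standard library; Both, YieldOnce, noop_waker and run_demo are illustrative names, not real APIs, and the busy-polling "executor" exists only to make the demo runnable:

```rust
use std::future::Future;
use std::pin::Pin;
use std::task::{Context, Poll, RawWaker, RawWakerVTable, Waker};

// A future that returns Pending once (requesting a wake) and then Ready.
struct YieldOnce {
    polled: bool,
    value: i32,
}

impl Future for YieldOnce {
    type Output = i32;
    fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<i32> {
        if self.polled {
            Poll::Ready(self.value)
        } else {
            self.polled = true;
            cx.waker().wake_by_ref(); // ask to be polled again
            Poll::Pending
        }
    }
}

// A combinator following the proposed rule: every time it is polled, it
// re-polls *every* child that previously returned Pending, because it
// cannot know which child's wake caused this poll.
struct Both<A: Future, B: Future> {
    a: Option<Pin<Box<A>>>,
    b: Option<Pin<Box<B>>>,
    a_out: Option<A::Output>,
    b_out: Option<B::Output>,
}

impl<A: Future, B: Future> Future for Both<A, B>
where
    Both<A, B>: Unpin,
{
    type Output = (A::Output, B::Output);
    fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
        let this = self.get_mut();
        if let Some(a) = this.a.as_mut() {
            if let Poll::Ready(v) = a.as_mut().poll(cx) {
                this.a_out = Some(v);
                this.a = None; // completed children are dropped, not re-polled
            }
        }
        if let Some(b) = this.b.as_mut() {
            if let Poll::Ready(v) = b.as_mut().poll(cx) {
                this.b_out = Some(v);
                this.b = None;
            }
        }
        match (this.a_out.take(), this.b_out.take()) {
            (Some(a), Some(b)) => Poll::Ready((a, b)),
            (a, b) => {
                this.a_out = a;
                this.b_out = b;
                Poll::Pending
            }
        }
    }
}

// A no-op waker plus a busy-polling loop: just enough to run the demo.
fn noop_waker() -> Waker {
    fn clone(_: *const ()) -> RawWaker {
        RawWaker::new(std::ptr::null(), &VTABLE)
    }
    fn noop(_: *const ()) {}
    static VTABLE: RawWakerVTable = RawWakerVTable::new(clone, noop, noop, noop);
    unsafe { Waker::from_raw(RawWaker::new(std::ptr::null(), &VTABLE)) }
}

fn run_demo() -> (i32, i32) {
    let waker = noop_waker();
    let mut cx = Context::from_waker(&waker);
    let mut fut = Both {
        a: Some(Box::pin(YieldOnce { polled: false, value: 1 })),
        b: Some(Box::pin(YieldOnce { polled: false, value: 2 })),
        a_out: None,
        b_out: None,
    };
    let mut fut = Pin::new(&mut fut);
    loop {
        if let Poll::Ready(out) = fut.as_mut().poll(&mut cx) {
            return out;
        }
    }
}

fn main() {
    println!("{:?}", run_demo());
}
```

Note that Both never leaves a pending child unpolled across a poll of itself, which is exactly what the rule asks for.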

Specifically I think something like the following text should be added to the poll method in the Future docs:

If you call poll and it returns Poll::Pending, when the waker you provided is woken and the task is next polled, you must either call poll again or drop the future.

This text is more general than the above because I want it to apply to cases like poll_read when you're not a Future but you are calling one. I'm not completely happy with the language but that can be sorted out.

This rule bans futures' select and FuturesUnordered. The fix for select is relatively simple: just drop the future that lost the race, like futures-lite's race function does. Fixing FuturesUnordered is harder; I think it also requires fixing async iterators. Dropping futures all the time does put more pressure on cancellation safety, but that may be a good thing: it's a weak point today, and making it harder to skate around could end up making everything more resilient. Interestingly, tokio's select macro does not break this rule, because it takes ownership of the futures passed to it and drops them if they lose the race; however, it frequently causes the same problems, because people use it with async iterators (discussed below) and because of the following problem.

Annoyingly, it's possible to break this rule today using async/await syntax as &mut Future implements Future, so you can write code like this:

async {
    let mut a = pin!(async { /* ... */ });
    do_thing(&mut a).await;
    // a still lives
}

In this example, if do_thing returns without polling a to completion, then even if it follows the rule it only drops the &mut reference, and a remains live in the async block. This is a problem, but I think it's possible to add a lint here that warns against holding futures over an await point if they might already have been polled1.

Async Iterators

This rule has surprisingly strong implications for async iterators: one of the big motivations for async iterators is to replace select in a loop with a merge operation. A merge combines two streams by racing them for the next value, but importantly that leaves the stream that lost the race partially polled without completing. If we want streams not to suffer from starvation then they must also follow this rule, which means the merged stream must have a way of polling the stream that lost the race.

There are two main options for async iterators that satisfy this (and we can adopt both): internal iteration as discussed by tmandry in this post and poll_progress2 as detailed by withoutboats in this post.

Internal iteration solves the problem by taking control over the iteration, allowing the async iterator to poll its own internal state and "the body of the for loop" at the same time. However, internal iteration has a few problems, which is why Rust switched away from it in 2013 (unfortunately most of the discussion here was on the old mailing list which is hard to link to directly, the dump is here though). Chiefly, it's harder to abstract over, and break/continue/return don't (necessarily) work correctly, which means the borrow and liveness checkers can't rely on them, which turns out to be quite a hassle.

poll_progress is a recent proposal that keeps external iteration by adding a method that allows the iterator to be polled without advancing it. This lets you have your cake and eat it too, and if for await desugared to using it then it would be very easy to use as well.
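To make the shape concrete, here is a sketch of what such a trait might look like. The names and signatures are assumed from the proposal, not a stabilized API; CountDown, noop_waker and demo are illustrative scaffolding using only the standard library:

```rust
use std::pin::Pin;
use std::task::{Context, Poll, RawWaker, RawWakerVTable, Waker};

// Hypothetical shape of the poll_progress proposal. `poll_next` is the
// usual low-level async-iterator method; `poll_progress` drives any
// internal work without committing to yielding the next item. A merge
// combinator would call `poll_next` on the stream it wants an item
// from, and `poll_progress` on the one that lost the race, so the
// loser is never starved.
trait AsyncIterator {
    type Item;

    fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>>;

    // Default: an iterator with no internal work is always "caught up".
    fn poll_progress(self: Pin<&mut Self>, _cx: &mut Context<'_>) -> Poll<()> {
        Poll::Ready(())
    }
}

// A trivial, always-ready iterator, just to exercise the trait.
struct CountDown(u32);

impl AsyncIterator for CountDown {
    type Item = u32;
    fn poll_next(mut self: Pin<&mut Self>, _cx: &mut Context<'_>) -> Poll<Option<u32>> {
        if self.0 == 0 {
            Poll::Ready(None)
        } else {
            self.0 -= 1;
            Poll::Ready(Some(self.0))
        }
    }
}

fn noop_waker() -> Waker {
    fn clone(_: *const ()) -> RawWaker {
        RawWaker::new(std::ptr::null(), &VTABLE)
    }
    fn noop(_: *const ()) {}
    static VTABLE: RawWakerVTable = RawWakerVTable::new(clone, noop, noop, noop);
    unsafe { Waker::from_raw(RawWaker::new(std::ptr::null(), &VTABLE)) }
}

fn demo() -> Vec<u32> {
    let waker = noop_waker();
    let mut cx = Context::from_waker(&waker);
    let mut it = CountDown(3);
    let mut out = Vec::new();
    // Between items, a well-behaved consumer may call poll_progress
    // to keep the iterator's internal work moving.
    while let Poll::Ready(Some(v)) = Pin::new(&mut it).poll_next(&mut cx) {
        let _ = Pin::new(&mut it).poll_progress(&mut cx);
        out.push(v);
    }
    out
}

fn main() {
    println!("{:?}", demo());
}
```

The key design point is that poll_progress lets the consumer uphold the proposed rule for streams it isn't currently taking items from.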

This doesn't work very well with the async fn next(&self) -> Option<T> proposal for async iterators. For a merged stream, the only way to make this work would be to drop the future created by the losing stream, and in general that might lose the element from that stream3. You can't hold onto the future, because it can't be polled without calling next again (and you might not want the next element yet), and holding it would force the async iterator to be pinned, which proponents have been trying to avoid.

What now?

I think this rule would fix a few issues with async in Rust today, and importantly would guide decision making for future (pun intended) enhancements that would make them more reliable. I don't think it can be completely enforced (so unsafe code couldn't rely on it) for backwards compatibility and because it's not obviously possible to enforce in async code, but adding text to the Future trait and a lint for async code would be enough.

Thanks to Yosh Wuyts and Belle Atkins for discussing and reviewing drafts. Discuss this on r/rust.


1. I think we'd also need to decide whether a future returned by an async function (that is, a nested future) is allowed to already be partially polled. I think no, so that the caller can hold onto it for a bit. However, if we allowed it, that would make something like futures' select correct, and holding the future returned by it over an await point would be the bug.

2. The original idea appears to have come from Alice Ryhl here.

3. Consider a stream created from a single future that only yields one item. next has to return exactly that future, but if it's dropped then you have no way of recreating it.