How can I wait for a specific result from a pool of futures with async Rust?

Question

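Using the `futures` crate.

I have a vec of futures that each return a bool, and I want to wait specifically for the future that returns `true`.

Consider the following pool of futures.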

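```rust
use rand::Rng; // brings `gen_range` into scope

async fn async_function(guess: u8) -> bool {
    let random_wait = rand::thread_rng().gen_range(0..2);
    // Blocks the current thread for 0 or 1 seconds.
    std::thread::sleep(std::time::Duration::from_secs(random_wait));
    println!("Running guess {guess}");
    guess == 231
}

fn main() {
    // The element type is inferred from `push`; `impl Trait` is not accepted
    // in a `let` type annotation on stable Rust.
    let mut pool: Vec<_> = vec![];
    for guess in 0..=255 {
        pool.push(async_function(guess));
    }
}
```
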
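- How do I wait for the futures in the vec?
- Is it possible to wait only until one future returns `true`?
- Can I identify the value of `guess` for the future that returns `true`?

I'm new to async Rust, so I've been looking at the async-book.

From there, these are the options I've considered:

- `join!` waits until all futures are done, so it doesn't work for me: I want to drop the remaining futures once one returns `true`.

- `select!` doesn't seem to be an option, because I need to name each future in the select block and I'm not about to write a 255-line select.

- `try_join!` is tempting me to break semantics and have my `async_function` return `Err(guess)`, so that it causes the `try_join!` to exit early and return the value I want.

- I tried using `async_function(guess).boxed().into_stream()` and then `select_all` from `futures::stream`, but it doesn't seem to run concurrently: I see my async functions running in order.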



# Answer 1
**Score**: 1

OK, my mental model of futures was wrong. I knew that they aren't executed immediately, but I wasn't using the executors from the `futures` crate correctly.
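To make the "not executed immediately" point concrete, here is a minimal std-only sketch. The `started` counter, `NoopWaker`, and `demo` are hypothetical names added for illustration, and the manual `poll` call stands in for what an executor would normally do.

```rust
use std::cell::Cell;
use std::future::Future;
use std::pin::pin;
use std::sync::Arc;
use std::task::{Context, Poll, Wake, Waker};

/// Waker that does nothing; enough for polling a future by hand once.
struct NoopWaker;
impl Wake for NoopWaker {
    fn wake(self: Arc<Self>) {}
}

/// Like the question's function, but counts how often its body has started.
async fn async_function(guess: u8, started: &Cell<u32>) -> bool {
    started.set(started.get() + 1);
    guess == 231
}

/// Returns (bodies started before polling, bodies started after polling, result).
fn demo() -> (u32, u32, bool) {
    let started = Cell::new(0);
    // Calling the async fn only builds the future; the body has not run yet.
    let fut = async_function(231, &started);
    let before = started.get();

    let waker = Waker::from(Arc::new(NoopWaker));
    let mut cx = Context::from_waker(&waker);
    let mut fut = pin!(fut);
    // The body runs only when something polls the future.
    let result = match fut.as_mut().poll(&mut cx) {
        Poll::Ready(value) => value,
        Poll::Pending => unreachable!("this future has no await points"),
    };
    (before, started.get(), result)
}

fn main() {
    let (before, after, result) = demo();
    println!("started before poll: {before}, after poll: {after}, result: {result}");
}
```

Pushing futures into a `Vec` therefore does no work by itself; something has to drive them.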

Here's what I've got that seems to work.

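    use futures::executor::{block_on_stream, ThreadPool};
    use futures::task::SpawnExt; // provides `spawn_with_handle`
    use futures::FutureExt; // provides `into_stream`

    // `ThreadPool` requires the `thread-pool` feature of the `futures` crate.
    let thread_pool = ThreadPool::new().unwrap();
    let mut pool = vec![];
    for guess in 0..=255 {
        // Spawn the future onto the pool; the handle resolves to its output.
        let handle = thread_pool
            .spawn_with_handle(async_function(guess))
            .expect("Failed to spawn thread");
        pool.push(handle.into_stream());
    }

    // Blocks the current thread and yields each result as it becomes available.
    let stream = block_on_stream(futures::stream::select_all(pool));
    for value in stream {
        println!("Got value {value}");
    }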

The thread pool executor is what creates the separate threads needed to run the futures in parallel. Without it my application was single-threaded, so no matter what I tried, the functions ran one at a time rather than concurrently.
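One likely contributor to the one-at-a-time behaviour (an assumption, since only part of the original program is shown) is that `std::thread::sleep` blocks the thread that polls the future, so each future finishes within a single poll. The sketch below uses only `std` to show that futures which yield instead of blocking can interleave on one thread, and that the first `true` can be returned together with its guess. `NoopWaker`, `YieldTimes`, `Task`, `make_pool`, and `first_true` are hypothetical names, and the loop busy-polls for simplicity, which a real executor would not do.

```rust
use std::future::Future;
use std::pin::Pin;
use std::sync::Arc;
use std::task::{Context, Poll, Wake, Waker};

/// Waker that does nothing; sufficient because the loop below re-polls everything.
struct NoopWaker;
impl Wake for NoopWaker {
    fn wake(self: Arc<Self>) {}
}

/// Hypothetical non-blocking delay: reports `Pending` `self.0` times, then completes.
struct YieldTimes(u32);
impl Future for YieldTimes {
    type Output = ();
    fn poll(mut self: Pin<&mut Self>, _cx: &mut Context<'_>) -> Poll<()> {
        if self.0 == 0 {
            return Poll::Ready(());
        }
        self.0 -= 1;
        Poll::Pending
    }
}

/// Variant of the question's function that returns the guess with its result.
async fn async_function(guess: u8) -> (u8, bool) {
    YieldTimes(u32::from(guess % 3)).await; // yields instead of blocking the thread
    (guess, guess == 231)
}

type Task = Pin<Box<dyn Future<Output = (u8, bool)>>>;

/// Builds one boxed future per guess; nothing runs until it is polled.
fn make_pool() -> Vec<Task> {
    (0..=255u8)
        .map(|guess| Box::pin(async_function(guess)) as Task)
        .collect()
}

/// Polls every pending future in turn on the current thread and returns the
/// guess of the first one that resolves to `true`; the rest are dropped.
fn first_true(mut pool: Vec<Task>) -> Option<u8> {
    let waker = Waker::from(Arc::new(NoopWaker));
    let mut cx = Context::from_waker(&waker);
    while !pool.is_empty() {
        let mut i = 0;
        while i < pool.len() {
            let polled = pool[i].as_mut().poll(&mut cx);
            match polled {
                Poll::Ready((guess, true)) => return Some(guess),
                // Finished with `false`: discard it and poll whatever took its slot.
                Poll::Ready((_, false)) => drop(pool.swap_remove(i)),
                Poll::Pending => i += 1,
            }
        }
    }
    None
}

fn main() {
    println!("{:?}", first_true(make_pool()));
}
```

In a program that already depends on the `futures` crate, `futures::stream::FuturesUnordered` is the usual building block for this "poll a dynamic set, take results as they finish" pattern; the hand-written loop is only meant to expose the mechanism.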

This way I spawn each request into the thread pool. The thread pool appears to spawn 4 background threads on my machine (by default, `ThreadPool::new()` uses one worker per CPU). By converting each handle into a stream, merging the streams with `select_all`, and iterating over the result, my main thread blocks until a new value is available.

There are always 4 workers, and the thread pool schedules the tasks in the order they were requested, like a queue. This is perfect.
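The loop in this answer prints every result and does not say which guess produced it. As a rough sketch of the same "fixed number of workers pulling from a queue" idea, the following stops at the first `true` and reports the guess. It deliberately swaps the `futures` thread pool for plain `std` threads and a channel so that it has no dependencies; `check` and `first_true` are hypothetical names, and `check` omits the random sleep.

```rust
use std::sync::atomic::{AtomicBool, AtomicU16, Ordering};
use std::sync::{mpsc, Arc};
use std::thread;

/// Hypothetical stand-in for the question's `async_function`, without the sleep.
fn check(guess: u8) -> bool {
    guess == 231
}

/// Checks guesses 0..=255 on `workers` threads, handing them out in order,
/// and returns the first guess reported as `true`.
fn first_true(workers: usize) -> Option<u8> {
    let next = Arc::new(AtomicU16::new(0)); // next guess to hand out (the "queue")
    let stop = Arc::new(AtomicBool::new(false)); // set once a winner is known
    let (tx, rx) = mpsc::channel::<(u8, bool)>();

    let handles: Vec<_> = (0..workers)
        .map(|_| {
            let (next, stop, tx) = (Arc::clone(&next), Arc::clone(&stop), tx.clone());
            thread::spawn(move || {
                while !stop.load(Ordering::Relaxed) {
                    let claimed = next.fetch_add(1, Ordering::Relaxed);
                    // Values above 255 mean every guess has been handed out.
                    let Ok(guess) = u8::try_from(claimed) else { break };
                    if tx.send((guess, check(guess))).is_err() {
                        break; // the receiver is gone, so nobody wants more results
                    }
                }
            })
        })
        .collect();
    drop(tx); // the receive loop ends once every worker has finished

    let mut winner = None;
    for (guess, ok) in rx {
        if ok {
            winner = Some(guess);
            stop.store(true, Ordering::Relaxed);
            break; // stop waiting; remaining guesses are abandoned
        }
    }
    for handle in handles {
        handle.join().expect("worker panicked");
    }
    winner
}

fn main() {
    match first_true(4) {
        Some(guess) => println!("guess {guess} returned true"),
        None => println!("no guess returned true"),
    }
}
```

Sending `(guess, result)` pairs rather than bare booleans is what lets the receiver identify the winner; the same trick applies to the stream-based version by having the spawned future return a tuple.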

Posted on 2023-02-10 05:36:09. Source: https://go.coder-hub.com/75404656.html