How can I wait for a specific result from a pool of futures with async Rust?
Question
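Using the `futures` crate.
I have a vec of futures that each return a bool, and I want to wait specifically for the future that returns true.
Consider the following pool of futures.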
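```rust
use rand::Rng; // brings `gen_range` into scope (assumes the `rand` crate)

async fn async_function(guess: u8) -> bool {
    let random_wait = rand::thread_rng().gen_range(0..2);
    // Note: `std::thread::sleep` blocks the whole thread, not just this future.
    std::thread::sleep(std::time::Duration::from_secs(random_wait));
    println!("Running guess {guess}");
    guess == 231
}

fn main() {
    // `impl Trait` is not allowed in a `let` type annotation,
    // so the element type is left to inference.
    let mut pool = Vec::new();
    for guess in 0..=255 {
        pool.push(async_function(guess));
    }
}
```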
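- How do I wait for the futures in the vec?
- Is it possible to wait until only one future returns true?
- Can I identify the value of `guess` for the future that returns true?
I'm new to async Rust, so I've been looking at the async-book.
From there, these are the options I've considered:
- `join!` waits until all futures are done, so it doesn't work for me, since I want to drop the remaining futures.
- `select!` doesn't seem to be an option, because I need to name each future in the select block and I'm not about to write a 255-line select.
- `try_join!` tempts me to break semantics and have my `async_function` return `Err(guess)`, so that it causes the `try_join!` to exit and return the value I want.
- I tried `async_function(guess).boxed().into_stream()` and then `select_all` from `futures::stream`, but it doesn't seem to run concurrently. I see my async functions running in order.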
# Answer 1
**Score**: 1
OK, my mental model of futures was wrong. I knew that they aren't executed immediately, but I wasn't using the executors from the `futures` crate correctly.
Here's what I've got that seems to work.
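```rust
use futures::executor::{block_on_stream, ThreadPool};
use futures::future::FutureExt; // for `into_stream`
use futures::task::SpawnExt; // for `spawn_with_handle`

// `ThreadPool` requires the `thread-pool` feature of the `futures` crate.
let thread_pool = ThreadPool::new().unwrap();
let mut pool = vec![];
for guess in 0..=255 {
    // Each handle resolves to the output of the spawned future.
    let handle = thread_pool
        .spawn_with_handle(async_function(guess))
        .expect("Failed to spawn task");
    pool.push(handle.into_stream());
}
// Blocks the current thread and yields results as they complete.
let stream = block_on_stream(futures::stream::select_all(pool));
for value in stream {
    println!("Got value {value}");
}
```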
The thread pool executor is what creates the separate threads needed to run the futures in parallel. Without it my application was single-threaded, so no matter what I tried, it would only run the functions one at a time, not concurrently.
This way I spawn each request onto the thread pool. The thread pool appears to spawn 4 background threads. By pushing all the handles into a stream with `select_all` and iterating over that stream, my main thread blocks until a new value is available.
There are always 4 workers, and the thread pool schedules the tasks in the order they were requested, like a queue. This is perfect.
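The queue-like behaviour this answer describes, a fixed number of workers taking tasks in the order they were requested, can be illustrated without any executor. The sketch below uses only the standard library and is not how `ThreadPool` is implemented; `check_guess` and `find_with_workers` are hypothetical names, and the worker count of 4 simply mirrors the observation in the answer (by default `ThreadPool` sizes itself to the number of CPU cores, so 4 is likely specific to that machine).

```rust
use std::collections::VecDeque;
use std::sync::{mpsc, Arc, Mutex};
use std::thread;

// Hypothetical synchronous stand-in for the question's check.
fn check_guess(guess: u8) -> bool {
    guess == 231
}

// Runs the guesses on `worker_count` threads and returns the first one that matches.
fn find_with_workers(worker_count: usize) -> Option<u8> {
    // FIFO queue of pending guesses, handed out in request order.
    let queue: Arc<Mutex<VecDeque<u8>>> = Arc::new(Mutex::new((0..=255u8).collect()));
    let (tx, rx) = mpsc::channel();

    let workers: Vec<_> = (0..worker_count)
        .map(|_| {
            let queue = Arc::clone(&queue);
            let tx = tx.clone();
            thread::spawn(move || loop {
                // Take the next guess; the lock is released at the end of this statement.
                let next = queue.lock().unwrap().pop_front();
                let Some(guess) = next else { break };
                // Stop once the receiver is gone, i.e. a match was already found.
                if tx.send((guess, check_guess(guess))).is_err() {
                    break;
                }
            })
        })
        .collect();
    drop(tx); // only the workers hold senders now

    // Block until some worker reports `true`, or until all workers finish.
    let winner = rx
        .iter()
        .find(|&(_, is_match)| is_match)
        .map(|(guess, _)| guess);

    drop(rx); // makes further sends fail so the workers exit early
    for worker in workers {
        worker.join().unwrap();
    }
    winner
}

fn main() {
    println!("{:?}", find_with_workers(4)); // prints "Some(231)"
}
```

The main thread blocks on the channel much as the answer's main thread blocks on the stream, and dropping the receiver plays the role that dropping the remaining handles plays with the executor.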