How can I wait for a specific result from a pool of futures with async Rust?

Question

Using the `futures` crate.

I have a vec of futures that each return a bool, and I want to wait specifically for the future that returns true.

Consider the following pool of futures.

```rust
use rand::Rng; // brings `gen_range` into scope
use std::future::Future;

async fn async_function(guess: u8) -> bool {
    let random_wait = rand::thread_rng().gen_range(0..2);
    // Note: this is a blocking sleep, not an async one.
    std::thread::sleep(std::time::Duration::from_secs(random_wait));
    println!("Running guess {guess}");
    guess == 231
}

fn main() {
    let mut pool: Vec<impl Future<Output = bool>> = vec![];
    for guess in 0..=255 {
        pool.push(async_function(guess));
    }
}
```

- How do I wait for the futures in the vec?
- Is it possible to wait until only one future returns true?
- Can I identify the value of guess for the future that returns true?

I'm new to async Rust, so I've been looking at the async-book.

From there, these are the options I've considered:

- `join!` waits until all threads are done, so that doesn't work for me since I want to drop the remaining futures.
- `select!` doesn't seem to be an option, because I need to name each specific future in the select block and I'm not about to write a 255-line select.
- `try_join!` is tempting me to break semantics and have my async_function return `Err(guess)` so that it causes the try_join to exit and return the value I want.
- I tried using `async_fn(guess).boxed().into_stream()` and then using `select_all` from `futures::stream`, but it doesn't seem to run concurrently. I see my async functions running in order.
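To make the three questions above concrete, here is a minimal, dependency-free sketch of what "poll a whole vec of futures and stop at the first `true`" involves. It is an illustration under assumptions, not the `futures` crate's own machinery: `NoopWaker`, `YieldTimes`, `check_guess`, `boxed`, and `first_true` are hypothetical names, and `YieldTimes` stands in for a non-blocking wait. Each future returns its guess alongside the bool, which is one way to identify the winner. In real code, `futures::stream::FuturesUnordered` with `StreamExt::next` plays the role of the hand-written polling loop.

```rust
use std::future::Future;
use std::pin::Pin;
use std::sync::Arc;
use std::task::{Context, Poll, Wake, Waker};

/// Waker that does nothing; fine here because the loop below re-polls eagerly.
struct NoopWaker;
impl Wake for NoopWaker {
    fn wake(self: Arc<Self>) {}
}

/// Hypothetical stand-in for a non-blocking wait: pending `remaining` times, then ready.
struct YieldTimes {
    remaining: u32,
}
impl Future for YieldTimes {
    type Output = ();
    fn poll(mut self: Pin<&mut Self>, _cx: &mut Context<'_>) -> Poll<()> {
        if self.remaining == 0 {
            Poll::Ready(())
        } else {
            self.remaining -= 1;
            Poll::Pending
        }
    }
}

/// Returns the guess together with the verdict so the caller can identify it.
async fn check_guess(guess: u8) -> (u8, bool) {
    YieldTimes { remaining: u32::from(guess % 3) }.await;
    (guess, guess == 231)
}

type BoxedGuess = Pin<Box<dyn Future<Output = (u8, bool)>>>;

fn boxed(guess: u8) -> BoxedGuess {
    Box::pin(check_guess(guess))
}

/// Polls every unfinished future in turn and returns the first guess that yields `true`.
fn first_true(mut pool: Vec<BoxedGuess>) -> Option<u8> {
    let waker = Waker::from(Arc::new(NoopWaker));
    let mut cx = Context::from_waker(&waker);
    while !pool.is_empty() {
        let mut i = 0;
        while i < pool.len() {
            let polled = pool[i].as_mut().poll(&mut cx);
            match polled {
                // Returning drops `pool`, which drops every remaining future.
                Poll::Ready((guess, true)) => return Some(guess),
                // Finished with `false`: discard it; index `i` now holds another future.
                Poll::Ready((_, false)) => drop(pool.swap_remove(i)),
                Poll::Pending => i += 1,
            }
        }
    }
    None
}

fn main() {
    let pool: Vec<BoxedGuess> = (0..=255u8).map(boxed).collect();
    println!("{:?}", first_true(pool)); // prints Some(231)
}
```

The sketch only interleaves futures that actually yield. A future that calls `std::thread::sleep` never returns `Poll::Pending` while it sleeps, so a single-threaded loop like this one would still run such futures one after another.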
# Answer 1

**Score**: 1

Ok, my thinking about futures was wrong. I knew that they weren't executed immediately, but I wasn't using the executors from the futures crate correctly.

Here's what I've got that seems to work.

```rust
use futures::executor::{block_on_stream, ThreadPool}; // `ThreadPool` needs the `thread-pool` feature
use futures::future::FutureExt; // for `into_stream`
use futures::task::SpawnExt; // for `spawn_with_handle`

// `async_fn` is the question's `async_function`.
let thread_pool = ThreadPool::new().unwrap();
let mut pool = vec![];
for guess in 0..=255 {
    let thread = thread_pool
        .spawn_with_handle(async_fn(guess))
        .expect("Failed to spawn thread");
    pool.push(thread.into_stream());
}
let stream = block_on_stream(futures::stream::select_all(pool));
for value in stream {
    println!("Got value {value}");
}
```

The thread pool executor is what creates the separate threads needed to run the futures. Without it my application was single-threaded, so no matter what I tried, it would only run the functions one at a time, not concurrently.

This way I spawn each request into the thread pool. The thread pool appears to spawn 4 background threads. By pushing the handles into a stream, combining them with select_all, and iterating over the stream, my main thread blocks until a new value is available.

There are always 4 workers, and the thread pool schedules the requests in the order they were submitted, like a queue. This is perfect.
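The pattern described in this answer (blocking work spread over a few worker threads, with the main thread waiting for results as they arrive) can also be sketched with the standard library alone. This is not the `futures` `ThreadPool` API used above; `blocking_check` and `find_guess` are hypothetical names, the millisecond sleep is an assumption that keeps the example fast, and the worker count is a parameter rather than a fixed 4. Unlike the loop in the answer, this version stops at the first `true` and reports which guess produced it.

```rust
use std::sync::mpsc;
use std::sync::{Arc, Mutex};
use std::thread;
use std::time::Duration;

/// Blocking stand-in for the question's `async_function` (sleep length is an assumption).
fn blocking_check(guess: u8) -> bool {
    thread::sleep(Duration::from_millis(u64::from(guess % 3)));
    guess == 231
}

/// Runs `blocking_check` on `workers` threads and returns the first guess that passes.
fn find_guess(workers: usize) -> Option<u8> {
    // Shared queue of guesses, handed out in order.
    let queue = Arc::new(Mutex::new(0..=255u8));
    let (tx, rx) = mpsc::channel();
    for _ in 0..workers {
        let queue = Arc::clone(&queue);
        let tx = tx.clone();
        thread::spawn(move || loop {
            // Take the next guess; the lock is released before the blocking call.
            let next = queue.lock().unwrap().next();
            let guess = match next {
                Some(guess) => guess,
                None => break, // queue exhausted
            };
            // A send error means the receiver is gone: the answer was already found.
            if tx.send((guess, blocking_check(guess))).is_err() {
                break;
            }
        });
    }
    // Drop the original sender so the receiver ends once every worker has finished.
    drop(tx);
    // Blocks until a result arrives; stops at the first `true` and keeps its guess.
    rx.into_iter().find(|&(_, ok)| ok).map(|(guess, _)| guess)
}

fn main() {
    println!("{:?}", find_guess(4)); // prints Some(231)
}
```

Once `find_guess` returns, the receiver is dropped, so each worker stops after its current check instead of working through the rest of the queue.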

huangapple
  • Published on February 10, 2023, 05:36:09
  • Please keep this link when reposting: https://go.coder-hub.com/75404656.html