Rust中的C# Parallel.ForEach等效方法

huangapple go评论101阅读模式
英文:

C# Parallel.ForEach equivalent in Rust

问题

I am very new to rust and trying to create a Parallel.Run method of C# in rust.

C# code

  1. Parallel.ForEach(collection, x => {
  2. // do something with x
  3. })

Rust Code

  1. pub async fn parallel_run<T>(collection: Vec<T>, callback: fn(item: T) -> dyn Future<Output= ()>) {
  2. for item in collection {
  3. tokio::spawn(callback(item)).await.expect("TODO: panic message");
  4. }
  5. }

Compile Error

the size for values of type dyn Future<Output = ()>
cannot be known at compilation time [E0277]
doesn't have a size known at compile-time Help:
the trait Sized is not implemented for dyn Future<Output = ()>
Note: required by a bound in tokio::spawn
dyn Future<Output = ()> cannot be sent between threads
safely [E0277] Help: the trait Send is not implemented for
dyn Future<Output = ()> Note: required by a bound in tokio::spawn

What am I missing ??

I need to re-create Parallel.ForEach equivalent in rust

英文:

I am very new to rust and trying to create a Parallel.Run method of C# in rust.

C# code

  1. Parallel.ForEach(collection, x =&gt; {
  2. // do something with x
  3. })

Rust Code

  1. pub async fn parallel_run&lt;T&gt;(collection: Vec&lt;T&gt;, callback: fn(item: T) -&gt; dyn Future&lt;Output= ()&gt;){
  2. {
  3. for item in collection
  4. {
  5. tokio::spawn(callback(item)).await.expect(&quot;TODO: panic message&quot;);
  6. }
  7. }

Compile Error
>the size for values of type dyn Future&lt;Output = ()&gt;
cannot be known at compilation time [E0277]
doesn't have a size known at compile-time Help:
the trait Sized is not implemented for dyn Future&lt;Output = ()&gt;
Note: required by a bound in tokio::spawn
dyn Future&lt;Output = ()&gt; cannot be sent between threads
safely [E0277] Help: the trait Send is not implemented for
dyn Future&lt;Output = ()&gt; Note: required by a bound in tokio::spawn

What am I missing ??

I need to re-create Parallel.ForEach equivalent in rust

答案1

得分: 1

If you need just some calculation (CPU-bound task), you can just use rayon.

Example of doing parallel filtering and squaring values:

  1. use rayon::iter::{IntoParallelRefIterator, ParallelIterator};
  2. fn my_complex_calculation(v: i32) -> i32 {
  3. v * v
  4. }
  5. fn main() {
  6. let values = vec![1, 2, 3, 4, 5, 6, 7, 8, 9, 10];
  7. let updated: Vec<i32> = values.par_iter()
  8. .filter(|&&x| x % 2 == 0)
  9. .copied()
  10. .map(my_complex_calculation)
  11. .collect();
  12. assert_eq!(*updated, [4, 16, 36, 64, 100]);
  13. }

If you need some async tasks in parallel and wait for all of them, you can use JoinSet from tokio.

  1. use tokio::task::JoinSet;
  2. async fn my_network_request(v: i32) -> i32 {
  3. // Write your code here.
  4. v * v
  5. }
  6. #[tokio::main]
  7. async fn main() {
  8. let values = vec![1, 2, 3, 4, 5, 6, 7, 8, 9, 10];
  9. let mut results = vec![0; values.len()];
  10. let mut join_set = JoinSet::new();
  11. for (i, &v) in values.iter().enumerate() {
  12. join_set.spawn(async move {
  13. (i, my_network_request(v).await)
  14. });
  15. }
  16. while let Some(res) = join_set.join_next().await {
  17. let (idx, val) = res.unwrap();
  18. results[idx] = val;
  19. }
  20. assert_eq!(*results, [1, 4, 9, 16, 25, 36, 49, 64, 81, 100]);
  21. }

Also, @chayim-friedman suggested using futures::future::join_all in comments. Note that unlike my previous suggestions, this would not parallelize computational cost between CPUs.

  1. use futures::future::join_all;
  2. async fn my_network_request(v: i32) -> i32 {
  3. // Write your code here.
  4. v * v
  5. }
  6. #[tokio::main]
  7. async fn main() {
  8. let values = vec![1, 2, 3, 4, 5, 6, 7, 8, 9, 10];
  9. let results = join_all(
  10. values.iter().copied().map(my_network_request)
  11. ).await;
  12. assert_eq!(*results, [1, 4, 9, 16, 25, 36, 49, 64, 81, 100]);
  13. }
英文:

If you need just some calculation (CPU-bound task), you can just use rayon

Example of doing parallel filtering and squaring values:

  1. use rayon::iter::{IntoParallelRefIterator, ParallelIterator};
  2. fn my_complex_calculation(v: i32)-&gt;i32 {
  3. v*v
  4. }
  5. fn main(){
  6. let values = vec![1,2,3,4,5,6,7,8,9,10];
  7. let updated: Vec&lt;i32&gt; = values.par_iter()
  8. .filter(|&amp;&amp;x| x &amp; 1 == 0)
  9. .copied()
  10. .map(my_complex_calculation)
  11. .collect();
  12. assert_eq!(*updated, [4,16,36,64,100]);
  13. }

If you need some async tasks in parallel and wait all of them, you can use JoinSet from tokio.

  1. use tokio::task::JoinSet;
  2. async fn my_network_request(v: i32)-&gt;i32 {
  3. // Write your code here.
  4. v*v
  5. }
  6. #[tokio::main]
  7. async fn main(){
  8. let values = vec![1,2,3,4,5,6,7,8,9,10];
  9. let mut results = vec![0; values.len()];
  10. let mut join_set = JoinSet::new();
  11. for (i, &amp;v) in values.iter().enumerate(){
  12. join_set.spawn(async move {
  13. (i, my_network_request(v).await)
  14. });
  15. }
  16. while let Some(res) = join_set.join_next().await {
  17. let(idx, val) = res.unwrap();
  18. results[idx] = val;
  19. }
  20. assert_eq!(*results, [1, 4, 9, 16,25, 36,49, 64,81,100]);
  21. }

Also, @chayim-friedman suggested to use futures::future::join_all in comments. Note that unlike my previous suggestions, this would not parallelize computational cost between CPUs.

  1. use futures::future::join_all;
  2. async fn my_network_request(v: i32)-&gt;i32 {
  3. // Write your code here.
  4. v*v
  5. }
  6. #[tokio::main]
  7. async fn main(){
  8. let values = vec![1,2,3,4,5,6,7,8,9,10];
  9. let results = join_all(
  10. values.iter().copied().map(my_network_request)
  11. ).await;
  12. assert_eq!(*results, [1, 4, 9, 16,25, 36,49, 64,81,100]);
  13. }

huangapple
  • 本文由 发表于 2023年5月6日 15:34:59
  • 转载请务必保留本文链接:https://go.coder-hub.com/76187679.html
匿名

发表评论

匿名网友

:?: :razz: :sad: :evil: :!: :smile: :oops: :grin: :eek: :shock: :???: :cool: :lol: :mad: :twisted: :roll: :wink: :idea: :arrow: :neutral: :cry: :mrgreen:

确定