C# Parallel.ForEach equivalent in Rust

Question

I am very new to Rust and am trying to recreate C#'s Parallel.ForEach method in Rust.

C# code

Parallel.ForEach(collection, x => {
  // do something with x 
})

Rust Code

pub async fn parallel_run<T>(collection: Vec<T>, callback: fn(item: T) -> dyn Future<Output= ()>) {
    for item in collection {
        tokio::spawn(callback(item)).await.expect("TODO: panic message");
    }
}

Compile Error

the size for values of type dyn Future<Output = ()> cannot be known at compilation time [E0277]
doesn't have a size known at compile-time
Help: the trait Sized is not implemented for dyn Future<Output = ()>
Note: required by a bound in tokio::spawn

dyn Future<Output = ()> cannot be sent between threads safely [E0277]
Help: the trait Send is not implemented for dyn Future<Output = ()>
Note: required by a bound in tokio::spawn

What am I missing?

I need to recreate an equivalent of Parallel.ForEach in Rust.


Answer 1

Score: 1

If you only need to run a computation (a CPU-bound task), you can use rayon.

Example of doing parallel filtering and squaring values:

use rayon::iter::{IntoParallelRefIterator, ParallelIterator};

fn my_complex_calculation(v: i32) -> i32 {
    v * v
}

fn main() {
    let values = vec![1, 2, 3, 4, 5, 6, 7, 8, 9, 10];
    let updated: Vec<i32> = values.par_iter()
        .filter(|&&x| x % 2 == 0)
        .copied()
        .map(my_complex_calculation)
        .collect();
    assert_eq!(*updated, [4, 16, 36, 64, 100]);
}
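
If adding a crate is not an option, the same idea can be roughly approximated with the standard library alone. The sketch below is not how rayon works (rayon uses a work-stealing thread pool). It simply splits the slice into chunks and runs each chunk on a scoped thread. The names parallel_for_each and sum_of_squares, and the worker count of 4, are made up for illustration.

```rust
use std::sync::atomic::{AtomicU64, Ordering};
use std::thread;

/// Hypothetical helper (not from rayon or std): calls `f` on every item,
/// splitting the slice into at most `workers` chunks, one thread per chunk.
fn parallel_for_each<T, F>(items: &[T], workers: usize, f: F)
where
    T: Sync,
    F: Fn(&T) + Sync,
{
    if items.is_empty() {
        return;
    }
    let workers = workers.max(1);
    // Ceiling division so that every item lands in some chunk.
    let chunk_size = (items.len() + workers - 1) / workers;
    let f = &f;
    // Scoped threads may borrow `items` and `f`; the scope joins all of
    // them before returning.
    thread::scope(|scope| {
        for chunk in items.chunks(chunk_size) {
            scope.spawn(move || {
                for item in chunk {
                    f(item);
                }
            });
        }
    });
}

/// Illustrative use: accumulate squares into a shared atomic counter.
fn sum_of_squares(values: &[u64]) -> u64 {
    let total = AtomicU64::new(0);
    parallel_for_each(values, 4, |&v| {
        total.fetch_add(v * v, Ordering::Relaxed);
    });
    total.load(Ordering::Relaxed)
}

fn main() {
    let values: Vec<u64> = (1..=10).collect();
    println!("{}", sum_of_squares(&values));
}
```

Because the chunks are fixed up front, uneven workloads will not balance themselves the way they would under rayon, so this is only a reasonable stand-in when the per-item cost is fairly uniform.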

If you need to run async tasks in parallel and wait for all of them, you can use JoinSet from tokio.

use tokio::task::JoinSet;

async fn my_network_request(v: i32) -> i32 {
    // Write your code here.
    v * v
}

#[tokio::main]
async fn main() {
    let values = vec![1, 2, 3, 4, 5, 6, 7, 8, 9, 10];
    let mut results = vec![0; values.len()];
    let mut join_set = JoinSet::new();
    for (i, &v) in values.iter().enumerate() {
        join_set.spawn(async move {
            (i, my_network_request(v).await)
        });
    }
    while let Some(res) = join_set.join_next().await {
        let (idx, val) = res.unwrap();
        results[idx] = val;
    }
    assert_eq!(*results, [1, 4, 9, 16, 25, 36, 49, 64, 81, 100]);
}

Also, @chayim-friedman suggested futures::future::join_all in the comments. Note that, unlike the two approaches above, join_all polls all the futures concurrently within the single task that awaits it, so it does not spread computational work across CPU cores.

use futures::future::join_all;

async fn my_network_request(v: i32) -> i32 {
    // Write your code here.
    v * v
}

#[tokio::main]
async fn main() {
    let values = vec![1, 2, 3, 4, 5, 6, 7, 8, 9, 10];
    let results = join_all(
        values.iter().copied().map(my_network_request)
    ).await;
    assert_eq!(*results, [1, 4, 9, 16, 25, 36, 49, 64, 81, 100]);
}

huangapple
  • Posted on May 6, 2023, 15:34:59
  • When reposting, please keep the link to this article: https://go.coder-hub.com/76187679.html