Parallelize processing of collection

Question


I have a Seq with a large number of entries (say, 50K), each of which is an ID.

val ids: Seq[Int] = 1 to 50000   // i.e. Seq(1, 2, 3, 4, 5, 6, ..., 50000)

There is a blocking function that takes a Seq of IDs as input, loads the entity corresponding to each ID, and returns them as a separate Iterable[Entity].

def getEntityFromID(id: Seq[Int]): Iterable[Entity]

How do I parallelize this operation over the Seq above, in batches of x IDs, and collect all the entities into a single Seq?

Answer 1

Score: 0


The easiest way is to slide over the sequence using .sliding(size, step), which returns an Iterator[Seq[Int]]. You can then call getEntityFromID on each of those Seqs, wrapping each call in a Future, and finally await the futures to get the results.

As an example:

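  import java.util.concurrent.Executors
  import scala.concurrent.{Await, ExecutionContext, Future}
  import scala.concurrent.duration._

  val s = Seq(1,2,3,4,5,6,7,8)
  def getEntityFromID(w: Seq[Int]) = w.map(_+1)  // stand-in for the real blocking loader

  // More than one thread is needed: with newFixedThreadPool(1) the batches would run one after another
  val pool = Executors.newFixedThreadPool(4)
  implicit val executionContext: ExecutionContext = ExecutionContext.fromExecutor(pool)
  val s2: List[Future[Seq[Int]]] = s.sliding(3, 3).map(w => Future(getEntityFromID(w))).toList
  // Await each future in turn (2.seconds avoids the postfix-operator syntax of "2 second")
  val s3: List[Int] = s2.flatMap(f => Await.result(f, 2.seconds))
  pool.shutdown()  // otherwise the pool's non-daemon threads keep the JVM alive
  println(s3)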
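For reference, this is how the batching in the example above behaves. With step equal to size, sliding yields consecutive, non-overlapping windows, which is exactly what grouped(size) produces, and the last batch is simply shorter when the length is not a multiple of the size. A step smaller than the size would make the windows overlap, so some IDs would be fetched twice. The sketch reuses the same eight IDs; the variable names are illustrative only.

```scala
// The eight IDs from the example, batched three at a time.
val s: Seq[Int] = Seq(1, 2, 3, 4, 5, 6, 7, 8)

// step == size: consecutive, non-overlapping windows
val viaSliding: List[Seq[Int]] = s.sliding(3, 3).toList
// grouped(size) is the dedicated method for the same partitioning
val viaGrouped: List[Seq[Int]] = s.grouped(3).toList

println(viaSliding)               // List(List(1, 2, 3), List(4, 5, 6), List(7, 8))
println(viaSliding == viaGrouped) // true

// step < size: windows overlap, so some IDs land in two batches
val overlapping: List[Seq[Int]] = s.sliding(3, 2).toList
println(overlapping.flatten.size > s.size) // true
```

In practice grouped(x) states the intent more directly than sliding(x, x).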

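Of course, this still blocks: the calling thread waits on each future in turn until all of them have finished. The batches themselves run concurrently as long as the thread pool has more than one thread, so it is one way to do it.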
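If you would rather block once for the whole result than await each future in turn, one option is to combine the per-batch futures with Future.sequence and await that single future. This is a sketch, not the answer's exact code: Entity and the stub getEntityFromID stand in for the question's types, and the batch size of 500, the 8-thread pool and the one-minute timeout are arbitrary assumptions.

```scala
import java.util.concurrent.Executors
import scala.concurrent.{Await, ExecutionContext, Future}
import scala.concurrent.duration._

// Hypothetical stand-ins for the question's types; the real loader would block on I/O.
final case class Entity(id: Int)
def getEntityFromID(ids: Seq[Int]): Iterable[Entity] = ids.map(Entity(_))

val ids: Seq[Int] = 1 to 50000
val batchSize = 500 // assumed value of "x" from the question
val poolSize = 8    // assumed limit on concurrent getEntityFromID calls

val pool = Executors.newFixedThreadPool(poolSize)
implicit val ec: ExecutionContext = ExecutionContext.fromExecutor(pool)

val entities: Seq[Entity] =
  try {
    // One Future per batch; at most poolSize of them run at the same time.
    val perBatch: List[Future[Iterable[Entity]]] =
      ids.grouped(batchSize).map(batch => Future(getEntityFromID(batch))).toList
    // A single Await for all batches; the results keep the order of perBatch.
    Await.result(Future.sequence(perBatch), 1.minute).flatten
  } finally pool.shutdown() // lets the JVM exit once the work is done

println(entities.size) // 50000
```

Because each getEntityFromID call occupies a pool thread for as long as it blocks, the pool size is what bounds the number of simultaneous loads; the batch size only controls how many IDs each call handles.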
It is generally a good idea to wrap a call to an external service in a Future, because you don't know how long it will take (or whether it will return a valid answer).
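On the point about calls that may not return a valid answer: with Future.sequence, a single failed batch fails the whole combined future. One way to keep the successful batches is to recover each batch into an Either first, so failed batches can be reported or retried. In this sketch the loader is hypothetical and fails on purpose for any batch containing ID 13; Entity, the 4-thread pool and the 30-second timeout are also assumptions. Await.result still throws a TimeoutException if the batches do not all finish in time.

```scala
import java.util.concurrent.Executors
import scala.concurrent.{Await, ExecutionContext, Future}
import scala.concurrent.duration._
import scala.util.control.NonFatal

final case class Entity(id: Int)

// Hypothetical loader that fails for any batch containing ID 13.
def getEntityFromID(ids: Seq[Int]): Iterable[Entity] =
  if (ids.contains(13)) throw new RuntimeException(s"could not load $ids")
  else ids.map(Entity(_))

val pool = Executors.newFixedThreadPool(4)
implicit val ec: ExecutionContext = ExecutionContext.fromExecutor(pool)

// Right(entities) for a successful batch, Left(batch) for a failed one.
val outcomes: List[Future[Either[Seq[Int], Iterable[Entity]]]] =
  (1 to 20).grouped(5).toList.map { batch =>
    Future[Either[Seq[Int], Iterable[Entity]]](Right(getEntityFromID(batch)))
      .recover { case NonFatal(_) => Left(batch) }
  }

val results: List[Either[Seq[Int], Iterable[Entity]]] =
  try Await.result(Future.sequence(outcomes), 30.seconds)
  finally pool.shutdown()

val loaded: List[Entity] = results.collect { case Right(es) => es }.flatten
val failedBatches: List[Seq[Int]] = results.collect { case Left(b) => b }
println(s"loaded ${loaded.size} entities; failed batches: $failedBatches")
```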

This is a basic answer; it is the best I can give without making assumptions about the use case.

Answer 2

Score: 0


You need to call grouped(i) on the initial Seq to get subsequences of size i, then iterate over those and call getEntityFromID on each one:

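  val list: Seq[Int] = 1 to 50000
  // grouped returns a one-shot Iterator; toList lets the batches be traversed more than once
  val groupedList: List[Seq[Int]] = list.grouped(3).toList

  // Sequential version: loads one batch at a time and collects everything into a single List
  val entities: List[Entity] = groupedList.flatMap {
    gp => getEntityFromID(gp)
  }

  def getEntityFromID(id: Seq[Int]): Iterable[Entity] = ???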
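Note that grouped returns an Iterator, and Iterator.flatMap is lazy: calling it does not load anything until the result is traversed, and an Iterator can only be traversed once. The sketch below makes this visible with a hypothetical loader that counts its invocations; Entity and the calls counter are illustrative only.

```scala
final case class Entity(id: Int)

// Hypothetical loader that records how many times it has been called.
var calls = 0
def getEntityFromID(ids: Seq[Int]): Iterable[Entity] = {
  calls += 1
  ids.map(Entity(_))
}

val list: Seq[Int] = 1 to 10
val lazyEntities: Iterator[Entity] = list.grouped(3).flatMap(getEntityFromID)
println(calls) // 0: nothing has been loaded yet

val entities: List[Entity] = lazyEntities.toList
println(calls) // 4: batches (1, 2, 3), (4, 5, 6), (7, 8, 9) and (10)

println(lazyEntities.hasNext) // false: the iterator is now exhausted
```

So the result of flatMap on the iterator has to be consumed (for example with toList) for the loads to happen at all.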

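To process groupedList in parallel you can use a parallel collection (.par is built in up to Scala 2.12; from Scala 2.13 on it requires the scala-parallel-collections module and import scala.collection.parallel.CollectionConverters._). For example: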

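  // .par processes the batches on the default ForkJoinPool; .seq converts the ParSeq
  // result back into a regular sequential Seq (the order of the batches is preserved)
  val parallelEntities = groupedList.toList.par.flatMap {
    gp => getEntityFromID(gp)
  }.seq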

You can also control the degree of parallelism by giving the parallel collection its own task support:

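  import java.util.concurrent.ForkJoinPool
  import scala.collection.parallel.ForkJoinTaskSupport

  val parallelCollection = groupedList.toList.par
  // scala.concurrent.forkjoin.ForkJoinPool is deprecated (removed in 2.13); use the java.util.concurrent one
  parallelCollection.tasksupport = new ForkJoinTaskSupport(new ForkJoinPool(20))

  val parallelEntities = parallelCollection.flatMap {
    gp => getEntityFromID(gp)
  }.seq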

With a pool of parallelism 20, the code above runs up to roughly 20 batches (groups of 3 IDs, not individual IDs) at a time.
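The "N at a time" limit can be checked empirically. The sketch below swaps in a different technique so that it runs without the scala-parallel-collections module: Futures on a java.util.concurrent fixed pool of 20 threads instead of ForkJoinTaskSupport. It records how many getEntityFromID calls are in flight at once. The loader, its 10 ms sleep standing in for I/O, and the 3000 IDs are hypothetical.

```scala
import java.util.concurrent.Executors
import java.util.concurrent.atomic.AtomicInteger
import scala.concurrent.{Await, ExecutionContext, Future}
import scala.concurrent.duration._

final case class Entity(id: Int)

val inFlight = new AtomicInteger(0)    // calls currently running
val maxInFlight = new AtomicInteger(0) // highest value inFlight has reached

// Hypothetical blocking loader: sleeps to simulate I/O while tracking concurrency.
def getEntityFromID(ids: Seq[Int]): Iterable[Entity] = {
  val now = inFlight.incrementAndGet()
  maxInFlight.accumulateAndGet(now, (a: Int, b: Int) => math.max(a, b))
  try { Thread.sleep(10); ids.map(Entity(_)) }
  finally inFlight.decrementAndGet()
}

val pool = Executors.newFixedThreadPool(20)
implicit val ec: ExecutionContext = ExecutionContext.fromExecutor(pool)

val batches: List[Seq[Int]] = (1 to 3000).grouped(3).toList // 1000 batches of 3 IDs
val entities: List[Entity] =
  try Await.result(Future.traverse(batches)(b => Future(getEntityFromID(b))), 1.minute).flatten
  finally pool.shutdown()

println(s"${entities.size} entities, at most ${maxInFlight.get} batches in flight")
```

The limit applies to calls, i.e. batches: with groups of 3, up to 60 IDs can be in flight at once.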

huangapple
  • Published on June 19, 2023 at 19:58:48
  • Please keep the link to this article when reposting: https://go.coder-hub.com/76506409.html