Parallelize processing of collection
Question
I have a Seq with a large number of entries (say 50K), each of which is an ID.
Seq[Int] = Seq(1,2,3,4,5,6 ..... 50000)
There is a blocking function that takes a Seq as input, loads the entity corresponding to each ID, and returns an Iterable[Entity].
def getEntityFromID(id: Seq[Int]): Iterable[Entity]
How do I parallelize this operation over the above Seq in batches of x, and collect the results into a single Seq?
Answer 1
Score: 0
The easiest way is to split the sequence into batches with .sliding(size, step), which returns an Iterator[Seq[Int]] (when step equals size, this is the same as .grouped(size)). You can then call getEntityFromID on each batch, wrap each call in a Future, and await the futures to get the results.
As an example:
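import java.util.concurrent.Executors
import scala.concurrent.{Await, ExecutionContext, Future}
import scala.concurrent.duration._
val s = Seq(1,2,3,4,5,6,7,8)
// stand-in for the real blocking loader
def getEntityFromID(w: Seq[Int]) = w.map(_ + 1)
// the pool size is the number of batches that can run at once; a single thread would run them one after another
implicit val executionContext: ExecutionContext = ExecutionContext.fromExecutor(Executors.newFixedThreadPool(4))
// all futures are started here, before any of them is awaited
val s2: List[Future[Seq[Int]]] = s.sliding(3, 3).map(w => Future(getEntityFromID(w))).toList
val s3: List[Int] = s2.flatMap(f => Await.result(f, 2.seconds))
println(s3)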
Of course, the calling thread still blocks in Await.result until each batch has finished, but it's one way to do it.
It is generally a good idea to wrap a call to an external service in a Future, since you don't know how long it will take (or whether it will return a valid answer).
This is a basic answer, and the best I can give without making assumptions about the use case.
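The per-future Await in the example can be replaced by combining the batch futures first and awaiting once at the very end. The following is a minimal sketch of that idea, not a definitive implementation: the Entity case class, the stub body of getEntityFromID, the loadAll helper, the batch size of 500 and the pool size of 4 are all assumptions made for illustration.

```scala
import java.util.concurrent.Executors
import scala.concurrent.{Await, ExecutionContext, Future}
import scala.concurrent.duration._

object BatchedLoad {
  // Hypothetical entity type and loader, standing in for the real ones.
  final case class Entity(id: Int)
  def getEntityFromID(ids: Seq[Int]): Iterable[Entity] = ids.map(Entity(_))

  // Splits ids into batches, starts one Future per batch, and concatenates
  // the results in the original batch order once all of them have completed.
  def loadAll(ids: Seq[Int], batchSize: Int)(implicit ec: ExecutionContext): Future[Seq[Entity]] = {
    val batches: List[Future[Iterable[Entity]]] =
      ids.grouped(batchSize).map(batch => Future(getEntityFromID(batch))).toList
    Future.sequence(batches).map(_.flatten)
  }

  def main(args: Array[String]): Unit = {
    // The fixed pool limits how many batches are loaded at the same time (assumed: 4).
    val pool = Executors.newFixedThreadPool(4)
    implicit val ec: ExecutionContext = ExecutionContext.fromExecutor(pool)
    try {
      // Block exactly once, at the edge of the program.
      val entities = Await.result(loadAll(1 to 50000, batchSize = 500), 1.minute)
      println(entities.size)
    } finally pool.shutdown()
  }
}
```

Because loadAll returns a Future, a caller that is itself asynchronous can keep composing on the result and never needs to call Await.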
Answer 2
Score: 0
You need to call grouped(i) on the initial Seq to get subsequences of size i, then iterate over those and call getEntityFromID on each one.
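val list: Seq[Int] = Seq(1,2,3,4,5,6..... 50000)
// grouped returns a single-use Iterator[Seq[Int]]
val groupedList = list.grouped(3)
// sequential version: one call per batch; toList forces the lazy iterator and collects the results
groupedList.flatMap { gp =>
  getEntityFromID(gp)
}.toList
def getEntityFromID(id: Seq[Int]): Iterable[Entity] = ???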
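To process the batches in parallel, convert them to a parallel collection. On Scala 2.13 and later, .par requires the scala-parallel-collections module and import scala.collection.parallel.CollectionConverters._
// an iterator can only be traversed once, so build a fresh one from list
list.grouped(3).toList.par.flatMap { gp =>
  getEntityFromID(gp)
}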
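You can also control the degree of parallelism:
// Scala 2.12+: use the JDK pool; scala.concurrent.forkjoin.ForkJoinPool was removed in 2.13
import java.util.concurrent.ForkJoinPool
import scala.collection.parallel.ForkJoinTaskSupport
val parallelCollection = list.grouped(3).toList.par
parallelCollection.tasksupport = new ForkJoinTaskSupport(
  new ForkJoinPool(20)
)
parallelCollection.flatMap { gp =>
  getEntityFromID(gp)
}
The code above processes up to 20 batches (not 20 individual IDs) at a time.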
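The pool size limits how many batches are in flight, not how many individual IDs. One way to check such a limit is to count the batches running concurrently. The sketch below does this with plain Futures on a fixed thread pool rather than parallel collections, so it needs no extra dependency. The ParallelismCap object, the maxConcurrentBatches helper and the 20 ms sleep that stands in for the blocking load are all hypothetical.

```scala
import java.util.concurrent.Executors
import java.util.concurrent.atomic.AtomicInteger
import scala.concurrent.{Await, ExecutionContext, Future}
import scala.concurrent.duration._

object ParallelismCap {
  // Runs one simulated blocking call per batch on a fixed pool of `poolSize` threads
  // and returns the highest number of batches observed running at the same time.
  def maxConcurrentBatches(ids: Seq[Int], batchSize: Int, poolSize: Int): Int = {
    val pool = Executors.newFixedThreadPool(poolSize)
    implicit val ec: ExecutionContext = ExecutionContext.fromExecutor(pool)
    val inFlight = new AtomicInteger(0) // batches currently running
    val peak = new AtomicInteger(0)     // largest value inFlight has reached
    try {
      val futures = ids.grouped(batchSize).map { batch =>
        Future {
          val now = inFlight.incrementAndGet()
          peak.updateAndGet(p => math.max(p, now))
          Thread.sleep(20) // stand-in for the blocking load of this batch
          inFlight.decrementAndGet()
          batch.size
        }
      }.toList
      Await.result(Future.sequence(futures), 1.minute)
      peak.get()
    } finally pool.shutdown()
  }

  def main(args: Array[String]): Unit = {
    // 100 batches of 3 IDs on 20 threads: the observed peak never exceeds 20.
    println(maxConcurrentBatches(1 to 300, batchSize = 3, poolSize = 20))
  }
}
```

The observed peak depends on scheduling, so only the upper bound (the pool size) is guaranteed.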