Parallel processing instead of Iterators.forEachRemaining?

# Question

I have this piece of code that fetches 200k entries from the database and then processes them in batches. I noticed that each `processBatch` call (5000 entries per batch) takes around 3 minutes, so the whole run takes about 2 hours.

I already use a database index on `entryId` to speed up finding entries. What I am wondering is whether this batch processing could somehow run in parallel.

If anything is unclear, I will gladly rework the question.

```java
Stream<X> streams = repository
        .streamBySourceFileCreationDate(getDateFromFilename(sourceFileName));

int batchSize = properties.getBuildOutput().getSize();
List<OutX> built = new ArrayList<>();

Iterators.partition(streams.iterator(), batchSize)
        .forEachRemaining(list -> processBatch(list, built));
```

```java
private void processBatch(List<X> list, List<OutX> built) {
    list.stream()
            .map(m -> {
                // One repository lookup per entry in the batch.
                X xEntry = repository.findLatestUpdateById(m.getEntryId());
                if (xEntry == null) {
                    return null;
                }
                return buildXOut(xEntry);
            })
            .filter(Objects::nonNull)
            .forEach(built::add);
}
```

UPDATE 1:

I changed the first part as shown below, which reduced the processing time to around 25 minutes. Any other suggestions for making this faster?

```java
List<X> lists = repository
        .findBySourceFileCreationDate(getDateFromFilename(sourceFileName));

int batchSize = properties.getBuildOutput().getSize();
List<OutX> built = new ArrayList<>(); // Should I use a Vector here for thread safety?

ForkJoinPool forkJoinPool = new ForkJoinPool();
List<ForkJoinTask<?>> tasks = new ArrayList<>();

for (int i = 0; i < lists.size(); i += batchSize) {
    int endIndex = Math.min(i + batchSize, lists.size());
    List<X> batch = lists.subList(i, endIndex);
    ForkJoinTask<?> task = forkJoinPool.submit(() -> processBatch(batch, built));
    tasks.add(task);
}

tasks.forEach(ForkJoinTask::join);
forkJoinPool.shutdown();
```

# Answer 1

**Score**: 1

One of many possible options:

```java
List<X> readIn = repository.findBySourceFileCreationDate(getDateFromFilename(sourceFileName));

List<OutX> built = readIn.parallelStream()
        .map(m -> {
            X xEntry = repository.findLatestUpdateById(m.getEntryId());
            if (xEntry == null) {
                return null;
            }
            return buildXOut(xEntry);
        })
        .filter(Objects::nonNull)
        .collect(Collectors.toList());
```
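
On the thread-safety question from UPDATE 1: adding to a plain `ArrayList` from several tasks at once is not safe, because `ArrayList` is not synchronized. One way to avoid a shared list entirely is to let each batch return its own result list and merge the lists on the calling thread. The sketch below illustrates that idea with a fixed-size thread pool. Everything in it is an assumption made for illustration: the `X` and `OutX` classes, the in-memory `table` standing in for the repository, and the `processAll` helper are hypothetical, not the asker's real code.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.stream.Collectors;

class BatchedLookup {

    // Hypothetical stand-ins for the question's X and OutX types.
    static final class X {
        final int entryId;
        X(int entryId) { this.entryId = entryId; }
    }

    static final class OutX {
        final int entryId;
        OutX(int entryId) { this.entryId = entryId; }
    }

    // Hypothetical stand-in for repository.findLatestUpdateById; returns null when nothing is found.
    static X findLatestUpdateById(Map<Integer, X> table, int entryId) {
        return table.get(entryId);
    }

    static List<OutX> processAll(List<X> entries, Map<Integer, X> table, int batchSize, int threads) {
        ExecutorService pool = Executors.newFixedThreadPool(threads);
        try {
            List<Callable<List<OutX>>> tasks = new ArrayList<>();
            for (int i = 0; i < entries.size(); i += batchSize) {
                List<X> batch = entries.subList(i, Math.min(i + batchSize, entries.size()));
                // Each task builds and returns its own list, so no list is shared between threads.
                tasks.add(() -> batch.stream()
                        .map(m -> findLatestUpdateById(table, m.entryId))
                        .filter(Objects::nonNull)
                        .map(found -> new OutX(found.entryId))
                        .collect(Collectors.toList()));
            }
            List<OutX> built = new ArrayList<>();
            // invokeAll waits for all batches and returns the futures in submission order.
            for (Future<List<OutX>> future : pool.invokeAll(tasks)) {
                built.addAll(future.get());
            }
            return built;
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException("Interrupted while waiting for batches", e);
        } catch (ExecutionException e) {
            throw new IllegalStateException("A batch failed", e.getCause());
        } finally {
            pool.shutdown();
        }
    }

    public static void main(String[] args) {
        Map<Integer, X> table = new HashMap<>();
        List<X> entries = new ArrayList<>();
        for (int i = 0; i < 10; i++) {
            entries.add(new X(i));
            if (i % 2 == 0) {
                table.put(i, new X(i)); // only even ids exist in the stand-in table
            }
        }
        System.out.println(processAll(entries, table, 3, 2).size()); // prints 5
    }
}
```

The `parallelStream()` version in the answer is safe for the same reason: `collect` accumulates per-thread results and merges them, instead of having every thread call `add` on one list. An explicit pool mainly gives control over the thread count, which may matter if the number of concurrent repository calls should stay below the size of the database connection pool.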

huangapple
  • Posted on June 22, 2023 at 17:43:47
  • When reposting, please keep the link to this article: https://go.coder-hub.com/76530573.html