Parallel processing instead of Iterators.forEachRemaining?
# Question
I have this piece of code that fetches 200k entries from the database and then processes them in batches. I noticed that each call to processBatch (5000 entries per batch) takes around 3 minutes, so the whole run takes about 2 hours.

I already have a database index on entryId to speed up the lookups. What I am wondering is whether this batch processing could somehow run in parallel.

If anything is unclear, I will gladly rework the question.

    Stream<X> streams = repository
            .streamBySourceFileCreationDate(getDateFromFilename(sourceFileName));
    int batchSize = properties.getBuildOutput().getSize();
    List<OutX> built = new ArrayList<>();
    Iterators.partition(streams.iterator(), batchSize)
            .forEachRemaining(list -> processBatch(list, built));

    private void processBatch(List<X> list, List<OutX> built) {
        list.stream().map(m -> {
            // One database lookup per entry.
            X xEntry = repository.findLatestUpdateById(m.getEntryId());
            if (xEntry == null) {
                return null;
            }
            OutX out = buildXOut(xEntry);
            return out;
        }).filter(Objects::nonNull).forEach(built::add);
    }

UPDATE 1:

I changed the first part as follows, which reduced the processing time to around 25 minutes. Any other suggestions for making this faster?

    List<X> lists = repository
            .findBySourceFileCreationDate(getDateFromFilename(sourceFileName));
    int batchSize = properties.getBuildOutput().getSize();
    List<OutX> built = new ArrayList<>(); // Should I use a Vector for thread safety?
    ForkJoinPool forkJoinPool = new ForkJoinPool();
    List<ForkJoinTask<?>> tasks = new ArrayList<>();
    for (int i = 0; i < lists.size(); i += batchSize) {
        int endIndex = Math.min(i + batchSize, lists.size());
        List<X> batch = lists.subList(i, endIndex);
        ForkJoinTask<?> task = forkJoinPool.submit(() -> processBatch(batch, built));
        tasks.add(task);
    }
    tasks.forEach(ForkJoinTask::join);
    forkJoinPool.shutdown();
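On the thread-safety comment in the update: `ArrayList` is not thread-safe, so adding to one shared list from several tasks at once can silently drop elements or fail. A `Vector` or `Collections.synchronizedList` would address that, but another option is to avoid sharing a list at all: each batch task returns its own list, and the results are merged after `join`. The following is only a sketch of that idea. The class name `BatchMerge` and the `lookupAndBuild` function are made up for illustration and stand in for the repository lookup plus `buildXOut`.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.Objects;
import java.util.concurrent.Callable;
import java.util.concurrent.ForkJoinPool;
import java.util.concurrent.ForkJoinTask;
import java.util.function.Function;
import java.util.stream.Collectors;

class BatchMerge {

    // Processes one batch and returns a list owned by this task only.
    // lookupAndBuild is a hypothetical stand-in for "find latest update, then build output".
    static <I, O> List<O> processBatch(List<I> batch, Function<I, O> lookupAndBuild) {
        return batch.stream()
                .map(lookupAndBuild)
                .filter(Objects::nonNull)
                .collect(Collectors.toList());
    }

    static <I, O> List<O> processAll(List<I> input, int batchSize, Function<I, O> lookupAndBuild) {
        if (batchSize <= 0) {
            throw new IllegalArgumentException("batchSize must be positive");
        }
        ForkJoinPool pool = new ForkJoinPool();
        try {
            List<ForkJoinTask<List<O>>> tasks = new ArrayList<>();
            for (int i = 0; i < input.size(); i += batchSize) {
                List<I> batch = input.subList(i, Math.min(i + batchSize, input.size()));
                Callable<List<O>> work = () -> processBatch(batch, lookupAndBuild);
                tasks.add(pool.submit(work));
            }
            // Merge on the calling thread, in batch order, so no list is written concurrently.
            List<O> built = new ArrayList<>();
            for (ForkJoinTask<List<O>> task : tasks) {
                built.addAll(task.join());
            }
            return built;
        } finally {
            pool.shutdown();
        }
    }

    public static void main(String[] args) {
        // Toy data: even ids simulate "no entry found" by returning null.
        List<Integer> ids = Arrays.asList(1, 2, 3, 4, 5, 6, 7);
        System.out.println(processAll(ids, 3, id -> id % 2 == 0 ? null : "out-" + id));
    }
}
```

Because the merge happens in submission order, the output keeps the order of the input batches, which a shared synchronized list would not guarantee.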
# Answer 1
**Score**: 1
One of many possible options:

```java
List<X> readIn = repository.findBySourceFileCreationDate(getDateFromFilename(sourceFileName));
List<OutX> built = readIn.parallelStream()
    .map(m -> {
        // One lookup per entry; entries without a stored update are skipped below.
        X xEntry = repository.findLatestUpdateById(m.getEntryId());
        if (xEntry == null) {
            return null;
        }
        return buildXOut(xEntry);
    })
    .filter(Objects::nonNull)
    .collect(Collectors.toList());
```
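By default `parallelStream()` runs on the common `ForkJoinPool`, whose size is derived from the number of CPU cores. When each element mostly waits on a database call, it may be worth choosing the thread count explicitly instead. The sketch below does that with `CompletableFuture.supplyAsync` and a fixed-size executor. The class name `BoundedLookups`, the `threads` parameter, and the `lookupAndBuild` function are illustrative assumptions, not part of the original code, and a sensible thread count depends on things like the database connection pool size.

```java
import java.util.Arrays;
import java.util.List;
import java.util.Objects;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.function.Function;
import java.util.stream.Collectors;

class BoundedLookups {

    // lookupAndBuild is a hypothetical stand-in for the repository lookup plus buildXOut.
    static <I, O> List<O> mapWithPool(List<I> input, int threads, Function<I, O> lookupAndBuild) {
        ExecutorService pool = Executors.newFixedThreadPool(threads);
        try {
            // Submit one task per entry; the pool caps how many run at the same time.
            List<CompletableFuture<O>> pending = input.stream()
                    .map(item -> CompletableFuture.supplyAsync(() -> lookupAndBuild.apply(item), pool))
                    .collect(Collectors.toList());
            // join() waits for each result in input order; null results are dropped.
            return pending.stream()
                    .map(CompletableFuture::join)
                    .filter(Objects::nonNull)
                    .collect(Collectors.toList());
        } finally {
            pool.shutdown();
        }
    }

    public static void main(String[] args) {
        // Toy data: id 3 simulates an entry with no stored update.
        List<Integer> ids = Arrays.asList(1, 2, 3, 4);
        System.out.println(mapWithPool(ids, 2, id -> id == 3 ? null : "out-" + id));
    }
}
```

Collecting all futures before joining any of them matters here: joining inside the first stream would wait for each task before submitting the next and make the whole thing sequential.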