Using CompletableFuture to collect results into a HashMap

Question

I am reading a list of PDF files from a database, parsing them, and performing some tasks on them.
While processing this list, I noticed that extracting images from a PDF takes a long time, and I don't want to block my main thread while reading images. So I want to extract the images in a separate thread.
I want to read images from one PDF after another, without loading all the PDFs into memory at once (due to memory concerns). So I want just two threads: the main thread (which reads some text from the PDF and does some other work) and a second thread that extracts images and returns a set of image objects.

One caveat is that images from the PDFs can have identical content, so I want to remove duplicate images, using a checksum or some other means, before collecting the results.
I don't want to hold images in memory until all tasks have completed; I want to remove duplicates as and when I get the result for each PDF.

So the real question is: I need to submit multiple tasks to a thread pool of size 1 and remove duplicates as each result arrives, so that I don't have to hold images in memory for long.

Below is the idea of what I have tried.
I have removed unnecessary details, such as the images and their content, and reduced the code to a String-based problem.

    import java.util.ArrayList;
    import java.util.HashMap;
    import java.util.List;
    import java.util.Map;
    import java.util.UUID;
    import java.util.concurrent.CompletableFuture;
    import java.util.concurrent.ExecutionException;
    import java.util.concurrent.ExecutorService;
    import java.util.concurrent.Executors;

    public static void main(String[] args) throws InterruptedException, ExecutionException {
        // Shared map from image key to number of occurrences
        Map<String, Integer> uniqueImages = new HashMap<>();
        // Single worker thread, so the tasks run one after another
        ExecutorService newFixedThreadPool = Executors.newFixedThreadPool(1);
        List<CompletableFuture<String>> futureList = new ArrayList<>();
        for (int i = 0; i < 20000; i++) {
            CompletableFuture<String> obj = CompletableFuture.supplyAsync(() -> {
                // Assume lots of duplicates
                return UUID.randomUUID().toString();
            }, newFixedThreadPool).thenApply((x) -> {
                // Count each key as its result arrives
                if (uniqueImages.containsKey(x)) {
                    int val = uniqueImages.get(x);
                    uniqueImages.put(x, val + 1);
                }
                else {
                    uniqueImages.put(x, 1);
                }
                return x;
            });
            futureList.add(obj);
        }
        // Wait for every task to finish before reading the map
        for (CompletableFuture<String> future : futureList) {
            future.get();
        }
        System.out.println(uniqueImages.size());
    }

I'm worried about whether this code really works or whether it will throw a ConcurrentModificationException.

  • Does the uniqueImages map really contain the unique images and their counts?
  • Are there any hidden issues?
  • Is there a better way to solve my use case?

Answer 1

Score: 1

As long as you only use a thread pool of size 1, there is no problem.
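One detail worth keeping in mind for the single-thread case: a callback registered with thenApply is not guaranteed to run on the pool thread. If the future has already completed by the time the callback is registered, the registering thread (here, the main thread) may run it instead. A minimal sketch, assuming the goal is to keep every map update on the one worker thread, registers the callback with thenAcceptAsync and the same executor. The class name SingleThreadConfinement, the method countOnPoolThread, and the callbackThreads parameter are hypothetical and only there for illustration.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

public class SingleThreadConfinement {

    // Counts keys in a plain HashMap that is only ever updated by the single pool thread.
    // callbackThreads records the names of the threads that ran the update callback.
    static Map<String, Integer> countOnPoolThread(List<String> keys, Set<String> callbackThreads) {
        Map<String, Integer> counts = new HashMap<>();
        ExecutorService pool = Executors.newFixedThreadPool(1);
        try {
            List<CompletableFuture<Void>> futures = new ArrayList<>();
            for (String key : keys) {
                futures.add(CompletableFuture
                        // Stand-in for the expensive image extraction
                        .supplyAsync(() -> key, pool)
                        // The Async variant always submits the callback to the given executor
                        .thenAcceptAsync(k -> {
                            callbackThreads.add(Thread.currentThread().getName());
                            counts.merge(k, 1, Integer::sum);
                        }, pool));
            }
            // Wait for all callbacks to finish before the caller reads the map
            CompletableFuture.allOf(futures.toArray(new CompletableFuture[0])).join();
        } finally {
            pool.shutdown();
        }
        return counts;
    }

    public static void main(String[] args) {
        Set<String> threads = new HashSet<>();
        Map<String, Integer> counts =
                countOnPoolThread(Arrays.asList("a", "b", "a", "c", "a"), threads);
        System.out.println(counts + " updated by " + threads);
    }
}
```

With this arrangement the main thread never touches the map until all futures have completed, so the deduplication work stays off the main thread as well. The sketch also shuts the executor down, which lets the JVM exit once the work is done.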

However, if there are multiple threads, then the code is indeed not thread-safe and is prone to data races.
We can demonstrate this with the following scenario:

Consider this code:

    if (uniqueImages.containsKey(x)) {
        int val = uniqueImages.get(x);
        uniqueImages.put(x, val + 1);
    }
    else {
        uniqueImages.put(x, 1);
    }

Imagine that thread 1 and thread 2 both return the same string and reach the line if (uniqueImages.containsKey(x)) at the same time.
The condition evaluates to false in both threads, so uniqueImages.put(x, 1); is called in both threads.
You may get a wrong count (1 instead of 2), and because HashMap is not designed for unsynchronized concurrent writes, its internal state can be corrupted; a ConcurrentModificationException is possible in some cases but not guaranteed.
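The interleaving described above is hard to trigger on demand with real threads, so the following sketch replays the same order of operations deterministically on a single thread. It is only an illustration of the lost update, not a concurrency test; the class name LostUpdateReplay, the method replayInterleaving, and the key "same-checksum" are hypothetical.

```java
import java.util.HashMap;
import java.util.Map;

public class LostUpdateReplay {

    // Replays, on one thread, the order in which two threads would touch the map.
    static int replayInterleaving() {
        Map<String, Integer> uniqueImages = new HashMap<>();
        String x = "same-checksum";

        // Both "threads" perform the containsKey check before either one writes
        boolean seenByThread1 = uniqueImages.containsKey(x);
        boolean seenByThread2 = uniqueImages.containsKey(x);

        // Both saw "not present", so both take the else branch
        if (!seenByThread1) {
            uniqueImages.put(x, 1);
        }
        if (!seenByThread2) {
            uniqueImages.put(x, 1); // overwrites the first write instead of incrementing
        }
        return uniqueImages.get(x);
    }

    public static void main(String[] args) {
        // Two occurrences were processed, but the recorded count is 1
        System.out.println(replayInterleaving()); // prints 1
    }
}
```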

If you plan to use multiple threads, then you must use a ConcurrentHashMap.
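Note that swapping HashMap for ConcurrentHashMap is only part of the fix if the containsKey/get/put sequence stays as it is, because that sequence is still a check-then-act race between separate calls. A minimal sketch, assuming the per-key count is the only state that needs updating, uses ConcurrentHashMap.merge, which performs the read-modify-write for a key atomically. The class name ConcurrentCounting, the method countConcurrently, and the pool size of 4 are hypothetical choices for illustration.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.Map;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

public class ConcurrentCounting {

    // Counts keys from several threads without losing updates.
    static Map<String, Integer> countConcurrently(List<String> keys, int threads) {
        Map<String, Integer> counts = new ConcurrentHashMap<>();
        ExecutorService pool = Executors.newFixedThreadPool(threads);
        try {
            List<CompletableFuture<Void>> futures = new ArrayList<>();
            for (String key : keys) {
                // merge inserts 1 if the key is absent, otherwise adds 1 atomically
                futures.add(CompletableFuture.runAsync(
                        () -> counts.merge(key, 1, Integer::sum), pool));
            }
            // Wait for all updates before returning the map
            CompletableFuture.allOf(futures.toArray(new CompletableFuture[0])).join();
        } finally {
            pool.shutdown();
        }
        return counts;
    }

    public static void main(String[] args) {
        Map<String, Integer> counts =
                countConcurrently(Arrays.asList("a", "b", "a", "c", "a"), 4);
        System.out.println(counts);
    }
}
```

The same merge call also works on a plain HashMap when only one thread updates it, so it can replace the if/else block in the single-thread version too.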

huangapple
  • Published on July 27, 2020, 03:32:02
  • When reposting, please keep the link to this article: https://go.coder-hub.com/63104778.html