CompletableFuture用于将结果收集到哈希映射中。

huangapple go评论82阅读模式
英文:

CompletableFuture to collect results to hashmap

问题

以下是翻译好的内容:

我正在从数据库中读取一系列的PDF文件,然后解析它们并对它们执行一些任务。
当我正在读取这些PDF文件的列表时,我发现从PDF中提取图像需要更多的时间,而且我不想阻塞主线程来读取图像。因此,我希望在单独的线程中执行提取图像的操作。
我想要一个接一个地从每个PDF中读取图像,而不是一次性将所有的PDF加载到内存中(由于内存限制)。因此,我只想要两个线程:一个主线程(从PDF中读取一些文本并执行其他操作),另一个线程用于提取图像并返回一组图像对象。

这里的一个注意点是,来自PDF的图像在内容上可能是相同的,因此我希要在汇总结果之前使用校验和或其他方式删除重复的图像。
我不想在所有任务完成之前将图像保存在内存中,我想要在获得一个PDF的结果时就删除重复项。

所以实际的问题是,我需要将多个任务提交给一个大小为1的线程池,并且需要在获得结果时删除重复项,以便我不需要长时间地将图像保存在内存中。

以下是我尝试过的想法。
我从代码中删除了不必要的部分,如图像及其内容,并将代码转换为基于字符串的问题。

public static void main(String[] args) throws InterruptedException, ExecutionException {
    Map<String, Integer> uniqueImages = new HashMap<>();
    ExecutorService newFixedThreadPool = Executors.newFixedThreadPool(1);
    List<CompletableFuture<String>> futureList = new ArrayList<>();
    for(int i = 0; i<20000; i++) {
        CompletableFuture<String> obj = CompletableFuture.supplyAsync(()->{
            // 假设有很多重复项
            return UUID.randomUUID().toString();
        }, newFixedThreadPool).thenApply((x)->{
            if(uniqueImages.containsKey(x)) {
                int val = uniqueImages.get(x);
                uniqueImages.put(x, val+1);
            }
            else {
                uniqueImages.put(x, 1);
            }
            return x;
        });
        futureList.add(obj);
    }
    
    for(CompletableFuture<String> future: futureList) {
        future.get();
    }
    System.out.println(uniqueImages.size());
}

我担心这段代码是否真的有效,或者会抛出"ConcurrentModification Exception"异常。

  • uniqueImages地图是否真的包含唯一的图像及其计数?
  • 是否存在任何隐藏问题?
  • 是否有更好的方法来解决我的用例?
英文:

I am reading a list of pdf files from DB and parsing them and performing some tasks with them.
When I am reading this list of pdf's , I have seen extracting images from PDF is taking more time and I don't to block my main thread for reading images. So I want to execute extracting images in a separate thread.
I want to read images from one pdf after another, without loading all the pdfs into memory at once (due to memory concern). So I just want 2 threads; one should be main thread (which reads some text from pdf and does some other stuff) and other should be a thread which extracts images and return set of image objects.

One caveat here is that the images from pdfs can be same in their content , so I want to remove duplicate images using checksum or some other means before collecting their result.
I don't want to hold images in memory until all tasks are getting completed, I want to remove duplicates as on when I get the result of one pdf

So the real question is , I need to submit multiple tasks to a thread pool of size 1 and need to remove duplicates as on when I get the result so that I don't need to hold the image in memory for longer time.

Below is the idea of what have I tried.
I have removed unnecessary things from the code like Images and its content and converted code to String based problem.

public static void main(String[] args) throws InterruptedException, ExecutionException {
		Map&lt;String, Integer&gt; uniqueImages = new HashMap&lt;&gt;();
		 ExecutorService newFixedThreadPool = Executors.newFixedThreadPool(1);
    	List&lt;CompletableFuture&lt;String&gt;&gt; futureList = new ArrayList&lt;&gt;();
    	for(int i = 0; i&lt;20000; i++) {
	    	CompletableFuture&lt;String&gt; obj = CompletableFuture.supplyAsync(()-&gt;{
	    		//Assume lot of duplicates
	    		return UUID.randomUUID().toString();
			}, newFixedThreadPool).thenApply((x)-&gt;{
				if(uniqueImages.containsKey(x)) {
					int val = uniqueImages.get(x);
					uniqueImages.put(x, val+1);
				}
				else {
					uniqueImages.put(x, 1);
				}
				return x;
			});
	    	futureList.add(obj);
    	}
    	
    	for(CompletableFuture&lt;String&gt; future: futureList) {
    		future.get();
    	}
    	System.out.println(uniqueImages.size());
	}

I'm worried if this code really works or throws ConcurrentModification Exception.

  • Does uniqueImages map really contains unique images and their counts?
  • Are there any hidden issues?
  • Is there better way to solve my use case?

答案1

得分: 1

只要您仅使用大小为1的线程池,就没有问题。

然而,如果有多个线程,那么代码确实不是线程安全的,容易出现数据竞争。
我们可以通过以下场景加以说明:

考虑以下代码:

if(uniqueImages.containsKey(x)) {
    int val = uniqueImages.get(x);
    uniqueImages.put(x, val+1);
}
else {
    uniqueImages.put(x, 1);
}

想象一下,线程1和线程2都返回相同的字符串,并同时到达if(uniqueImages.containsKey(x))这一行。
if语句在两个线程中都将返回false,然后uniqueImages.put(x, 1);会在两个线程中被调用。
要么会抛出ConcurrentModification异常,要么会得到错误的计数(变成1而不是2)。

如果您计划使用多个线程,那么必须使用ConcurrentHashMap

英文:

As long as you are only using thread pool of size 1, there is no problem.

However, if there are multiple threads, then the code is indeed not thread-safe and is prone to data race.
We can demonstrate using the following scenario:

Consider this code:

if(uniqueImages.containsKey(x)) {
    int val = uniqueImages.get(x);
    uniqueImages.put(x, val+1);
}
else {
    uniqueImages.put(x, 1);
}

Imagine thread 1 and thread 2 both return the same string and reach at line if(uniqueImages.containsKey(x)) together.
if will return false in both the threads, and uniqueImages.put(x, 1); will be called in both the threads.
Either a ConcurrentModification exception will be thrown or you will get a wrong count (1 instead of 2).

If you plan to use multiple threads, then you must use a ConcurrentHashMap.

huangapple
  • 本文由 发表于 2020年7月27日 03:32:02
  • 转载请务必保留本文链接:https://go.coder-hub.com/63104778.html
匿名

发表评论

匿名网友

:?: :razz: :sad: :evil: :!: :smile: :oops: :grin: :eek: :shock: :???: :cool: :lol: :mad: :twisted: :roll: :wink: :idea: :arrow: :neutral: :cry: :mrgreen:

确定