Java 8 parallelStream().forEach results in data loss

Question

There are two test cases that use parallelStream().

Case 1:

List<Integer> src = new ArrayList<>();
for (int i = 0; i < 20000; i++) {
  src.add(i);
}
List<String> strings = new ArrayList<>();

// Keep the even numbers and add each one to the shared list.
src.parallelStream().filter(integer -> (integer % 2) == 0).forEach(integer -> strings.add(integer + ""));

System.out.println("=size=>" + strings.size());

Output:

=size=>9332

Case 2:

List<Integer> src = new ArrayList<>();
for (int i = 0; i < 20000; i++) {
  src.add(i);
}
List<String> strings = new ArrayList<>();

// Add every number to the shared list.
src.parallelStream().forEach(integer -> strings.add(integer + ""));

System.out.println("=size=>" + strings.size());

Output:

=size=>17908

Why do I always lose data when using parallelStream?
What did I do wrong?

Answer 1

Score: 7

ArrayList isn't thread-safe. You need to use either

List<String> strings = Collections.synchronizedList(new ArrayList<>());

or

List<String> strings = new Vector<>();

to ensure all updates are synchronized, or switch to

List<String> strings = src.parallelStream()
    .filter(integer -> (integer % 2) == 0)
    .map(integer -> integer + "")
    .collect(Collectors.toList());

and leave the list building to the Streams framework. Note that Collectors.toList() makes no guarantee about whether the returned list is modifiable, so if you need a modifiable list, collect with Collectors.toCollection(ArrayList::new) instead.

In terms of performance, Stream.collect is likely to be much faster than using Stream.forEach to add to a synchronized collection, since the Streams framework can accumulate values in each thread separately without synchronization and combine the partial results at the end in a thread-safe fashion.
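To make the two fixes concrete, here is a minimal sketch that runs the filter from the question both ways. The class name ThreadSafeCollectDemo and its helper methods are made up for illustration; only Collections.synchronizedList, Collectors.toCollection and the stream calls come from the JDK.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.stream.Collectors;
import java.util.stream.IntStream;

// Hypothetical demo class; the names are not from the original answer.
class ThreadSafeCollectDemo {

    // Option 1: keep forEach, but write into a synchronized wrapper.
    static List<String> viaSynchronizedList(List<Integer> src) {
        List<String> strings = Collections.synchronizedList(new ArrayList<>());
        src.parallelStream()
           .filter(i -> i % 2 == 0)
           .forEach(i -> strings.add(String.valueOf(i)));
        return strings; // complete, but in no particular order
    }

    // Option 2: let the stream build the list, so there is no shared mutable state.
    static List<String> viaCollect(List<Integer> src) {
        return src.parallelStream()
                  .filter(i -> i % 2 == 0)
                  .map(String::valueOf)
                  // toCollection(ArrayList::new) makes the result explicitly modifiable
                  .collect(Collectors.toCollection(ArrayList::new));
    }

    // Builds the list 0..n-1 used as input, as in the question.
    static List<Integer> source(int n) {
        return IntStream.range(0, n).boxed().collect(Collectors.toList());
    }

    public static void main(String[] args) {
        List<Integer> src = source(20000);
        System.out.println("synchronizedList size = " + viaSynchronizedList(src).size());
        System.out.println("collect size          = " + viaCollect(src).size());
    }
}
```

Both lines should report 10000. The collect variant also keeps the elements in encounter order, which the forEach variant does not promise.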

Answer 2

Score: 1

ArrayList isn't thread-safe. While one thread sees a list with 30 elements, another might still see 29 and overwrite the 30th position (losing one element).

Another issue can arise when the array backing the list needs to be resized. A new, larger array is created (in OpenJDK's implementation it is about 1.5 times the old capacity) and the elements of the original array are copied into it. Other threads might have added elements in the meantime that the resizing thread did not see, or several threads might resize at once, in which case only one of the new arrays survives.

When using multiple threads you need to either synchronize access to the list or use a thread-safe list (for example by wrapping it with Collections.synchronizedList or by using a CopyOnWriteArrayList, to mention two possible solutions). Even better would be to use the collect method on the stream to put everything into a list.
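The lost update described above can be replayed deterministically on a single thread. The sketch below is not ArrayList's real code: TinyList is a hypothetical stand-in with a backing array and a size counter, and the two "threads" A and B are just local variables that both read the size before either writes it back.

```java
// Hypothetical, single-threaded replay of one unlucky interleaving.
class LostUpdateSketch {

    // Stand-in for a list's internals: a backing array plus a size counter.
    static final class TinyList {
        final Object[] elementData = new Object[64];
        int size;
    }

    static TinyList replayRace() {
        TinyList list = new TinyList();
        for (int i = 0; i < 29; i++) {
            list.elementData[list.size++] = "e" + i;
        }

        // Both "threads" read the size before either one updates it.
        int seenByA = list.size; // 29
        int seenByB = list.size; // 29, stale as soon as A finishes

        // Thread A completes its add.
        list.elementData[seenByA] = "from A";
        list.size = seenByA + 1;

        // Thread B completes its add using the stale index.
        list.elementData[seenByB] = "from B"; // overwrites "from A"
        list.size = seenByB + 1;              // size ends up 30, not 31

        return list;
    }

    public static void main(String[] args) {
        TinyList list = replayRace();
        System.out.println("size = " + list.size + ", slot 29 = " + list.elementData[29]);
    }
}
```

Two adds were performed, yet the size only grew by one and "from A" is gone. With real threads the same thing happens whenever two unsynchronized add calls overlap.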

Answer 3

Score: 1

parallelStream combined with forEach is a deadly combo if not used carefully.
Keep the following points in mind to avoid bugs:

  1. If you have a pre-existing list to which you want to add more objects from a parallelStream loop, wrap that list with Collections.synchronizedList before looping through the parallel stream, and add through the wrapper.

  2. If you have to create a new list, you can use a Vector, initialized outside the loop.

  3. Alternatively, if you have to create a new list, simply use parallelStream and collect the output at the end.
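The first point can be sketched as follows, assuming a pre-existing ArrayList that already holds a few elements. The class ExistingListDemo and the method appendInParallel are hypothetical names chosen for this example.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;
import java.util.stream.IntStream;

// Hypothetical demo class; the names are not from the original answer.
class ExistingListDemo {

    // Appends n generated strings to an existing list from a parallel stream.
    static List<String> appendInParallel(List<String> existing, int n) {
        // The wrapper writes through to 'existing'. Every concurrent write
        // must go through the wrapper, never through 'existing' directly.
        List<String> safe = Collections.synchronizedList(existing);
        IntStream.range(0, n).parallel().forEach(i -> safe.add("item-" + i));
        return existing;
    }

    public static void main(String[] args) {
        List<String> existing = new ArrayList<>(Arrays.asList("a", "b", "c"));
        appendInParallel(existing, 20000);
        System.out.println("size = " + existing.size());
    }
}
```

The wrapper only protects calls made through it, which is why the lambda adds to safe rather than to existing. The added elements arrive in no particular order.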

Answer 4

Score: 0

You lose the benefits of using streams (and parallel streams) when you mutate shared state. As a general rule, avoid mutation when using streams. Venkat Subramaniam explains why. Instead, use collectors. Also try to get as much as possible done within the stream chain. For example:

System.out.println(
        IntStream.range(0, 200000)
                .filter(i -> i % 2 == 0)
                .mapToObj(String::valueOf)
                .collect(Collectors.toList()).size()
);

You can run it in parallel by adding .parallel() to the chain.
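For completeness, a self-contained version of that chain with .parallel() added might look like this. The class name ParallelRangeDemo and the method evenStrings are illustrative assumptions.

```java
import java.util.List;
import java.util.stream.Collectors;
import java.util.stream.IntStream;

// Hypothetical demo class; the names are not from the original answer.
class ParallelRangeDemo {

    // Returns the even numbers in [0, upperBound) as strings, computed in parallel.
    static List<String> evenStrings(int upperBound) {
        return IntStream.range(0, upperBound)
                .parallel()                 // switch the pipeline to parallel execution
                .filter(i -> i % 2 == 0)
                .mapToObj(String::valueOf)
                .collect(Collectors.toList()); // thread-safe accumulation and merge
    }

    public static void main(String[] args) {
        System.out.println(evenStrings(200000).size());
    }
}
```

This prints 100000 on every run, because no element is ever written to a shared list from more than one thread.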

huangapple
  • Posted on September 16, 2020, 15:04:06
  • When reposting, please keep the link to this article: https://go.coder-hub.com/63914806.html