Getting ArrayIndexOutOfBoundsException when using parallel stream

Question

I occasionally get an `ArrayIndexOutOfBoundsException` when using the following code. Any leads? The list always has roughly 29-30 elements.

```java
logger.info("devicetripmessageinfo size :{}", deviceMessageInfoList.size());
deviceMessageInfoList.parallelStream().forEach(msg -> {
    if (msg != null && msg.getMessageVO() != null) {
        DeviceTripMessageInfo currentDevTripMsgInfo =
                (DeviceTripMessageInfo) msg.getMessageVO();
        if (currentDevTripMsgInfo.getValueMap() != null) {
            mapsList.add(currentDevTripMsgInfo.getValueMap());
        }
    }
});
```

```
java.lang.ArrayIndexOutOfBoundsException: null
        at java.base/jdk.internal.reflect.GeneratedConstructorAccessor26.newInstance(Unknown Source)
        at java.base/jdk.internal.reflect.DelegatingConstructorAccessorImpl.newInstance(DelegatingConstructorAccessorImpl.java:45)
        at java.base/java.lang.reflect.Constructor.newInstance(Constructor.java:490)
        at java.base/java.util.concurrent.ForkJoinTask.getThrowableException(ForkJoinTask.java:603)
        at java.base/java.util.concurrent.ForkJoinTask.reportException(ForkJoinTask.java:678)
        at java.base/java.util.concurrent.ForkJoinTask.invoke(ForkJoinTask.java:737)
        at java.base/java.util.stream.ForEachOps$ForEachOp.evaluateParallel(ForEachOps.java:159)
        at java.base/java.util.stream.ForEachOps$ForEachOp$OfRef.evaluateParallel(ForEachOps.java:173)
        at java.base/java.util.stream.AbstractPipeline.evaluate(AbstractPipeline.java:233)
        at java.base/java.util.stream.ReferencePipeline.forEach(ReferencePipeline.java:497)
        at java.base/java.util.stream.ReferencePipeline$Head.forEach(ReferencePipeline.java:661)
        at com.*.*.*.*.worker.*.process(*.java:96)
        at com.*.jms.consumer.JMSWorker.processList(JMSWorker.java:279)
        at com.*.jms.consumer.JMSWorker.process(JMSWorker.java:244)
        at com.*.jms.consumer.JMSWorker.processMessages(JMSWorker.java:200)
        at com.*.jms.consumer.JMSWorker.run(JMSWorker.java:136)
        at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128)
        at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628)
        at java.base/java.lang.Thread.run(Thread.java:834)
Caused by: java.lang.ArrayIndexOutOfBoundsException: null
```

Answer 1

Score: 4

## Summary

The problem is that `ArrayList` is by design not safe for modification by multiple threads concurrently, but the parallel stream is writing to the list from multiple threads.  A good solution is to switch to an idiomatic stream implementation:

```java
List msgList = deviceMessageInfoList.parallelStream() // Declare generic type, e.g. List<Map<String, Object>>
                .filter(Objects::nonNull)
                .map(m -> (DeviceTripMessageInfo) m.getMessageVO())
                .filter(Objects::nonNull)
                .map(DeviceTripMessageInfo::getValueMap)
                .filter(Objects::nonNull)
                .toList(); // Stream.toList() needs Java 16+; on older JDKs use .collect(Collectors.toList())
```

## Issue: concurrent modification

The `ArrayList` Javadocs explain the concurrent modification issue:

> Note that this implementation is not synchronized. If multiple threads access an `ArrayList` instance concurrently, and at least one of the threads modifies the list structurally, it must be synchronized externally. (A structural modification is any operation that adds or deletes one or more elements, or explicitly resizes the backing array; merely setting the value of an element is not a structural modification.) This is typically accomplished by synchronizing on some object that naturally encapsulates the list. If no such object exists, the list should be "wrapped" using the `Collections.synchronizedList` method. This is best done at creation time, to prevent accidental unsynchronized access to the list.

Note that the exception you're seeing is not the only incorrect behavior you might encounter. In my own tests of your code against large lists, the code sometimes completed without exception, but the resulting list contained only some of the elements from the source list.
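To observe these failure modes yourself, here is a minimal sketch (the class, enum, and method names are illustrative, not from the question) that repeats the question's pattern, a parallel `forEach` adding to a plain `ArrayList`, and tallies what happens. Because this is a data race, the counts vary by machine and run, and some runs may show no failures at all.

```java
import java.util.ArrayList;
import java.util.HashSet;
import java.util.List;
import java.util.Set;
import java.util.stream.Collectors;
import java.util.stream.IntStream;

class UnsafeParallelAddDemo {
    enum Outcome { COMPLETE, LOST_OR_CORRUPTED, EXCEPTION }

    // Intentionally UNSAFE: many threads call ArrayList.add() without synchronization.
    static Outcome runOnce(int n) {
        List<Integer> unsafe = new ArrayList<>();
        try {
            IntStream.range(0, n).parallel().boxed().forEach(unsafe::add);
        } catch (RuntimeException e) { // e.g. ArrayIndexOutOfBoundsException from a racing resize
            return Outcome.EXCEPTION;
        }
        // No exception: check whether every value 0..n-1 actually made it into the list.
        Set<Integer> expected = IntStream.range(0, n).boxed().collect(Collectors.toSet());
        boolean complete = unsafe.size() == n && new HashSet<>(unsafe).equals(expected);
        return complete ? Outcome.COMPLETE : Outcome.LOST_OR_CORRUPTED;
    }

    // Runs the race several times and counts each outcome, indexed by Outcome.ordinal().
    static int[] tally(int trials, int n) {
        int[] counts = new int[Outcome.values().length];
        for (int t = 0; t < trials; t++) {
            counts[runOnce(n).ordinal()]++;
        }
        return counts;
    }

    public static void main(String[] args) {
        int[] counts = tally(20, 100_000);
        for (Outcome o : Outcome.values()) {
            System.out.println(o + ": " + counts[o.ordinal()]);
        }
    }
}
```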

Note that while switching from a parallel stream to a sequential stream would likely fix the issue in practice, it is dependent on the stream implementation, and not guaranteed by the API. Therefore, such an approach is inadvisable, as it could break in future versions of the library. Per the `forEach` Javadocs:

> For any given element, the action may be performed at whatever time and in whatever thread the library chooses. If the action accesses shared state, it is responsible for providing the required synchronization.
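As a minimal sketch of what "providing the required synchronization" could look like, the following guards a plain `ArrayList` with a `synchronized` block inside a parallel `forEach`. The class and method names are illustrative. This makes the adds safe, but every element contends for the same lock and the resulting order is still unspecified.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.stream.IntStream;

class SynchronizedForEachSketch {
    // Adds 0..n-1 to a shared ArrayList from a parallel stream, holding the
    // list's own monitor for each add so structural changes never overlap.
    static List<Integer> collectWithLock(int n) {
        List<Integer> results = new ArrayList<>();
        IntStream.range(0, n).parallel().forEach(i -> {
            synchronized (results) {
                results.add(i);
            }
        });
        // forEach has returned, so all worker writes are visible here.
        return results; // contents are complete; order is unspecified
    }

    public static void main(String[] args) {
        List<Integer> results = collectWithLock(10_000);
        System.out.println(results.size()); // prints 10000
    }
}
```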

## Issue: not idiomatic

Aside from the correctness issue, another issue with this approach is that it's not particularly idiomatic to use side effects within stream code. The stream documentation explicitly discourages them.

> Side-effects in behavioral parameters to stream operations are, in general, discouraged, as they can often lead to unwitting violations of the statelessness requirement, as well as other thread-safety hazards.
>
> [...]
>
> Many computations where one might be tempted to use side-effects can be more safely and efficiently expressed without side-effects, such as using reduction instead of mutable accumulators.
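The "reduction instead of mutable accumulators" advice can be sketched with a small, hypothetical example (summing string lengths rather than collecting the question's maps). The first method updates a shared `AtomicInteger` from inside `forEach`; the second lets `IntStream.sum()` reduce the values with no shared state at all.

```java
import java.util.List;
import java.util.concurrent.atomic.AtomicInteger;

class ReductionSketch {
    // Side-effecting version: a shared counter mutated from inside forEach.
    // AtomicInteger keeps it thread-safe, but every element contends on one variable.
    static int totalLengthWithAccumulator(List<String> words) {
        AtomicInteger total = new AtomicInteger();
        words.parallelStream().forEach(w -> total.addAndGet(w.length()));
        return total.get();
    }

    // Reduction version: each worker sums its own chunk and the stream
    // library combines the partial sums; nothing is shared or mutated.
    static int totalLengthWithReduction(List<String> words) {
        return words.parallelStream().mapToInt(String::length).sum();
    }

    public static void main(String[] args) {
        List<String> words = List.of("alpha", "beta", "gamma");
        System.out.println(totalLengthWithAccumulator(words)); // prints 14
        System.out.println(totalLengthWithReduction(words));   // prints 14
    }
}
```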

Of particular note, the documentation goes on to describe the exact scenario posted in this question as an inappropriate use of side-effects in a stream:

> As an example of how to transform a stream pipeline that inappropriately uses side-effects to one that does not, the following code searches a stream of strings for those matching a given regular expression, and puts the matches in a list.
>
> ```java
> ArrayList<String> results = new ArrayList<>();
> stream.filter(s -> pattern.matcher(s).matches())
>       .forEach(s -> results.add(s));  // Unnecessary use of side-effects!
> ```
>
> This code unnecessarily uses side-effects. If executed in parallel, the non-thread-safety of `ArrayList` would cause incorrect results, and adding needed synchronization would cause contention, undermining the benefit of parallelism.

## Aside: traditional non-stream solution

As an aside, this points to a solution using traditional non-stream code. I will discuss it briefly, since it's helpful to understand traditional solutions to the issue of concurrent list modification. Traditionally, one might replace the `ArrayList` with either a synchronized wrapper created by `Collections.synchronizedList` or an inherently concurrent collection type such as `ConcurrentLinkedQueue`. Since these approaches are designed for concurrent insertion, they solve the parallel insert issue, though possibly with additional synchronization contention overhead.
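A minimal sketch of both traditional options, using integers in place of the question's maps (class and method names are illustrative). Both keep the side-effecting `forEach` but make the target collection safe for concurrent `add` calls.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.Queue;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.stream.IntStream;

class TraditionalConcurrentCollections {
    // Option 1: wrap the ArrayList at creation time so every add() takes the wrapper's lock.
    // Note: iterating the wrapper while other threads may still modify it requires
    // synchronized (results) { ... }; here all writers have finished before we return.
    static List<Integer> viaSynchronizedList(int n) {
        List<Integer> results = Collections.synchronizedList(new ArrayList<>());
        IntStream.range(0, n).parallel().boxed().forEach(results::add);
        return results;
    }

    // Option 2: a collection designed for concurrent insertion (a lock-free queue).
    static Queue<Integer> viaConcurrentQueue(int n) {
        Queue<Integer> results = new ConcurrentLinkedQueue<>();
        IntStream.range(0, n).parallel().boxed().forEach(results::add);
        return results;
    }

    public static void main(String[] args) {
        System.out.println(viaSynchronizedList(10_000).size()); // prints 10000
        System.out.println(viaConcurrentQueue(10_000).size());  // prints 10000
    }
}
```

In both cases the insertion order still depends on thread scheduling; only the contents are guaranteed.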

## Stream solution

The stream documentation continues on with a replacement for the inappropriate use of side effects:

> Furthermore, using side-effects here is completely unnecessary; the `forEach()` can simply be replaced with a reduction operation that is safer, more efficient, and more amenable to parallelization:
>
> ```java
> List<String> results =
>     stream.filter(s -> pattern.matcher(s).matches())
>           .toList();  // No side-effects!
> ```
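The Javadoc snippet leaves `stream` and `pattern` undefined. Here is a self-contained sketch of the same side-effect-free pipeline, with an illustrative input list and regex. It uses `Collectors.toList()` so it also compiles on JDKs older than 16, where `Stream.toList()` does not exist.

```java
import java.util.List;
import java.util.regex.Pattern;
import java.util.stream.Collectors;

class RegexFilterSketch {
    // Filter, then let the terminal operation build the list: no shared mutable
    // state, so it is safe in parallel and keeps the source's encounter order.
    static List<String> matching(List<String> input, Pattern pattern) {
        return input.parallelStream()
                .filter(s -> pattern.matcher(s).matches())
                .collect(Collectors.toList());
    }

    public static void main(String[] args) {
        Pattern digits = Pattern.compile("\\d+");
        List<String> input = List.of("42", "abc", "7", "x9");
        System.out.println(matching(input, digits)); // prints [42, 7]
    }
}
```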

Applying this approach to your code, you get:

```java
List msgList = deviceMessageInfoList.parallelStream() // Declare generic type, e.g. List<Map<String, Object>>
                .filter(Objects::nonNull)
                .map(m -> (DeviceTripMessageInfo) m.getMessageVO())
                .filter(Objects::nonNull)
                .map(DeviceTripMessageInfo::getValueMap)
                .filter(Objects::nonNull)
                .toList();
```
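To try the pipeline end to end, here is a runnable sketch with hypothetical stand-in classes. The question does not show its real types, so `MessageWrapper`, the stub `DeviceTripMessageInfo`, and the `Map<String, Object>` value type are all assumptions. It mirrors the original `forEach`'s null checks and uses `Collectors.toList()` for pre-Java-16 compatibility.

```java
import java.util.Arrays;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.stream.Collectors;

// Hypothetical stand-in for the list element type (not shown in the question).
class MessageWrapper {
    private final Object messageVO;
    MessageWrapper(Object messageVO) { this.messageVO = messageVO; }
    Object getMessageVO() { return messageVO; }
}

// Hypothetical stub; the real DeviceTripMessageInfo is not shown.
class DeviceTripMessageInfo {
    private final Map<String, Object> valueMap;
    DeviceTripMessageInfo(Map<String, Object> valueMap) { this.valueMap = valueMap; }
    Map<String, Object> getValueMap() { return valueMap; }
}

class TripMapCollector {
    // Same null checks as the original forEach, expressed without side effects.
    static List<Map<String, Object>> valueMaps(List<MessageWrapper> deviceMessageInfoList) {
        return deviceMessageInfoList.parallelStream()
                .filter(Objects::nonNull)                   // skip null messages
                .map(MessageWrapper::getMessageVO)
                .filter(Objects::nonNull)                   // skip messages without a VO
                .map(vo -> (DeviceTripMessageInfo) vo)
                .map(DeviceTripMessageInfo::getValueMap)
                .filter(Objects::nonNull)                   // skip VOs without a value map
                .collect(Collectors.toList());
    }

    public static void main(String[] args) {
        List<MessageWrapper> input = Arrays.asList(
                new MessageWrapper(new DeviceTripMessageInfo(Map.of("speed", 42))),
                null,
                new MessageWrapper(null),
                new MessageWrapper(new DeviceTripMessageInfo(null)));
        System.out.println(valueMaps(input)); // prints [{speed=42}]
    }
}
```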

Answer 2

Score: 1

Even if you switch to a synchronized (or, more precisely, a thread-safe) `List`, your current approach still gives no guarantee about the order in which the elements are added. The documentation is also very clear in discouraging this kind of side effect in `forEach`; just look up the "Side-effects" section of the `java.util.stream` package documentation.

The whole thing can be done in a far better (and more readable) way:

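```java
// Adjust the element type to whatever getValueMap() actually returns.
List<Map<String, Object>> mapsList = deviceMessageInfoList
      .stream()
      .parallel()
      .filter(Objects::nonNull)                    // Objects has nonNull, not notNull
      .map(x -> x.getMessageVO())
      .filter(Objects::nonNull)
      .map(x -> (DeviceTripMessageInfo) x)         // x is already the message VO; just cast it
      .map(DeviceTripMessageInfo::getValueMap)
      .filter(Objects::nonNull)
      .collect(Collectors.toList());
```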
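The ordering point can be illustrated with a small sketch (class and method names are illustrative). A thread-safe list filled from a parallel `forEach` ends up with the right elements but in whatever order the worker threads happened to run, while `collect` on an ordered source preserves encounter order even in parallel.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.stream.Collectors;
import java.util.stream.IntStream;

class EncounterOrderSketch {
    // Thread-safe, but the resulting order depends on thread scheduling.
    static List<Integer> viaForEach(int n) {
        List<Integer> out = Collections.synchronizedList(new ArrayList<>());
        IntStream.range(0, n).parallel().boxed().forEach(out::add);
        return out;
    }

    // collect() on an ordered source keeps encounter order, even in parallel.
    static List<Integer> viaCollect(int n) {
        return IntStream.range(0, n).parallel().boxed().collect(Collectors.toList());
    }

    public static void main(String[] args) {
        System.out.println(viaCollect(10)); // prints [0, 1, 2, 3, 4, 5, 6, 7, 8, 9]
        System.out.println(viaForEach(10)); // same elements, order not guaranteed
    }
}
```

`forEachOrdered` would also restore encounter order, but it serializes the action and gives up much of the benefit of running in parallel.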

huangapple
  • Posted on 2020-10-14 20:27:00
  • When reposting, please keep the link to this article: https://go.coder-hub.com/64353250.html