

Getting ArrayIndexOutOfBoundsException when using parallel stream



I occasionally get an ArrayIndexOutOfBoundsException when using the following code. Any leads? The size of the list is always around 29-30.

```java
logger.info("devicetripmessageinfo size :{}", deviceMessageInfoList.size());
deviceMessageInfoList.parallelStream().forEach(msg -> {
    if (msg != null && msg.getMessageVO() != null) {
        DeviceTripMessageInfo currentDevTripMsgInfo =
                (DeviceTripMessageInfo) msg.getMessageVO();
        if (currentDevTripMsgInfo.getValueMap() != null) {
            mapsList.add(currentDevTripMsgInfo.getValueMap());
        }
    }
});
```
```
java.lang.ArrayIndexOutOfBoundsException: null
    at java.base/jdk.internal.reflect.GeneratedConstructorAccessor26.newInstance(Unknown Source)
    at java.base/jdk.internal.reflect.DelegatingConstructorAccessorImpl.newInstance(DelegatingConstructorAccessorImpl.java:45)
    at java.base/java.lang.reflect.Constructor.newInstance(Constructor.java:490)
    at java.base/java.util.concurrent.ForkJoinTask.getThrowableException(ForkJoinTask.java:603)
    at java.base/java.util.concurrent.ForkJoinTask.reportException(ForkJoinTask.java:678)
    at java.base/java.util.concurrent.ForkJoinTask.invoke(ForkJoinTask.java:737)
    at java.base/java.util.stream.ForEachOps$ForEachOp.evaluateParallel(ForEachOps.java:159)
    at java.base/java.util.stream.ForEachOps$ForEachOp$OfRef.evaluateParallel(ForEachOps.java:173)
    at java.base/java.util.stream.AbstractPipeline.evaluate(AbstractPipeline.java:233)
    at java.base/java.util.stream.ReferencePipeline.forEach(ReferencePipeline.java:497)
    at java.base/java.util.stream.ReferencePipeline$Head.forEach(ReferencePipeline.java:661)
    at com.*.*.*.*.worker.*.process(*.java:96)
    at com.*.jms.consumer.JMSWorker.processList(JMSWorker.java:279)
    at com.*.jms.consumer.JMSWorker.process(JMSWorker.java:244)
    at com.*.jms.consumer.JMSWorker.processMessages(JMSWorker.java:200)
    at com.*.jms.consumer.JMSWorker.run(JMSWorker.java:136)
    at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128)
    at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628)
    at java.base/java.lang.Thread.run(Thread.java:834)
Caused by: java.lang.ArrayIndexOutOfBoundsException: null
```


Score: 4


Summary

The problem is that `ArrayList` is by design not safe for modification by multiple threads concurrently, but the parallel stream is writing to the list from multiple threads. A good solution is to switch to an idiomatic stream implementation:

```java
// Use the type getValueMap() actually returns; Map<String, Object> is just an example
List<Map<String, Object>> msgList = deviceMessageInfoList.parallelStream()
        .filter(Objects::nonNull)
        .map(m -> (DeviceTripMessageInfo) m.getMessageVO())
        .filter(Objects::nonNull)
        .map(DeviceTripMessageInfo::getValueMap)
        .filter(Objects::nonNull)
        .toList(); // Stream.toList() needs Java 16+; on older JDKs use .collect(Collectors.toList())
```

Issue: concurrent modification

The ArrayList Javadocs explain the concurrent modification issue:

> Note that this implementation is not synchronized. If multiple threads access an ArrayList instance concurrently, and at least one of the threads modifies the list structurally, it must be synchronized externally. (A structural modification is any operation that adds or deletes one or more elements, or explicitly resizes the backing array; merely setting the value of an element is not a structural modification.) This is typically accomplished by synchronizing on some object that naturally encapsulates the list. If no such object exists, the list should be "wrapped" using the Collections.synchronizedList method. This is best done at creation time, to prevent accidental unsynchronized access to the list

Note that the exception you're seeing is not the only incorrect behavior you might encounter. In my own tests of your code against large lists, the code sometimes completed without exception, but the resulting list contained only some of the elements from the source list.

Note that while switching from a parallel stream to a sequential stream would likely fix the issue in practice, it is dependent on the stream implementation, and not guaranteed by the API. Therefore, such an approach is inadvisable, as it could break in future versions of the library. Per the forEach Javadocs:

> For any given element, the action may be performed at whatever time and in whatever thread the library chooses. If the action accesses shared state, it is responsible for providing the required synchronization.

Issue: not idiomatic

Aside from the correctness issue, another issue with this approach is that it's not particularly idiomatic to use side effects within stream code. The stream documentation explicitly discourages them.

> Side-effects in behavioral parameters to stream operations are, in general, discouraged, as they can often lead to unwitting violations of the statelessness requirement, as well as other thread-safety hazards.
> [...]
> Many computations where one might be tempted to use side-effects can be more safely and efficiently expressed without side-effects, such as using reduction instead of mutable accumulators.

Of particular note, the documentation goes on to describe the exact scenario posted in this question as an inappropriate use of side-effects in a stream:

> As an example of how to transform a stream pipeline that inappropriately uses side-effects to one that does not, the following code searches a stream of strings for those matching a given regular expression, and puts the matches in a list.
> ```java
> ArrayList<String> results = new ArrayList<>();
> stream.filter(s -> pattern.matcher(s).matches())
>       .forEach(s -> results.add(s));  // Unnecessary use of side-effects!
> ```

> This code unnecessarily uses side-effects. If executed in parallel, the non-thread-safety of ArrayList would cause incorrect results, and adding needed synchronization would cause contention, undermining the benefit of parallelism.

Aside: traditional non-stream solution

As an aside, this points to a solution one might use in traditional non-stream code. I will discuss it briefly, since it's helpful to understand the traditional solutions to concurrent list modification. Traditionally, one might replace the ArrayList with either a synchronized wrapper created by Collections.synchronizedList or an inherently concurrent collection type such as ConcurrentLinkedQueue. Since these are designed for concurrent insertion, they solve the parallel-insert problem, though possibly at the cost of additional lock contention.
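A minimal sketch of that traditional approach, assuming a simple `IntStream` of integers in place of the poster's message list. The class and method names (`ConcurrentInsertDemo`, `fillSynchronizedList`, `fillConcurrentQueue`) are hypothetical; the point is only that both collections tolerate concurrent `add` calls from a parallel `forEach`, whereas a plain `ArrayList` does not.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.Queue;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.stream.IntStream;

public class ConcurrentInsertDemo {

    // Hypothetical helper: the synchronized wrapper makes every add() take the wrapper's lock,
    // so concurrent adds from a parallel forEach are safe (but contend for that one lock).
    static List<Integer> fillSynchronizedList(int n) {
        List<Integer> results = Collections.synchronizedList(new ArrayList<>());
        IntStream.range(0, n).parallel().forEach(i -> results.add(i));
        return results;
    }

    // Hypothetical helper: ConcurrentLinkedQueue uses a non-blocking algorithm built for concurrent insertion.
    static Queue<Integer> fillConcurrentQueue(int n) {
        Queue<Integer> results = new ConcurrentLinkedQueue<>();
        IntStream.range(0, n).parallel().forEach(i -> results.add(i));
        return results;
    }

    public static void main(String[] args) {
        // Both hold all n elements; their iteration order depends on thread scheduling.
        System.out.println(fillSynchronizedList(100_000).size()); // 100000
        System.out.println(fillConcurrentQueue(100_000).size());  // 100000
    }
}
```

Note that neither collection preserves the source order, and the shared lock in the synchronized wrapper is exactly the contention the stream documentation warns about.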

Stream solution

The stream documentation continues on with a replacement for the inappropriate use of side effects:

> Furthermore, using side-effects here is completely unnecessary; the forEach() can simply be replaced with a reduction operation that is safer, more efficient, and more amenable to parallelization:
> ```java
> List<String> results =
>     stream.filter(s -> pattern.matcher(s).matches())
>           .toList();  // No side-effects!
> ```
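To make the Javadoc snippet concrete, here is a small runnable sketch of the same side-effect-free pattern. The class name `RegexFilterDemo`, the `matching` helper, and the sample strings are illustrative assumptions. It uses `collect(Collectors.toList())` instead of `toList()` because `Stream.toList()` was only added in Java 16; the collector is the closest equivalent on older JDKs.

```java
import java.util.List;
import java.util.regex.Pattern;
import java.util.stream.Collectors;
import java.util.stream.Stream;

public class RegexFilterDemo {

    // Keeps the strings that fully match the pattern, with no shared mutable state.
    static List<String> matching(Stream<String> stream, Pattern pattern) {
        return stream
                .parallel()                                 // safe: collect() merges per-thread results itself
                .filter(s -> pattern.matcher(s).matches())
                .collect(Collectors.toList());              // on Java 16+ .toList() also works
    }

    public static void main(String[] args) {
        Pattern digits = Pattern.compile("\\d+");
        List<String> result = matching(Stream.of("42", "abc", "7", "x9"), digits);
        System.out.println(result); // [42, 7]
    }
}
```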

Applying this approach to your code, you get:

```java
// Use the type getValueMap() actually returns; Map<String, Object> is just an example
List<Map<String, Object>> msgList = deviceMessageInfoList.parallelStream()
        .filter(Objects::nonNull)
        .map(m -> (DeviceTripMessageInfo) m.getMessageVO())
        .filter(Objects::nonNull)
        .map(DeviceTripMessageInfo::getValueMap)
        .filter(Objects::nonNull)
        .toList(); // Stream.toList() needs Java 16+; on older JDKs use .collect(Collectors.toList())
```
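A self-contained version of that pipeline, runnable on Java 8 and later, might look like the following sketch. The question does not show the real types, so `DeviceMessageInfo` and `DeviceTripMessageInfo` below are hypothetical stand-ins, and the value-map type is assumed to be `Map<String, Object>`.

```java
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.stream.Collectors;

public class TripMapsDemo {

    // Hypothetical stand-in for the poster's message wrapper type.
    static class DeviceMessageInfo {
        private final Object messageVO;
        DeviceMessageInfo(Object messageVO) { this.messageVO = messageVO; }
        Object getMessageVO() { return messageVO; }
    }

    // Hypothetical stand-in; the real value-map type is assumed to be Map<String, Object>.
    static class DeviceTripMessageInfo {
        private final Map<String, Object> valueMap;
        DeviceTripMessageInfo(Map<String, Object> valueMap) { this.valueMap = valueMap; }
        Map<String, Object> getValueMap() { return valueMap; }
    }

    // Same pipeline as above, ending in collect(...) so it also compiles before Java 16.
    static List<Map<String, Object>> valueMaps(List<DeviceMessageInfo> deviceMessageInfoList) {
        return deviceMessageInfoList.parallelStream()
                .filter(Objects::nonNull)
                .map(m -> (DeviceTripMessageInfo) m.getMessageVO())
                .filter(Objects::nonNull)
                .map(DeviceTripMessageInfo::getValueMap)
                .filter(Objects::nonNull)
                .collect(Collectors.toList());
    }

    public static void main(String[] args) {
        Map<String, Object> values = new HashMap<>();
        values.put("speed", 42);
        List<DeviceMessageInfo> input = Arrays.asList(
                new DeviceMessageInfo(new DeviceTripMessageInfo(values)),
                null,                                                     // null message: filtered out
                new DeviceMessageInfo(null),                              // null messageVO: filtered out
                new DeviceMessageInfo(new DeviceTripMessageInfo(null)));  // null value map: filtered out
        System.out.println(valueMaps(input).size()); // 1
    }
}
```

No thread ever touches a shared list here: the stream framework builds partial lists per task and merges them, which is why this version is safe in parallel.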


Score: 1




Even if you change that to a synchronized (or, better said, a thread-safe) List, your current approach still gives no guarantee about the order in which the elements are added. The documentation, by the way, is very clear in discouraging this kind of thing via forEach; just look up "Side-effects" in the java.util.stream package documentation.

This entire thing can be done in a far better way (and it is easier to read, too):

```java
deviceMessageInfoList
        .stream()
        .parallel()
        .filter(Objects::nonNull)                  // Objects has nonNull, not notNull
        .map(x -> x.getMessageVO())
        .filter(Objects::nonNull)
        .map(x -> (DeviceTripMessageInfo) x)       // x is already the message VO here
        .map(DeviceTripMessageInfo::getValueMap)
        .filter(Objects::nonNull)
        .collect(Collectors.toList());
```
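On the ordering point: a `forEach` that appends to a thread-safe list records elements in whatever order the worker threads reach it, whereas `collect(Collectors.toList())` on an ordered source (such as a `List`) keeps the source's encounter order even in a parallel stream. A minimal sketch, using integers as an assumed stand-in for the real messages (`EncounterOrderDemo` and its methods are hypothetical names):

```java
import java.util.List;
import java.util.stream.Collectors;
import java.util.stream.IntStream;

public class EncounterOrderDemo {

    // collect(Collectors.toList()) on an ordered source keeps encounter order,
    // even though the elements are processed on several threads.
    static List<Integer> parallelCollect(int n) {
        return IntStream.range(0, n).parallel().boxed().collect(Collectors.toList());
    }

    static List<Integer> sequentialCollect(int n) {
        return IntStream.range(0, n).boxed().collect(Collectors.toList());
    }

    public static void main(String[] args) {
        System.out.println(parallelCollect(10_000).equals(sequentialCollect(10_000))); // true
    }
}
```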

  • Posted on October 14, 2020, 20:27:00
  • If you repost this article, please keep its link: https://go.coder-hub.com/64353250.html



