将一个并行数组流降低为单个数组。

huangapple go评论75阅读模式
英文:

reduce a parallel stream of arrays into a single array

问题

我试图将一个并行的数组流 Stream<ArrayList<T>> 缩减为一个单一的数组 ArrayList<T>,因此我使用了带有累加器和合并器的 reduce 方法,如下所示:

	public static void main(String [] args) {
		ArrayList&lt;String&gt; l1 = new ArrayList&lt;&gt;();
		l1.add(&quot;a1&quot;);
		l1.add(&quot;a2&quot;);
		
		List&lt;String&gt; l2 = new ArrayList&lt;&gt;();
		l2.add(&quot;a3&quot;);
		l2.add(&quot;a4&quot;);
				
		List&lt;List&lt;String&gt;&gt; l = new ArrayList&lt;&gt;();
		l.add(l1);
		l.add(l2);
		
		Stream&lt;List&lt;String&gt;&gt; stream = l.stream();
		join(stream).forEach(System.out::println);
}

private  static  &lt;T&gt; List&lt;T&gt; join(Stream&lt;List&lt;T&gt;&gt; stream) {
		return stream.parallel().reduce(new ArrayList&lt;&gt;(),  (total, element) -&gt; {
         	System.out.println(&quot;total: &quot; + total);
			System.out.println(&quot;element: &quot; + element);
			total.addAll(element);
			return total;
		},  (total1, total2) -&gt; {
            System.out.println(&quot;total1: &quot; + total1);
			System.out.println(&quot;total2: &quot; + total2);
			total1.addAll(total2);
			return total1;
		});
}

我知道合并器用于合并并行流,但它并没有按照我的预期工作,因为我得到了重复的结果,如下所示:

total: []
element: [a3, a4]
total: []
element: [a1, a2]
total1: [a3, a4, a1, a2]
total2: [a3, a4, a1, a2]
a3
a4
a1
a2
a3
a4
a1
a2

所以为什么会出现重复的结果?另外,在累加器中使用 ArrayList 是线程安全的吗?

英文:

i'm trying to reduce a parallel stream of arrays Stream<ArrayList<T>> into a single array ArrayList<T> so
i used the reduce method with accumulator and combiner as follows:-

	public static void main(String [] args) {
ArrayList&lt;String&gt; l1 = new ArrayList&lt;&gt;();
l1.add(&quot;a1&quot;);
l1.add(&quot;a2&quot;);
List&lt;String&gt; l2 = new ArrayList&lt;&gt;();
l2.add(&quot;a3&quot;);
l2.add(&quot;a4&quot;);
List&lt;List&lt;String&gt;&gt; l = new ArrayList&lt;&gt;();
l.add(l1);
l.add(l2);
Stream&lt;List&lt;String&gt;&gt; stream = l.stream();
join(stream).forEach(System.out::println);
}
private  static  &lt;T&gt; List&lt;T&gt; join(Stream&lt;List&lt;T&gt;&gt; stream) {
return stream.parallel().reduce(new ArrayList&lt;&gt;(),  (total, element) -&gt; {
System.out.println(&quot;total: &quot; + total);
System.out.println(&quot;element: &quot; + element);
total.addAll(element);
return total;
},  (total1, total2) -&gt; {
System.out.println(&quot;total1: &quot; + total1);
System.out.println(&quot;total2: &quot; + total2);
total1.addAll(total2);
return total1;
});
}

i knew the combiner is used to combine the parallel streams..but it is not working as i expected,
as i got duplicated results as follows:-

total: []
element: [a3, a4]
total: []
element: [a1, a2]
total1: [a3, a4, a1, a2]
total2: [a3, a4, a1, a2]
a3
a4
a1
a2
a3
a4
a1
a2

so why the result is duplicated ? also is it thread safe to use array list in accumulator ?

答案1

得分: 3

你应该只使用 flatMap

> 返回一个流,其中包含将此流的每个元素替换为应用所提供的映射函数到每个元素的映射流的内容的结果。每个映射流在其内容被放入此流后将被关闭。(如果映射流为 null,则使用空流代替。)
>
> 这是一个中间操作。

l.stream().flatMap(x -> x.stream()).collect(Collectors.toList()); // 得到 [a1, a2, a3, a4]

或者

l.stream().flatMap(List::stream).collect(Collectors.toList());

你代码中的问题是你将函数式风格的代码与具有副作用的代码混合在一起。这不是一个好兆头。如果你移除副作用,输出将会如预期一样:

private static <T> List<T> join(Stream<List<T>> stream) {
    return stream.parallel().reduce(new ArrayList<>(), (total, element) -> {
        System.out.println("total: " + total);
        System.out.println("element: " + element);
        //total.addAll(element);
        //return total;
        var list = new ArrayList<>(total);
        list.addAll(element);
        return list;
    }, (total1, total2) -> {
        System.out.println("total1: " + total1);
        System.out.println("total2: " + total2);
        //total1.addAll(total2);
        //return total1;
        var list = new ArrayList<>(total1);
        list.addAll(total2);
        return list;
    });
}

除非你有明确而客观的理由,否则应避免使用 parallel()。并行化会带来开销,只有在需要执行大量工作时才会更加高效。否则,同步开销会成为比任何性能收益更大的负担。

英文:

You should just use flatMap:

> Returns a stream consisting of the results of replacing each element of this stream with the contents of a mapped stream produced by applying the provided mapping function to each element. Each mapped stream is closed after its contents have been placed into this stream. (If a mapped stream is null an empty stream is used, instead.)
>
> This is an intermediate operation.

l.stream().flatMap(x -&gt; x.stream()).collect(Collectors.toList()); // is [a1, a2, a3, a4]

or

l.stream().flatMap(List::stream).collect(Collectors.toList());

The problem with your code is that you are mixing functional-style code with side-effects. That doesn't bode well. If you remove the side-effects, the output is as expected:

	private static &lt;T&gt; List&lt;T&gt; join(Stream&lt;List&lt;T&gt;&gt; stream) {
		return stream.parallel().reduce(new ArrayList&lt;&gt;(), (total, element) -&gt; {
			System.out.println(&quot;total: &quot; + total);
			System.out.println(&quot;element: &quot; + element);
			//total.addAll(element);
			//return total;
			var list = new ArrayList&lt;T&gt;(total);
			list.addAll(element);
			return list;
		}, (total1, total2) -&gt; {
			System.out.println(&quot;total1: &quot; + total1);
			System.out.println(&quot;total2: &quot; + total2);
			//total1.addAll(total2);
			//return total1;
			var list = new ArrayList&lt;T&gt;(total1);
			list.addAll(total2);
			return list;
		});
	}

You should also avoid using parallel() unless you have a clear, objective reason to. Parallelism is an overhead, and it only becomes more performant if there is heavy work to do. Otherwise, the synchronization overhead will be a bigger penalty than any gains.

huangapple
  • 本文由 发表于 2020年9月15日 03:05:28
  • 转载请务必保留本文链接:https://go.coder-hub.com/63890446.html
匿名

发表评论

匿名网友

:?: :razz: :sad: :evil: :!: :smile: :oops: :grin: :eek: :shock: :???: :cool: :lol: :mad: :twisted: :roll: :wink: :idea: :arrow: :neutral: :cry: :mrgreen:

确定