在每个元素被消耗后,向Java Stream添加一个副作用?

huangapple go评论71阅读模式
英文:

Add a side-effect to Java Stream after each element is consumed?

问题

This is a simplified example but it illustrates the point.

Let's say I have a method defined like this:

Stream<String> generateStream() {
  return Stream.of("hello", "world"); // (S)
}

Is there a way to make S for example print something after an element has been consumed without the consumer of the stream having to do or know anything about it?

For example, I'd to modify S so that this:

generateStream().forEach(System.out::println)

actually prints out this to the console:

hello
consumed
world
consumed

Is this possible in Java 8 and if so how?

英文:

This is a simplified example but it illustrates the point.

Let's say I have a method defined like this:

Stream&lt;String&gt; generateStream() {
  return Stream.of(&quot;hello&quot;, &quot;world&quot;); // (S)
}

Is there a way to make S for example print something <i>after</i> an element has been consumed without the consumer of the stream having to do or know anything about it?

For example, I'd to modify S so that this:

generateStream().forEach(System.out::println)

actually prints out this to the console:

hello
consumed
world
consumed

Is this possible in Java 8 and if so how?

答案1

得分: 3

你可以使用 flatMap 与一个关闭动作:

Stream<String> generateStream() {
  return Stream.of("hello", "world")
      .flatMap(s -> Stream.of(s).onClose(() -> System.out.println(s + " consumed")));
}

它按预期工作:

generateStream().forEach(System.out::println);
hello
hello consumed
world
world consumed

即使在短路操作中也可以使用:

Optional<String> o = generateStream().filter(s -> s.startsWith("he")).findFirst();
o.ifPresent(s -> System.out.println("found " + s));
hello consumed
found hello

但请注意,与没有 flatMap 的操作相比,这可能对性能有巨大影响。但对于调试或性能无关紧要的情况可能会有帮助。

此外,请记住,被流管道消耗并不意味着终端操作不会保留对它的引用或停止访问它。

对于 forEach 操作,它是这样工作的,但例如对于 reduce((a,b) -> a),所有元素都会一个接一个地被消耗,但对第一个元素的引用会一直保留到最后,并且甚至作为最终结果返回。对于 min(comparator),可能会保留任何元素,直到找到一个更小的元素为止。最后,像 toArray() 这样的操作会保留并返回结果中的所有元素。

此外,有状态的操作可能会将后续的流水线处理与源流分离。例如,sorted 步骤可能会像 toArray 一样,缓冲所有元素,使它们在被源视为已消耗之前出现,然后对数组进行排序,并在继续流式处理数组时进行。同样,distinct 会保留对对象的引用,超出其消耗,并且在并行流中,它们的后续处理可能会被推迟到关闭源流之后的某个时间点。

英文:

You can use flatMap with a close action:

Stream&lt;String&gt; generateStream() {
  return Stream.of(&quot;hello&quot;, &quot;world&quot;)
      .flatMap(s -&gt; Stream.of(s).onClose(() -&gt; System.out.println(s+&quot; consumed&quot;)));
}

which works as intended:

generateStream().forEach(System.out::println);
hello
hello consumed
world
world consumed

even for short-circuiting operations:

Optional&lt;String&gt; o = generateStream().filter(s -&gt; s.startsWith(&quot;he&quot;)).findFirst();
o.ifPresent(s -&gt; System.out.println(&quot;found &quot;+s));
hello consumed
found hello

But note that this can have a dramatic impact on the performance, compared to an operation without flatMap. But for debugging purposes, or for cases where performance doesn’t matter, it might be helpful.

Further, keep in mind that being consumed by the Stream pipeline doesn’t imply that the terminal operation won’t keep a reference to it nor stop accessing it.

It works that way for the forEach operation, but e.g., for reduce((a,b) -&gt; a), all elements get consumed one after the other, but a reference to the first element will be kept until the end and even returned as final result. For min​(comparator), any element might be held until a smaller one has been encountered. And finally, operations like toArray() hold and return all elements in the result.

Also, stateful operations may detach the subsequent pipeline processing from the source stream. E.g. a sorted step may act like a toArray, buffering all elements making them appear as consumed to the source, before sorting the array and continuing by streaming over the array. Likewise, distinct will hold references to objects beyond their consumption and in a parallel stream, their subsequent processing might be deferred to a point after closing the source stream.

答案2

得分: 1

你可以在你的链中使用 peek 来消耗一个元素并返回一个 Stream。
forEach 在关闭流。

Stream.of("hello", "world")
    .peek(System.out::println)
    .forEach(word -> System.out.println("consumed"));

或者你可以在 forEach 中链接消费者以实现你期望的行为。
Stream 在 API 中没有明确的方法来实现你所要求的功能,
你可以使用 peek 来链接方法,而不是使用 forEach 来不结束流,或者在 forEach 中链接消费者。

Consumer<String> stringConsumer = System.out::println;
Consumer<String> endConsumed = string -> System.out.println("consumed");
Stream.of("hello", "world")
        .forEach(stringConsumer.andThen(endConsumed));

注意:以上代码中的 """ 应替换为双引号(")以使代码正常运行。

英文:

you can use peek in your chain to consume an element and return a Stream.
forEach is closing the stream


    Stream.of(&quot;hello&quot;, &quot;world&quot;)
        .peek(System.out::println)
        .forEach(word -&gt; System.out.println(&quot;consumed&quot;)); 

or you can chain consumers in foreach to have your expected behaviour.
Stream doesn't have an explicit method in the api for what you are asking,
you can chain methods using peek instead of foreach to not finish the stream, or chain consumer in foreach

    Consumer&lt;String&gt; stringConsumer = System.out::println;
    Consumer&lt;String&gt; endConsumed = string -&gt; System.out.println(&quot;consumed&quot;);
    Stream.of(&quot;hello&quot;, &quot;world&quot;)
            .forEach(stringConsumer.andThen(endConsumed));

huangapple
  • 本文由 发表于 2020年8月1日 17:29:26
  • 转载请务必保留本文链接:https://go.coder-hub.com/63203709.html
匿名

发表评论

匿名网友

:?: :razz: :sad: :evil: :!: :smile: :oops: :grin: :eek: :shock: :???: :cool: :lol: :mad: :twisted: :roll: :wink: :idea: :arrow: :neutral: :cry: :mrgreen:

确定