多个筛选器内部的流处理

huangapple go评论74阅读模式
英文:

Stream multiple filter internals

问题

我正在尝试理解Java Stream API的内部调用。

我有以下代码,其中包含两个filter(中间操作)和一个终端操作。

IntStream.of(1,2,3)
	.filter(e->e%2==0)
	.filter(e->e==2)
	.forEach(e->System.out.println(e));

Stream -> 返回带有重写filter的Stream -> 返回带有重写filter的Stream -> 终端操作

我看到,对于每个中间操作,都会返回一个新的带有重写filter方法的流。一旦到达终端方法,流会执行filter。我看到,如果有两个filter操作而不是一个,则filter()会运行两次。

我想要理解一次流遍历如何能够调用两次filter。

以下是在Stream中的filter方法中命中的IntPipeline代码。

@Override
public final IntStream filter(IntPredicate predicate) {
    Objects.requireNonNull(predicate);
    return new StatelessOp<Integer>(this, StreamShape.INT_VALUE,
                                    StreamOpFlag.NOT_SIZED) {
        @Override
        Sink<Integer> opWrapSink(int flags, Sink<Integer> sink) {
            return new Sink.ChainedInt<Integer>(sink) {
                @Override
                public void begin(long size) {
                    downstream.begin(-1);
                }

                @Override
                public void accept(int t) {
                    if (predicate.test(t)) ///line 11
                        downstream.accept(t);
                }
            };
        }
    };
}

filter()返回一个新的Stream,其谓词设置为e%2==0,然后再返回一个新的Stream,其谓词为e==2。一旦命中终端操作,对于每次遍历,谓词的代码会在第11行执行。

编辑:我看到downstream用于将中间操作链接为LinkedList。因此,所有实现都作为前一阶段添加到linkedlist中,并在遍历开始时调用?

英文:

I am trying to understand internal calls of Stream API of Java.

I have the following code, which has two filter (intermediate) operations and one terminal operation.

IntStream.of(1,2,3)
	.filter(e-&gt;e%2==0)
	.filter(e-&gt;e==2)
	.forEach(e-&gt;System.out.println(e));

Stream - > returns Stream with overridden filter - > returns Stream with overridden filter - > terminal

I see that for each intermediate operation a new stream is returned with overridden filter method. Once it hits the terminal method, the stream executes the filter. I see that filter() is being run twice if there are two filter operations instead of once.

I want to understand how one stream traversal is able to call filter twice.

Pasting the IntPipeline code below which is hit for the filter method in Stream.

@Override
public final IntStream filter(IntPredicate predicate) {
    Objects.requireNonNull(predicate);
    return new StatelessOp&lt;Integer&gt;(this, StreamShape.INT_VALUE,
                                    StreamOpFlag.NOT_SIZED) {
        @Override
        Sink&lt;Integer&gt; opWrapSink(int flags, Sink&lt;Integer&gt; sink) {
            return new Sink.ChainedInt&lt;Integer&gt;(sink) {
                @Override
                public void begin(long size) {
                    downstream.begin(-1);
                }

                @Override
                public void accept(int t) {
                    if (predicate.test(t)) ///line 11
                        downstream.accept(t);
                }
            };
        }
    };
}

The filter() returns a new Stream whose predicate is set as e%2==0 and then again a new Stream is returned whose predicate is e==2. Once the terminal operation is hit the, for each traversal the predicates code is executed then at line 11.

Edit : I see that downstream is used to link the intermediate ops as a LinkedList. So all the implementations are added to the linkedlist as previous stage and called once traversal starts?

答案1

得分: 6

我认为你对流(Streams)的理解中存在一些负担和混合概念。

  1. 你的困惑与filter(或任何其他操作)的实现无关;也不存在覆盖中间(或任何其他)流操作的概念;

  2. 每个单独的流都是一个单一的流水线,它有(1)开始,(2)可选的中间部分和(3)结束/结论

  3. 中间操作不会被覆盖;相反,它们形成一个操作顺序链,流的每个元素都必须经过这些操作链(除非在某个中间操作中丢弃了元素),按照相同的顺序。

    <sup>将K个对象的流想象成一个管道,K的实例将通过该管道。该管道有一个起点/源(对象进入管道的地方)和终点/目的地;然而,在两者之间,可能有几个中间操作,这些操作将对这些对象进行过滤、转换、装箱等。流是惰性的,这意味着中间操作直到终端操作被调用时才会执行(这是流的一个很棒的特性),因此,每个项在终端操作被调用时将逐个通过此管道。</sup>

此外,请阅读此片段

> 为了执行计算,流操作被组合成流水线。流水线由源(可以是数组、集合、生成器函数、I/O通道等)、零个或多个中间操作(将流转换为另一个流,例如*filter(Predicate))以及终端操作(生成结果或副作用,例如count()* 或 forEach(Consumer))组成。流是惰性的;只有在启动终端操作时才会对源数据进行计算,并且仅在需要时才会消耗源元素。

记住这个范例,流由以下部分组成:

  1. 源操作 - 从一些对象集合中获取流的操作,或表示现有的流对象(图像中的transactions);
  2. 中间操作s - 可能是0个或多个,需要引入中间的细粒度、调整或修改流的操作(图像中的filtersortedmap);
  3. 终端操作 - 终止并关闭流。它可以产生(返回)一些东西,消耗(接受)一些东西,两者都可以,或者都不可以(图像中的collect)。

如果你仍然感到困惑(现在应该不会这样),你还可以额外参考一些重要观点,这些观点可能会进一步解决你的困惑:

> 中间操作会返回一个新的流。它们总是惰性的;执行诸如filter()之类的中间操作实际上并不执行任何过滤操作,而是创建一个新的流,该流在遍历时包含与给定谓词匹配的初始流的元素。在执行流水线的终端操作之前,不会开始遍历管道源;

> 惰性处理允许实现显著的效率;在诸如上述filter-map-sum示例的流水线中,过滤、映射和求和可以融合成对数据的单次传递,中间状态最小。惰性还允许在不必要时避免检查所有数据;对于诸如“查找第一个长度大于1000个字符的字符串”的操作,只需要检查足够多的字符串,以找到具有所需特征的一个字符串,而无需检查源中的所有字符串。(当输入流是无限的而不仅仅是大型的时,这种行为变得更加重要。)

英文:

I think you are having some burden and mixed concepts in your understanding of Streams.

  1. Your confusion has nothing to do with filter (or any other operation) implementation; nor does there exist a concept of overriding an intermediate (or any other) stream operation;

    <sup>Overriding is a completely different concept and it is related to Inheritance</sup>

  2. Every single stream is a single pipeline, which has a (1) beginning, (2) optionally an intermediate part, and (3) the end/conclusion;

  3. Intermediate operations are not overridden; they, instead, form a sequential chain of operations, which every element of the stream will have to go through (unless an element is discarded at some intermediate operation), in the order of same sequence.

    > <sup>Think of Stream of K objects, as a pipe, in which instances of K will go through. That pipe has a beginning/source (where objects are entering the pipe), and the end/destination; however, in between, there might be several intermediate operations which will (either) filter, transform, box, or etc. those objects. Streams are lazy, meaning, that intermediate operations are not performed until the terminal operation is invoked (this is a great feature of streams), so, each item, one at a time, will go through this pipe, when the terminal operation is invoked.</sup>

Additionally, read this snippet:

> To perform a computation, stream operations are composed into a stream pipeline. A stream pipeline consists of a source (which might be an array, a collection, a generator function, an I/O channel, etc), zero or more intermediate operations (which transform a stream into another stream, such as filter(Predicate)), and a terminal operation (which produces a result or side-effect, such as count() or forEach(Consumer)). Streams are lazy; computation on the source data is only performed when the terminal operation is initiated, and source elements are consumed only as needed.

Remember the paradigm, that Stream consists of:

  1. Sourcer operation - one that obtains stream from some set of some objects, or represents an existing stream object (transactions on the picture below);
  2. Intermediate operations - that could be 0 or more, and that are needed to introduce intermediary fine-graining, adjusting or modifying operations on your stream (filter, sorted, and map on the picture below);
  3. Terminal operation - that terminates and closes stream. It can produce (return) something, consume (accept) something, neither, or both (collect on the picture below).

多个筛选器内部的流处理

<br>

If you are still confused (which should not be the case now), you can additionally refer to few important points that may shed some more light on your confusions:
> Intermediate operations return a new stream. They are always lazy; executing an intermediate operation such as filter() does not actually perform any filtering, but instead creates a new stream that, when traversed, contains the elements of the initial stream that match the given predicate. Traversal of the pipeline source does not begin until the terminal operation of the pipeline is executed;

> Processing streams lazily allows for significant efficiencies; in a pipeline such as the filter-map-sum example above, filtering, mapping, and summing can be fused into a single pass on the data, with minimal intermediate state. Laziness also allows avoiding examining all the data when it is not necessary; for operations such as "find the first string longer than 1000 characters", it is only necessary to examine just enough strings to find one that has the desired characteristics without examining all of the strings available from the source. (This behavior becomes even more important when the input stream is infinite and not merely large.)

答案2

得分: 2

以下是您提供的内容的翻译部分:

首先,我会尽力解释流式 API 背后的发生情况,首先,您应该改变自己过去的编程方式,尝试理解这个新思想。

所以,为了有一个现实世界的例子,想象一个工厂(我指的是真实世界中的实际工厂,而不是工厂设计模式),在一个工厂里,我们有一些原材料和一些连续的过程,在不同的阶段将原材料转化为成品。为了理解这个概念,看看下面的图表:

(阶段1)原材料 -> (阶段2)处理输入并将输出传递到下一个阶段 -> (阶段3)处理输入并将输出传递到下一个阶段 -> ......

因此,第一个阶段的输出是原材料,所有后续阶段对其输入进行一些处理并将其传递(例如,它可以将阶段的输入转换为其他内容,或者完全拒绝掉输入,因为质量较差),然后交付输出给位于前面的另一个阶段。从现在开始,我们将这些连续的阶段统称为“流水线”。

处理是什么?可以是任何事情,例如,一个阶段可以决定将输入完全转换为其他内容并将其传递(这正是流式 API 中的 map 所提供的),另一个阶段可以基于某些条件允许输入继续传递(这正是流式 API 中的 filter 所做的)。

Java 流式 API 做的事情与工厂类似。每个流实际上是一个流水线,您可以为每个流水线添加另一个阶段并创建一个新的流水线,所以当您写下 IntStream.of(1,2,3) 时,您已经创建了一个流水线,即 IntStream,因此让我们分解您的代码:

IntStream intStream = IntStream.of(1,2,3);

这相当于我们工厂的原材料,因此它是一个只有一个阶段的流水线。但是,一个只传递原材料的流水线没有任何好处。让我们向前一个流水线添加另一个阶段并创建一个新的流水线:

IntStream evenNumberPipeline = intStream.filter(e -> e % 2 == 0);

请注意,在这里您创建了一个新的流水线,而这个流水线正是前一个流水线加上另一个阶段,该阶段只允许偶数通过,并拒绝其他数字。当您调用过滤器方法时,代码的以下部分实际上创建了一个新的流水线:

@Override
public final IntStream filter(IntPredicate predicate) {
    Objects.requireNonNull(predicate);
    return new StatelessOp<Integer>(this, StreamShape.INT_VALUE,
                                    StreamOpFlag.NOT_SIZED) {
        @Override
        Sink<Integer> opWrapSink(int flags, Sink<Integer> sink) {
            return new Sink.ChainedInt<Integer>(sink) {
                @Override
                public void begin(long size) {
                    downstream.begin(-1);
                }

                @Override
                public void accept(int t) {
                    if (predicate.test(t)) ///line 11
                        downstream.accept(t);
                }
            };
        }
    };
}

您可以看到,过滤器返回了 StatelessOp<Integer> 的新实例,它扩展了 IntPipeline,如下所示:

abstract static class StatelessOp<E_IN> extends IntPipeline<E_IN>

让我们暂停一下,问一个问题:到目前为止是否进行了任何操作?答案是否定的,当您创建一个工厂或者工厂的流水线时,尚未生产任何产品。您应该向流水线提供原材料,以便在工厂中获得成品,但我们迄今为止还没有这样做。因此,当我们在流中调用过滤器和其他操作时,实际上只是在设计流水线过程,我们并没有真正处理任何东西,我们只是向流水线添加另一个阶段,并告诉它在给定输入时应该执行这个过程。

在我们的例子中,我们向工厂添加了第二个阶段,并告诉它在给定输入时检查它是否为偶数,然后在只有 2 的情况下才允许它继续传递。现在,让我们向我们的流水线添加另一个阶段:

IntStream onlyTwo = evenNumberPipeline.filter(e -> e == 2);

在这里,您创建了一个新的流水线,获取了先前的流水线(即 evenNumberPipeline),并在该流水线中添加了另一个阶段(evenNumberPipeline 并没有改变,我们创建了一个包含 evenNumberPipeline 的新流水线)。让我们看一下到目前为止的流水线:

原材料(阶段1) -> 过滤偶数(阶段2) -> 过滤仅为 2(阶段3)

将其视为流水线中各阶段的定义,而不是操作,也许我们还没有原材料,但我们可以设计工厂,以便以后可以提供原材料。您可以看到,此流水线有三个阶段,每个阶段对前一个阶段的输出进行一些处理。流水线将逐一提供原材料(暂时忘记并行流),因此当您向此流水线提供 1 作为原材料时,它将经过这些阶段。每个阶段都是

英文:

I will try to do my best to explain what is happening behind the scene in Stream API, First of all you should change your mind about how you have been programming so far, try to get this new idea.

So to have an example of real world imagine factory( I mean real factory in real world not factory design pattern), in a factory we have some raw material and some consecutive processes in different stages that turn the raw material into finished product. to have a grasp of this concept see the following diagram:

(stage1)raw material -> (stage2) process the input and pass the output to the next stage -> (stage3) process the input and pass the output to the next stage -> .....

So the output of the first stage is the raw material and all subsequent stages do some processing on their input and pass it along(for example it can turn the input of the stage to something else or rejects that input completely because of its low quality) then after it will hand over the output to another stage which is in front of it. from now on we call this consecutive stages altogether a Pipeline.

> what is processing?, it can be anything, for instance one stage can decide to turn the input to completely different thing and pass it along( this is exactly what map provides us with in Stream API), another stage may allow input to pass along based on some condition(this is exactly what filter does in Stream API).

Java Stream API does something similar to a factory. Each Stream is exactly a pipeline and you can add another stage to each pipeline and create a new pipeline, so when you write IntStream.of(1,2,3) you have created a pipeline which is IntStream so let's break down your code:

IntStream intStream = IntStream.of(1,2,3)

This is the equivalent to raw material in our factory, therefore it is a pipeline which has only one stage. However having a pipeline which only passes the raw material has no benefit. Let's add another stage to the previous pipeline and create a new Pipeline:

IntStream evenNumbePipeline = intStream.filter(e -&gt; e%2==0);

Please note that here you create new Pipeline and this pipeline is exactly the previous one plus another stage that allow only even number pass through and rejects other ones. When you call filter method following portion of the code create exactly a new Pipeline:

@Override
public final IntStream filter(IntPredicate predicate) {
    Objects.requireNonNull(predicate);
    return new StatelessOp&lt;Integer&gt;(this, StreamShape.INT_VALUE,
                                    StreamOpFlag.NOT_SIZED) {
        @Override
        Sink&lt;Integer&gt; opWrapSink(int flags, Sink&lt;Integer&gt; sink) {
            return new Sink.ChainedInt&lt;Integer&gt;(sink) {
                @Override
                public void begin(long size) {
                    downstream.begin(-1);
                }

                @Override
                public void accept(int t) {
                    if (predicate.test(t)) ///line 11
                        downstream.accept(t);
                }
            };
        }
    };
}

You can see filter return new instance of StatelessOp&lt;Integer&gt; which extends IntPipeline as below:

abstract static class StatelessOp&lt;E_IN&gt; extends IntPipeline&lt;E_IN&gt; 

Let's stop here for a moment with a question: Has any operation done so far? The answer is a BIG NO, when you create a factory or pipeline of the factory no product has been produced yet, You should provide the raw material to the pipeline to get your finished product by factory, but we haven't done it so far. so when we are calling filter and other operation in stream we are just designing our pipeline procedures and we don't really process anything, we are just adding another stage to our pipeline and say hey when you are given the input you should do this procedure on it,

In our case we add stage2 to our factory and tell it when you are given the input check if it is even or not and then after allow it to pass along if it is an even number. We are reaching to your question now let's add another stage to our pipeline:

IntStream onlyTwo = evenNumbePipeline.filter(e -&gt; e==2);

Here you create new pipeline which get the previous pipeline (which is evenNumbePipeline) and add another stage to that pipeline(evenNumbePipeline has not changed, we create new pipeline which has evenNumbePipeline inside it). Let's take a look at our pipeline so far:

raw material(stage1) -&gt; filter even number(stage2) -&gt; filter only 2(stage3)

Think about it as the definition of the stages in our pipeline and not operation, maybe we don't have raw material yet but we can design our factory so that we can provide it with raw material later. You can see this pipeline has three stages and each stage does something to the previous stage output. Pipeline will be provided by raw material one by one(forget about parallel stream fro now) so when you provide 1 as raw material to this pipeline it goes through these stages. Each of this stage is new Object in java.

So lets get to something you said in your question

> I want to understand how one stream traversal is able to call filter
> twice.

From what we have investigated so far do you think traversal of the Stream has called filter method twice or when we created our pipeline we have called filter method twice?

We call this filter method twice because we want to have two different stages in our pipeline. Think about a factory we call filter method twice because we want to have two different filter stages in our factory when we are designing it. We have not traversed the factory stages yet and we have not produced any finished product yet.

Let's have some fun and produce some output:

onlyTwo.forEach(e -&gt; System.out.println(e));

As foreach is a terminal operation, it starts our factory and provides raw material to our factory's pipeline. So for instance 1 go through stage2 then stage3 and then delivered to foreach statement.

But there is another question: how can we define what each stage does when we are designing our pipeline?

In our example when we were designing our pipeline we called filter method to create new stage and pass to it something that filter stage should do as a parameter the name of this parameter is predicate. The operation that exactly executed when input received(by each stage) is define by opWrapSink method in each stage. Therefore we have to implement this method when we are creating our stage so let's get back to our filter method where Stream classs create new Stage for us:

@Override
public final IntStream filter(IntPredicate predicate) {
    Objects.requireNonNull(predicate);
    return new StatelessOp&lt;Integer&gt;(this, StreamShape.INT_VALUE,
                                    StreamOpFlag.NOT_SIZED) {
        @Override
        Sink&lt;Integer&gt; opWrapSink(int flags, Sink&lt;Integer&gt; sink) {
            return new Sink.ChainedInt&lt;Integer&gt;(sink) {
                @Override
                public void begin(long size) {
                    downstream.begin(-1);
                }

                @Override
                public void accept(int t) {
                    if (predicate.test(t)) ///line 11
                        downstream.accept(t);
                }
            };
        }
    };
}

You can see each stage's opWrapSink methods return a Sink but what's the Sink?

To put away lots of complexity in this interface it is a consumer and has an accept method as below(it has also has many other accept method for primitive types to avoid unnecessary boxing and unboxing) :

void accept(T t);

When you are implementing this interface you should define what you want to do with the input value that you will be delivered as input in the stage. You don't need to implement this interface in your program because methods in Stream implementations have done the heavy lifting for you. Let's see how it has been implemented in filter case:

@Override
Sink&lt;Integer&gt; opWrapSink(int flags, Sink&lt;Integer&gt; sink) {
    return new Sink.ChainedInt&lt;Integer&gt;(sink) {
        @Override
        public void begin(long size) {
            downstream.begin(-1);
        }

        @Override
        public void accept(int t) {
            if (predicate.test(t)) ///line 11
                downstream.accept(t);
        }
    };
}

Stream framework provides opWrapSink method with the next stage Sink(as the second parameter when calling this method), it means we know how the next stage in our Pipeline do their job(with the help of this Sink) but we should provide them with an input, it is crystal clear that the input to the next stage is the output of the current stage. another parameter that we need to produce the output of the current stage is the input to our current stage.

> input to our current stage -> do operation of the current stage on the
> input -> pass the output to the next stages(following stages in the
> pipeline)

So in the accept method we have input to our current stage as parameter t we should do something on this input(as the operation of our current stage on the input) and then pass it to the next stage. In our filter stage we need to check if the input to our stage which is t passes a predicate(which in our case is e%2==0) and then after we should pass it to the next stage Sink. And here is exactly what our accept method do(downstream is exactly the Sink for the following stages in the pipeline):

@Override
public void accept(int t) {
    if (predicate.test(t)) ///line 11
        downstream.accept(t);
}

What you should notice in this accept method implementation is that it only passes the input of the current stage(which is t) to the next stages if it passes a predicate(which in our case is e%2==0) and if it does not passes the predicate it does not pass it through(It is exactly what we expect from the filtering stage to do);

答案3

得分: 0

filter方法未被覆盖,但它被用不同的lambda表达式调用。

如果你将lambda表达式“展开”为匿名类,你会得到类似这样的代码:

IntStream.of(1,2,3)
        .filter(new IntPredicate() {
            @Override
            public boolean test(int e) {
                return e % 2 == 0;
            }
        })
        .filter(new IntPredicate() {
            @Override
            public boolean test(int e) {
                return e == 2;
            }
        })
        .forEach(new IntConsumer() {
            @Override
            public void accept(int e) {
                System.out.println(e);
            }
        });

你可以看到,IntStream.of(int...)是一种工厂方法,它使用给定的值创建了一个IntStream的实现。
由于filter方法再次返回一个IntStream,你可以进行链式调用并再次调用该实例的filter方法。已经定义每次都会获得一个新的实例(“Intermediate operations return a new stream.”)。

实现可以保持不变,因此无需覆盖filter方法。

实际上,在我机器上安装的Java 8版本(OpenJDK 1.8.0_181)中,反编译的实现显示java.util.stream.IntStream.of(int...)委托给java.util.Arrays.stream(int[]),而后者实际上返回了java.util.stream.IntPipeline.Head的实例。 IntPipeline扩展java.util.stream.AbstractPipelineAbstractPipeline扩展java.util.stream.PipelineHelper,它没有继承任何其他类(只有Object,但我不会计算它;-))

因此,java.util.LinkedList没有在过滤部分进行扩展或修改,也许它在foreach方法中起作用,但我没有深入研究,因为我认为你更关心过滤部分。

我使用IntelliJ IDEA自动将lambda表达式替换为匿名类,并进行了Java实现的反编译 - 如果你对Java实现的更多细节感兴趣,可以查看一下(https://www.jetbrains.com/idea/,免费的社区版就足够进行这种研究)。

英文:

The filter method is not overridden, but it is called with different lambda expressions.

If you "explode" the lambda expressions to anonymous classes, you will get something like this:

IntStream.of(1,2,3)
        .filter(new IntPredicate() {
            @Override
            public boolean test(int e) {
                return e % 2 == 0;
            }
        })
        .filter(new IntPredicate() {
            @Override
            public boolean test(int e) {
                return e == 2;
            }
        })
        .forEach(new IntConsumer() {
            @Override
            public void accept(int e) {
                System.out.println(e);
            }
        });

You see that InStream.of(int...) is a kind of factory method, that creates an implementation of an IntStream with the given values.
Since the filter method gives back an IntStream again, you can do chaining and call filter of that instance again. It is defined that you get a new instance here everytime ("Intermediate operations return a new stream.").

The implementation can be the same, so filter does not have to be overridden.

In fact, in the Java 8 version currently installed here on my machine (OpenJDK 1.8.0_181), decompiling the implementations show that java.util.stream.IntStream.of(int...) delegates to java.util.Arrays.stream(int[]) and that returns actually an instance of java.util.stream.IntPipeline.Head. IntPipeline extends java.util.stream.AbstractPipeline, AbstractPipeline extends java.util.stream.PipelineHelper and that extends nothing (but Object, I wouldn't count that 多个筛选器内部的流处理 )

So java.util.LinkedList is not extended or modified for the filtering part, maybe it comes into play in the foreach method, but I didn't investigate that far as I thought you were more interested to the filtering.

I used IntelliJ IDEA to automatically replace the lambda expressions with anonymous classes and also do the decompiling stuff of the Java implementations - if you are interested in more details of the Java implementation you might give it a look (https://www.jetbrains.com/idea/, the free community edition is by far sufficient for that kind of research)

huangapple
  • 本文由 发表于 2020年10月10日 18:42:54
  • 转载请务必保留本文链接:https://go.coder-hub.com/64292518.html
匿名

发表评论

匿名网友

:?: :razz: :sad: :evil: :!: :smile: :oops: :grin: :eek: :shock: :???: :cool: :lol: :mad: :twisted: :roll: :wink: :idea: :arrow: :neutral: :cry: :mrgreen:

确定