For Java streams, does generate + limit guarantee no additional calls to the generator function, or is there a preferred alternative?

Question

I have a source of data that I know has `n` elements, which I can access by repeatedly calling a method on an object; for the sake of example, let's call it `myReader.read()`. I want to create a stream containing those `n` elements. Let's also say that I don't want to call the `read()` method more times than the number of elements I want to return, as it will throw an exception (e.g. `NoSuchElementException`) if it is called after the end of the data is reached.

I know I can create this stream by using the [`IntStream.range`](https://docs.oracle.com/en/java/javase/11/docs/api/java.base/java/util/stream/IntStream.html#range(int,int)) method and mapping each element using the `read` method. However, this feels a little odd, since I'm completely ignoring the int values in the stream (I'm really just using it to produce a stream with exactly `n` elements).

```java
return IntStream.range(0, n).mapToObj(i -> myReader.read());
```

An approach I've considered is using `Stream.generate(supplier)` followed by `Stream.limit(maxSize)`. Based on my understanding of the `limit` function, this feels like it should work.

```java
Stream.generate(myReader::read).limit(n)
```

However, nowhere in the API documentation do I see an indication that `Stream.limit()` guarantees that exactly `maxSize` elements are generated by the stream it's called on. It seems conceivable that a stream implementation could call the generator function more than `n` times, as long as the end result contained just the first `n` calls and the operation still met the API contract for a short-circuiting intermediate operation.
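One way to probe this empirically is to count supplier invocations. The sketch below is illustrative only: `GenerateLimitProbe` and `countSequentialCalls` are made-up names, and an `AtomicInteger` counter stands in for `myReader.read()`. On current OpenJDK builds a sequential run reports exactly `n` calls, but that is an observation about one implementation, not a documented guarantee.

```java
import java.util.List;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.stream.Collectors;
import java.util.stream.Stream;

public class GenerateLimitProbe {
    // Runs generate + limit sequentially and returns how often the supplier was invoked.
    // The counter is a stand-in for myReader.read(); this measures one JDK, not the spec.
    static int countSequentialCalls(int n) {
        AtomicInteger calls = new AtomicInteger();
        List<Integer> result = Stream.generate(calls::incrementAndGet)
                .limit(n)
                .collect(Collectors.toList());
        if (result.size() != n) {
            throw new IllegalStateException("limit returned " + result.size() + " elements");
        }
        return calls.get();
    }

    public static void main(String[] args) {
        System.out.println("supplier calls for n=5: " + countSequentialCalls(5));
    }
}
```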

Stream.limit JavaDocs

> Returns a stream consisting of the elements of this stream, truncated to be no longer than maxSize in length.
> This is a short-circuiting stateful intermediate operation.

Stream operations and pipelines documentation

> An intermediate operation is short-circuiting if, when presented with infinite input, it may produce a finite stream as a result. [...] Having a short-circuiting operation in the pipeline is a necessary, but not sufficient, condition for the processing of an infinite stream to terminate normally in finite time.

Is it safe to rely on `Stream.generate(generator).limit(n)` making only `n` calls to the underlying generator? If so, is there some documentation of this that I'm missing?

And to avoid the XY problem: what is the idiomatic way of creating a stream by performing an operation exactly `n` times?

Answer 1

Score: 5


`Stream.generate` creates an unordered stream. This implies that the subsequent `limit` operation is not required to use the first `n` elements, as there is no “first” when there's no order; it may select any `n` elements. The implementation may exploit this permission, e.g. for higher parallel processing performance.
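The ordering difference can be observed through the `ORDERED` characteristic of a stream's spliterator. A small sketch using only standard API (`Stream.spliterator()` and `Spliterator.hasCharacteristics`); the class and helper names are hypothetical. Note that `spliterator()` is a terminal operation, so each stream is consumed by the check.

```java
import java.util.Collections;
import java.util.Spliterator;
import java.util.stream.IntStream;
import java.util.stream.Stream;

public class OrderingCheck {
    // Reports whether the stream's spliterator carries the ORDERED characteristic.
    static boolean isOrdered(Stream<?> stream) {
        return stream.spliterator().hasCharacteristics(Spliterator.ORDERED);
    }

    public static void main(String[] args) {
        // Infinite, unordered source: limit may keep any n elements (expected: false)
        System.out.println("generate: " + isOrdered(Stream.generate(() -> "x")));
        // Finite, ordered sources (expected: true for both)
        System.out.println("range:    " + isOrdered(IntStream.range(0, 5).boxed()));
        System.out.println("nCopies:  " + isOrdered(Collections.nCopies(5, "x").stream()));
    }
}
```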

The following code

```java
IntSummaryStatistics s =
    Stream.generate(new AtomicInteger()::incrementAndGet)
        .parallel()
        .limit(100_000)
        .collect(Collectors.summarizingInt(Integer::intValue));

System.out.println(s);
```

prints something like

```
IntSummaryStatistics{count=100000, sum=5000070273, min=1, average=50000,702730, max=100207}
```

on my machine, although the max number may vary. It demonstrates that the stream has selected exactly 100000 elements, as required, but not the elements from 1 to 100000. Since the generator produces strictly ascending numbers, it's clear that it has been called more than 100000 times to produce numbers higher than that.
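The overshoot can also be measured directly by counting generator invocations instead of inferring them from the maximum value. A sketch with a hypothetical class name and an `AtomicInteger` as the generator; the call count varies between runs and JDK versions, and the only things that can be relied on are that the result has exactly `limit` elements and that the generator ran at least that many times.

```java
import java.util.List;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.stream.Collectors;
import java.util.stream.Stream;

public class ParallelOvershoot {
    // Returns {elements kept by limit, generator invocations} for one parallel run.
    static long[] run(int limit) {
        AtomicInteger calls = new AtomicInteger();
        List<Integer> kept = Stream.generate(calls::incrementAndGet)
                .parallel()
                .limit(limit)
                .collect(Collectors.toList());
        return new long[] { kept.size(), calls.get() };
    }

    public static void main(String[] args) {
        long[] r = run(100_000);
        // Generator calls may exceed the limit; the exact number is not predictable
        System.out.println("kept=" + r[0] + ", generator calls=" + r[1]);
    }
}
```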

Another example

```java
System.out.println(
    Stream.generate(new AtomicInteger()::incrementAndGet)
        .parallel()
        .map(String::valueOf)
        .limit(10)
        .collect(Collectors.toList())
);
```

prints something like this on my machine (JDK 14):

```
[4, 8, 5, 6, 10, 3, 7, 1, 9, 11]
```

With JDK 8, it even prints something like

```
[4, 14, 18, 24, 30, 37, 42, 52, 59, 66]
```

If a construct like

```java
IntStream.range(0, n).mapToObj(i -> myReader.read())
```

feels weird due to the unused `i` parameter, you may use

```java
Collections.nCopies(n, myReader).stream().map(TypeOfMyReader::read)
```

instead. This doesn't show an unused `int` parameter and works equally well; in fact, it's internally implemented as `IntStream.range(0, n).mapToObj(i -> element)`. There is no way around some counter, visible or hidden, to ensure that the method will be called `n` times. Note that, since `read` is likely a stateful operation, the resulting behavior will always be like an unordered stream when parallel processing is enabled, but the `IntStream` and `nCopies` approaches create a finite stream that will never invoke the method more than the specified number of times.
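To see the finite-stream guarantee in action, the following sketch uses a hypothetical `CountingReader` that mimics the question's reader: it yields `n` elements and then throws `NoSuchElementException`. Both the `IntStream.range` and the `nCopies` variants run in parallel here; the element order is arbitrary, but each variant invokes `read()` exactly `n` times, so the exception is never triggered.

```java
import java.util.Collections;
import java.util.List;
import java.util.NoSuchElementException;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.stream.Collectors;
import java.util.stream.IntStream;

public class ExactlyNReads {
    // Hypothetical stand-in for the question's reader: n elements, then NoSuchElementException.
    static final class CountingReader {
        private final int size;
        private final AtomicInteger calls = new AtomicInteger(); // thread-safe for parallel use

        CountingReader(int size) { this.size = size; }

        String read() {
            int i = calls.incrementAndGet();
            if (i > size) {
                throw new NoSuchElementException("read() call " + i + " exceeds " + size + " elements");
            }
            return "item-" + i;
        }

        int calls() { return calls.get(); }
    }

    // Counter is visible as the unused int parameter
    static List<String> viaRange(CountingReader reader, int n) {
        return IntStream.range(0, n).parallel()
                .mapToObj(i -> reader.read())
                .collect(Collectors.toList());
    }

    // Counter is hidden inside the nCopies list
    static List<String> viaNCopies(CountingReader reader, int n) {
        return Collections.nCopies(n, reader).parallelStream()
                .map(CountingReader::read)
                .collect(Collectors.toList());
    }

    public static void main(String[] args) {
        int n = 1_000;
        CountingReader r1 = new CountingReader(n);
        System.out.println("range:   " + viaRange(r1, n).size() + " elements, " + r1.calls() + " reads");
        CountingReader r2 = new CountingReader(n);
        System.out.println("nCopies: " + viaNCopies(r2, n).size() + " elements, " + r2.calls() + " reads");
    }
}
```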

Answer 2

Score: 0


Only answering the XY-problem part of your question: simply create a spliterator for your reader.

```java
import java.util.NoSuchElementException;
import java.util.Spliterator;
import java.util.function.Consumer;

class MyStreamSpliterator implements Spliterator<String> { // or whichever datatype
    private final MyReaderClass reader;

    public MyStreamSpliterator(MyReaderClass reader) {
        this.reader = reader;
    }

    @Override
    public boolean tryAdvance(Consumer<? super String> action) {
        String nextval;
        try {
            nextval = reader.read();
        } catch (NoSuchElementException e) {
            // end of data reached; cleanup if necessary
            return false;
        }
        // Only the read is guarded, so exceptions from downstream are not swallowed
        action.accept(nextval);
        return true;
        // Alternative: if you really really want to use n iterations,
        // add a counter and use it.
    }

    @Override
    public Spliterator<String> trySplit() {
        return null; // we don't split
    }

    @Override
    public long estimateSize() {
        return Long.MAX_VALUE; // or the correct value, if you know it beforehand
    }

    @Override
    public int characteristics() {
        // add SIZED if you know the size
        return Spliterator.IMMUTABLE | Spliterator.ORDERED;
    }
}
```

Then, create your stream with `StreamSupport.stream(new MyStreamSpliterator(reader), false)`.

Disclaimer: I just threw this together in the SO editor, probably there are some errors.
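The comment in `tryAdvance` mentions a counter-based alternative. A minimal sketch of that idea, assuming the reader can be expressed as a `Supplier` (for example `myReader::read`); `CountedSpliterator` and `streamOf` are illustrative names, not an existing API. Extending `Spliterators.AbstractSpliterator` means only `tryAdvance` has to be written, and because the size is known up front the spliterator can report `SIZED`.

```java
import java.util.List;
import java.util.Spliterator;
import java.util.Spliterators;
import java.util.function.Consumer;
import java.util.function.Supplier;
import java.util.stream.Collectors;
import java.util.stream.Stream;
import java.util.stream.StreamSupport;

// Illustrative spliterator that pulls exactly n elements from a supplier.
public class CountedSpliterator<T> extends Spliterators.AbstractSpliterator<T> {
    private final Supplier<? extends T> reader; // e.g. myReader::read (assumption)
    private long remaining;

    CountedSpliterator(long n, Supplier<? extends T> reader) {
        // SIZED: exactly n elements; ORDERED: encounter order follows call order
        super(n, Spliterator.SIZED | Spliterator.ORDERED);
        this.reader = reader;
        this.remaining = n;
    }

    @Override
    public boolean tryAdvance(Consumer<? super T> action) {
        if (remaining <= 0) {
            return false; // n elements delivered: the reader is not called again
        }
        remaining--;
        action.accept(reader.get());
        return true;
    }

    @Override
    public long estimateSize() {
        return remaining; // exact count of elements still to come
    }

    static <T> Stream<T> streamOf(long n, Supplier<? extends T> reader) {
        return StreamSupport.stream(new CountedSpliterator<>(n, reader), false);
    }

    public static void main(String[] args) {
        int[] calls = {0};
        // Hypothetical reader that returns the number of the current call
        List<Integer> items = streamOf(3, () -> ++calls[0]).collect(Collectors.toList());
        System.out.println(items + " after " + calls[0] + " calls"); // prints [1, 2, 3] after 3 calls
    }
}
```

Unlike the exception-based version, this never calls the reader once `n` elements have been delivered, so it does not depend on the reader signalling the end of the data.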

huangapple
  • Published on September 17, 2020 07:03:04
  • When reposting, please keep the link to this article: https://go.coder-hub.com/63929040.html