可以将一个流分割成多个较小的流吗?

huangapple go评论86阅读模式
英文:

Can I split a stream into multiple smaller streams

问题

以下是翻译后的内容:

对于流处理有多个问题,但对于这个用例和在 Java 中,并没有找到任何解决方法。

我有一个包含大量对象的流 Stream<A> [约 100 万个对象]。StreamA 是从文件中获取的。

class A { enum Status [Running, Queued, Completed], String name }

我想将 Stream<A> 拆分为三个流,而不使用任何 Collect 语句。Collect 语句会将所有内容加载到内存中。

由于我在这里多次调用了 stream.concat,所以我遇到了 StackOverflowException 问题。

Java 文档中提到了 Stream.Concat 的问题:

“实现注意事项:
在构造重复连接的流时要小心。访问深度连接流的元素可能导致深层调用链,甚至是 StackOverflowException。”

Map<Status, Stream<String>> splitStream = new HashMap<>();
streamA.forEach(aObj -> {
    Stream<String> statusBasedStream = splitStream.getOrDefault(aObj.status, Stream.of());
    splitStream.put(aObj.status, Stream.concat(statusBasedStream, Stream.of(aObj.name)));
});

虽然在 GitHub 上有一些自定义流的选项可以实现连接,但我想使用标准库来解决这个问题。

如果数据较小,可以采用列表方法,如此处所述(https://stackoverflow.com/questions/41127391/split-stream-into-substreams-with-n-elements)。

英文:

There are mulitple questions for streams but for this usecase & in java, didnt find any.

I have a huge stream of objects Stream&lt;A&gt; [~1Million objects]. StreamA comes from a file.

Class A { enum status [Running,queued,Completed], String name }

I want to split Stream&lt;A&gt; into three streams without using any Collect statements. Collect statement loads everything into memory.

I am facing StackOverflowException as I am calling stream.concat multiple times here.

Stream.Concat has problem mentioned in Java Docs
"Implementation Note:
Use caution when constructing streams from repeated concatenation. Accessing an element of a deeply concatenated stream can result in deep call chains, or even StackOverflowException."

Map&lt;Status, Stream&lt;String&gt;&gt; splitStream = new HashMap&lt;&gt;();
streamA.foreach(aObj -&gt; 
Stream&lt;String&gt; statusBasedStream = splitStream.getOrDefault(aObj.status,Stream.of());
splitStream.put(aObj.status, Stream.concat(statusBasedStream, Stream.of(aObj.name))); 

There are few options where custom streams are available in github to achieve Concatenation but wanted to use standard libraries to solve this.

If data is smaller would have taken a list approach as mentioned here (https://stackoverflow.com/questions/41127391/split-stream-into-substreams-with-n-elements)

答案1

得分: 1

不是问题的确切解决方案,但如果您了解索引信息,那么Stream.skip()Stream.limit()的组合可以帮助解决这个问题 - 以下是我尝试过的虚拟代码:

        int queuedNumbers = 100;
        int runningNumbers = 200;
        Stream<Object> all = Stream.of();
        Stream<Object> queuedAndCompleted = all.skip(queuedNumbers);
        Stream<Object> queued = all.limit(queuedNumbers);
        Stream<Object> running = queuedAndCompleted.limit(runningNumbers);
        Stream<Object> completed = queuedAndCompleted.skip(runningNumbers);

希望对您有所帮助。

英文:

Not the exact solution of the problem but if you have information about the indexes then
combination of Stream.skip() and Stream.limit() can help in this - Below is the dummy code that I tried -

    int queuedNumbers = 100;
    int runningNumbers=200;
    Stream&lt;Object&gt; all = Stream.of();
    Stream&lt;Object&gt; queuedAndCompleted = all.skip(queuedNumbers);
    Stream&lt;Object&gt; queued = all.limit(queuedNumbers);
    Stream&lt;Object&gt; running = queuedAndCompleted.limit(runningNumbers);
    Stream&lt;Object&gt; completed = queuedAndCompleted.skip(runningNumbers);

Hope it would be of some help.

huangapple
  • 本文由 发表于 2020年5月5日 08:28:14
  • 转载请务必保留本文链接:https://go.coder-hub.com/61603847.html
匿名

发表评论

匿名网友

:?: :razz: :sad: :evil: :!: :smile: :oops: :grin: :eek: :shock: :???: :cool: :lol: :mad: :twisted: :roll: :wink: :idea: :arrow: :neutral: :cry: :mrgreen:

确定