Java Streams - Buffering huge streams


Question

I'm trying to collapse several streams backed by huge amounts of data into one, then buffer them. I'm able to collapse these streams into one stream of items with no problem. When I attempt to buffer/chunk the stream, though, it tries to fully buffer the first stream, which instantly fills up my memory.

It took me a while to narrow the issue down to a minimal test case, but here is the code.

I can refactor things so that I don't run into this issue, but without understanding why exactly this blows up, I feel like using streams is just a ticking time bomb.

I took inspiration from https://stackoverflow.com/questions/47842871/buffer-operator-on-java-8-streams for the buffering.

import java.util.*;
import java.util.stream.LongStream;
import java.util.stream.Stream;
import java.util.stream.StreamSupport;

public class BreakStreams
{
   
   //@see https://stackoverflow.com/questions/47842871/buffer-operator-on-java-8-streams
   /**
    * Batch a stream into chunks
    */
   public static <T> Stream<List<T>> buffer(Stream<T> stream, final long count)
   {
      final Iterator<T> streamIterator = stream.iterator();

      return StreamSupport.stream(Spliterators.spliteratorUnknownSize(new Iterator<List<T>>()
      {
         @Override public boolean hasNext()
         {
            return streamIterator.hasNext();
         }

         @Override public List<T> next()
         {
            List<T> intermediate = new ArrayList<>();
            for (long v = 0; v < count && hasNext(); v++)
            {
               intermediate.add(streamIterator.next());
            }
            return intermediate;
         }
      }, 0), false);
   }

   public static void main(String[] args)
   {

      //create streams from huge datasets
      Stream<Long> streams = Stream.of(LongStream.range(0, Integer.MAX_VALUE).boxed(),
                                       LongStream.range(0, Integer.MAX_VALUE).boxed())
                                   //collapse into one stream
                                   .flatMap(x -> x);
      //iterating over the stream one item at a time is OK:
//      streams.forEach(x -> {

      //buffering the stream is NOT OK, you will go OOM
      buffer(streams, 25).forEach(x -> {
         try
         {
            Thread.sleep(2500);
         }
         catch (InterruptedException ignore)
         {
         }
         System.out.println(x);
      });
   }
}
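
To reproduce the failure quickly instead of waiting for a large default heap to fill, it can help to run the test case with a small heap cap (an illustrative invocation, not part of the original question):

javac BreakStreams.java
java -Xmx64m BreakStreams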

Answer 1

Score: 6

This seems to be connected to the older issue https://stackoverflow.com/q/29229373/2711488. While that issue has been fixed for the Stream's built-in operations, it seems to still exist when we try to iterate over a flat-mapped stream externally.

We can simplify the code to reproduce the problem to:

Stream.of(LongStream.range(0, Integer.MAX_VALUE))
    .flatMapToLong(x -> x)
    .iterator().hasNext();

Note that using a Spliterator is affected as well:

Stream.of(LongStream.range(0, Integer.MAX_VALUE))
    .flatMapToLong(x -> x)
    .spliterator()
    .tryAdvance((long l) -> System.out.println("first item: "+l));

Both try to buffer elements until ultimately bailing out with an OutOfMemoryError.
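
For contrast, built-in terminal operations over the same pipeline are not affected on JDKs that contain the fix mentioned above. A small sketch (note that on Java 8, where flatMap was not fully lazy, this could still traverse the entire inner stream before returning):

import java.util.stream.LongStream;
import java.util.stream.Stream;

// Built-in short-circuiting operation: completes immediately on fixed JDKs,
// without draining the inner stream first.
long first = Stream.of(LongStream.range(0, Integer.MAX_VALUE))
                   .flatMapToLong(x -> x)
                   .findFirst()
                   .getAsLong();
System.out.println("first item: " + first); // prints 0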

Since spliterator().forEachRemaining(…) seems not to be affected, you could implement a solution that works for your use case of forEach, but it would be fragile, as it would still exhibit the problem for short-circuiting stream operations:

import java.util.*;
import java.util.function.Consumer;
import java.util.stream.Stream;
import java.util.stream.StreamSupport;

public static <T> Stream<List<T>> buffer(Stream<T> stream, final int count) {
    boolean parallel = stream.isParallel();
    Spliterator<T> source = stream.spliterator();
    return StreamSupport.stream(
        new Spliterators.AbstractSpliterator<List<T>>(
            // estimated number of chunks, rounded up; keep only the
            // characteristics that survive chunking
            (source.estimateSize()+count-1)/count, source.characteristics()
                &(Spliterator.SIZED|Spliterator.DISTINCT|Spliterator.ORDERED)
                    | Spliterator.NONNULL) {
            List<T> list;
            Consumer<T> c = t -> list.add(t);
            @Override
            public boolean tryAdvance(Consumer<? super List<T>> action) {
                if(list == null) list = new ArrayList<>(count);
                if(!source.tryAdvance(c)) return false;
                do {} while(list.size() < count && source.tryAdvance(c));
                action.accept(list);
                list = null;
                return true;
            }
            @Override
            public void forEachRemaining(Consumer<? super List<T>> action) {
                // push-based traversal; this path is not affected by the
                // flatMap buffering issue
                source.forEachRemaining(t -> {
                    if(list == null) list = new ArrayList<>(count);
                    list.add(t);
                    if(list.size() == count) {
                        action.accept(list);
                        list = null;
                    }
                });
                // emit the final, possibly smaller chunk
                if(list != null) {
                    action.accept(list);
                    list = null;
                }
            }
        }, parallel);
}
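
To see where this remains fragile, consider a short-circuiting pipeline over the question's data (a sketch, not from the original answer; it assumes the buffer method above): limit() makes the pipeline short-circuiting, so elements are pulled one list at a time via tryAdvance(), and the flat-mapped source buffers again.

import java.util.stream.LongStream;
import java.util.stream.Stream;

Stream<Long> huge = Stream.of(LongStream.range(0, Integer.MAX_VALUE).boxed(),
                              LongStream.range(0, Integer.MAX_VALUE).boxed())
                          .flatMap(x -> x);

// forEach is not short-circuiting; it drives the spliterator through
// forEachRemaining() and stays lazy:
//buffer(huge, 25).forEach(System.out::println);

// limit() forces tryAdvance()-based traversal, so the flatMap buffering
// resurfaces and this can still fail with an OutOfMemoryError:
buffer(huge, 25).limit(1).forEach(System.out::println);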

But note that Spliterator-based solutions are preferable in general, as they can carry additional information that enables optimizations, and they have lower iteration costs in a lot of use cases. So this is the way to go once this issue has been fixed in the JDK code.

As a workaround, you can use Stream.concat(…) to combine streams, but it has an explicit warning about not combining too many streams at once in its documentation (https://docs.oracle.com/javase/8/docs/api/java/util/stream/Stream.html#concat-java.util.stream.Stream-java.util.stream.Stream-):

> Use caution when constructing streams from repeated concatenation. Accessing an element of a deeply concatenated stream can result in deep call chains, or even StackOverflowException [sic].

<sup>The throwable's name has been corrected to StackOverflowError in Java 9's documentation.</sup>
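
Applied to the question's two streams, the workaround could look like the following sketch (it assumes the buffer method above; with only two inputs, the deep-concatenation warning does not apply):

import java.util.stream.LongStream;
import java.util.stream.Stream;

// Stream.concat delegates to each input spliterator in turn, so there is
// no flatMap step to trigger the buffering described above.
Stream<Long> combined = Stream.concat(
      LongStream.range(0, Integer.MAX_VALUE).boxed(),
      LongStream.range(0, Integer.MAX_VALUE).boxed());

buffer(combined, 25).forEach(System.out::println);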

