从Flux<Integer>中分块读取

huangapple go评论56阅读模式
英文:

Reading from a Flux<Integer> in chunks

问题

可以在WebFlux流中以块的形式进行读取吗?(除了使用delayElements之外)

例如,在我写下以下代码之后:

Flux.range(1, 10).doOnNext(System.out::println).take(5).subscribe();

是否有任何方法可以继续读取接下来的5个整数?

如果不能,是否有其他方法让消费者决定何时请求下一部分数据?

编辑:

澄清一下,我想先读取前5个值,然后暂停,直到程序的某个任意时间,然后读取接下来的5个值,而不重新创建发射器流。仅调用buffer()是不够的。

英文:

Is it possible do read from a webflux flux in chunks? ( other than using delayElements )

For example after I write

Flux.range(1, 10).doOnNext(System.out::println).take(5).subscribe();

is there any way to continue to read the next 5 integers?

If not, is there an alternative for the consumer to decide when to request the next piece of data?

Edit:

To clarify, I would like to read the first 5 values, then pause until an arbitrary later time in the program, then read the next 5 values without recreating the emitter flux. simply calling buffer() won't do

答案1

得分: 2

then you need a full-fledged asynchronous subscriber object, not just a chain of methods.

// use maven dependency 'org.df4j:df4j-core:8.3'
import org.df4j.core.dataflow.Actor;
import org.df4j.core.port.InpFlow;
import org.junit.Assert;
import org.junit.Test;
import reactor.core.publisher.Flux;

public class FluxSubscriberTest {

    @Test
    public  void test10() {
        FluxSubscriber subscriber = new FluxSubscriber();
        Flux.range(1, 10).subscribe(subscriber.inp);
        subscriber.start();
        boolean ok = subscriber.blockingAwait(5000);
        Assert.assertTrue(ok);
    }

    static class FluxSubscriber extends Actor {
        InpFlow<Integer> inp = new InpFlow<>(this, 5);
        int count = 0;

        @Override
        protected void runAction() throws Throwable {
            if (inp.isCompleted()) {
                System.out.println("input stream completed");
                complete();
                return;
            }
            Integer value = inp.remove();
            System.out.println("value=" + value);
            if (++count == 5) {
                count = 0;
                System.out.println("pause:");
                delay(1000);
            }
        }
    }
}

In fact, it reads 5 items first, and then one by one after each call to `inp.remove()`. If this is not exactly what you want, then you can extend class `InpFlow` to modify the policy when it invokes `Subscription.request()`.

Source codes are available at [https://github.com/akaigoro/df4j][1] (yes I am the author).
英文:

then you need a full-fledged asynchronous subscriber object, not just a chain of methods.

// use maven dependency &#39;org.df4j:df4j-core:8.3&#39;
import org.df4j.core.dataflow.Actor;
import org.df4j.core.port.InpFlow;
import org.junit.Assert;
import org.junit.Test;
import reactor.core.publisher.Flux;
public class FluxSubscriberTest {
@Test
public  void test10() {
FluxSubscriber subscriber = new FluxSubscriber();
Flux.range(1, 10).subscribe(subscriber.inp);
subscriber.start();
boolean ok = subscriber.blockingAwait(5000);
Assert.assertTrue(ok);
}
static class FluxSubscriber extends Actor {
InpFlow&lt;Integer&gt; inp = new InpFlow&lt;&gt;(this, 5);
int count = 0;
@Override
protected void runAction() throws Throwable {
if (inp.isCompleted()) {
System.out.println(&quot;input stream completed&quot;);
complete();
return;
}
Integer value = inp.remove();
System.out.println(&quot;value=&quot;+value);
if (++count==5) {
count = 0;
System.out.println(&quot;pause:&quot;);
delay(1000);
}
}
}
}

In fact, it reads 5 items first, and then one by one after each call to inp.remove(). If this is not exactly what you want, then you can extend class InpFlow to modify the policy when it invokes Subscription.request().

Source codes are avalable at https://github.com/akaigoro/df4j (yes I am the author).

答案2

得分: 0

buffer 是你要寻找的关键字

Flux.range(1, 10)
    .buffer(5)
    .doOnNext(System.out::println)
    .subscribe();

输出:

[1, 2, 3, 4, 5]
[6, 7, 8, 9, 10]

链接:https://projectreactor.io/docs/core/release/api/reactor/core/publisher/Flux.html#buffer--

英文:

buffer is the keyword you're looking for

Flux.range(1, 10)
.buffer(5)
.doOnNext(System.out::println)
.subscribe();

output:

[1, 2, 3, 4, 5]
[6, 7, 8, 9, 10]

https://projectreactor.io/docs/core/release/api/reactor/core/publisher/Flux.html#buffer--

huangapple
  • 本文由 发表于 2020年10月23日 19:24:25
  • 转载请务必保留本文链接:https://go.coder-hub.com/64499066.html
匿名

发表评论

匿名网友

:?: :razz: :sad: :evil: :!: :smile: :oops: :grin: :eek: :shock: :???: :cool: :lol: :mad: :twisted: :roll: :wink: :idea: :arrow: :neutral: :cry: :mrgreen:

确定