Reading from a Flux<Integer> in chunks
Question
Is it possible to read from a WebFlux Flux in chunks (other than by using delayElements)?
For example, after I write
Flux.range(1, 10).doOnNext(System.out::println).take(5).subscribe();
is there any way to continue to read the next 5 integers?
If not, is there an alternative for the consumer to decide when to request the next piece of data?
Edit:
To clarify, I would like to read the first 5 values, then pause until an arbitrary later time in the program, then read the next 5 values without recreating the emitter flux. Simply calling buffer() won't do.
Answer 1
Score: 2
Then you need a full-fledged asynchronous subscriber object, not just a chain of methods.
// use maven dependency 'org.df4j:df4j-core:8.3'
import org.df4j.core.dataflow.Actor;
import org.df4j.core.port.InpFlow;
import org.junit.Assert;
import org.junit.Test;
import reactor.core.publisher.Flux;

public class FluxSubscriberTest {

    @Test
    public void test10() {
        FluxSubscriber subscriber = new FluxSubscriber();
        Flux.range(1, 10).subscribe(subscriber.inp);
        subscriber.start();
        boolean ok = subscriber.blockingAwait(5000);
        Assert.assertTrue(ok);
    }

    static class FluxSubscriber extends Actor {
        InpFlow<Integer> inp = new InpFlow<>(this, 5);
        int count = 0;

        @Override
        protected void runAction() throws Throwable {
            if (inp.isCompleted()) {
                System.out.println("input stream completed");
                complete();
                return;
            }
            Integer value = inp.remove();
            System.out.println("value=" + value);
            if (++count == 5) {
                count = 0;
                System.out.println("pause:");
                delay(1000);
            }
        }
    }
}
In fact, it reads 5 items first, and then one by one after each call to `inp.remove()`. If this is not exactly what you want, then you can extend class `InpFlow` to modify the policy when it invokes `Subscription.request()`.
The source code is available at https://github.com/akaigoro/df4j (yes, I am the author).
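To make the role of `Subscription.request()` concrete, here is a minimal sketch that uses only the JDK's `java.util.concurrent.Flow` interfaces, which mirror the Reactive Streams ones. It is not df4j or Reactor code: `RangePublisher`, `ChunkSubscriber` and `requestChunk` are hypothetical names invented for this illustration, and the publisher is a simplified, single-threaded stand-in for `Flux.range`. The point is only that a subscriber which holds on to its subscription can ask for 5 items, stop, and ask for 5 more at any later time without resubscribing.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.Flow;

public class ChunkedDemandSketch {

    /** Hypothetical synchronous publisher: emits start..start+count-1, but only on demand. */
    static final class RangePublisher implements Flow.Publisher<Integer> {
        private final int start;
        private final int count;

        RangePublisher(int start, int count) {
            this.start = start;
            this.count = count;
        }

        @Override
        public void subscribe(Flow.Subscriber<? super Integer> subscriber) {
            subscriber.onSubscribe(new Flow.Subscription() {
                private int next = start;
                private final int end = start + count; // exclusive upper bound
                private long demand = 0;
                private boolean emitting = false;
                private boolean done = false;

                @Override
                public void request(long n) {
                    // Simplification: the full spec signals onError for n <= 0.
                    if (done || n <= 0) return;
                    demand += n;
                    if (emitting) return; // re-entrant request: the running loop sees the new demand
                    emitting = true;
                    while (demand > 0 && next < end && !done) {
                        demand--;
                        subscriber.onNext(next++);
                    }
                    emitting = false;
                    if (next == end && !done) {
                        done = true;
                        subscriber.onComplete();
                    }
                }

                @Override
                public void cancel() {
                    done = true;
                }
            });
        }
    }

    /** Subscriber that never requests on its own; the caller pulls chunks explicitly. */
    static final class ChunkSubscriber implements Flow.Subscriber<Integer> {
        final List<Integer> received = new ArrayList<>();
        boolean completed = false;
        Throwable error;
        private Flow.Subscription subscription;

        @Override public void onSubscribe(Flow.Subscription s) { this.subscription = s; }
        @Override public void onNext(Integer item) { received.add(item); }
        @Override public void onError(Throwable t) { this.error = t; }
        @Override public void onComplete() { this.completed = true; }

        /** Asks the publisher for up to n more items. */
        void requestChunk(int n) {
            subscription.request(n);
        }
    }

    public static void main(String[] args) {
        ChunkSubscriber subscriber = new ChunkSubscriber();
        new RangePublisher(1, 10).subscribe(subscriber);

        subscriber.requestChunk(5);
        System.out.println(subscriber.received); // [1, 2, 3, 4, 5]

        // ... any amount of unrelated work can happen here ...

        subscriber.requestChunk(5);
        System.out.println(subscriber.received.size() + " items, completed=" + subscriber.completed);
    }
}
```

Nothing is emitted between the two `requestChunk` calls, because the publisher only moves when there is outstanding demand. A real publisher may deliver on another thread, so a production subscriber would need thread-safe state rather than the plain fields used here.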
Answer 2
Score: 0
`buffer` is the keyword you're looking for:
Flux.range(1, 10)
    .buffer(5)
    .doOnNext(System.out::println)
    .subscribe();
Output:
[1, 2, 3, 4, 5]
[6, 7, 8, 9, 10]
https://projectreactor.io/docs/core/release/api/reactor/core/publisher/Flux.html#buffer--
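On its own, `buffer(5)` only groups the values; with a plain `subscribe()` both lists arrive back to back, which is what the question's edit rules out. A possible way to get the pause is to combine it with Reactor's `BaseSubscriber`, whose `request(n)` method lets the consumer decide when the next batch is pulled. The following is a sketch under that assumption, not something taken from the answer: `BufferOnDemandSketch` and `BatchSubscriber` are hypothetical names, and it requires `reactor-core` on the classpath.

```java
import java.util.ArrayList;
import java.util.List;
import org.reactivestreams.Subscription;
import reactor.core.publisher.BaseSubscriber;
import reactor.core.publisher.Flux;

public class BufferOnDemandSketch {

    /** Receives one buffered batch per explicit request; never requests on its own. */
    static final class BatchSubscriber extends BaseSubscriber<List<Integer>> {
        final List<List<Integer>> batches = new ArrayList<>();

        @Override
        protected void hookOnSubscribe(Subscription subscription) {
            // Intentionally empty: the default hook would request an unbounded amount.
        }

        @Override
        protected void hookOnNext(List<Integer> batch) {
            batches.add(batch);
            System.out.println(batch);
        }
    }

    public static void main(String[] args) {
        BatchSubscriber subscriber = new BatchSubscriber();
        Flux.range(1, 10).buffer(5).subscribe(subscriber);

        subscriber.request(1); // pulls the first batch of 5

        // ... arbitrary later time in the program ...

        subscriber.request(1); // pulls the second batch of 5
    }
}
```

Each `request(1)` asks for one list, which `buffer(5)` turns into a request for 5 integers upstream, so the source is not recreated between batches. The same subscriber pattern should work without `buffer` by extending `BaseSubscriber<Integer>` and calling `request(5)` instead.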