英文:
Reading from a Flux<Integer> in chunks
问题
可以在WebFlux流中以块的形式进行读取吗?(除了使用delayElements之外)
例如,在我写下以下代码之后:
Flux.range(1, 10).doOnNext(System.out::println).take(5).subscribe();
是否有任何方法可以继续读取接下来的5个整数?
如果不能,是否有其他方法让消费者决定何时请求下一部分数据?
编辑:
澄清一下,我想先读取前5个值,然后暂停,直到程序的某个任意时间,然后读取接下来的5个值,而不重新创建发射器流。仅调用buffer()是不够的。
英文:
Is it possible do read from a webflux flux in chunks? ( other than using delayElements )
For example after I write
Flux.range(1, 10).doOnNext(System.out::println).take(5).subscribe();
is there any way to continue to read the next 5 integers?
If not, is there an alternative for the consumer to decide when to request the next piece of data?
Edit:
To clarify, I would like to read the first 5 values, then pause until an arbitrary later time in the program, then read the next 5 values without recreating the emitter flux. simply calling buffer() won't do
答案1
得分: 2
then you need a full-fledged asynchronous subscriber object, not just a chain of methods.
// use maven dependency 'org.df4j:df4j-core:8.3'
import org.df4j.core.dataflow.Actor;
import org.df4j.core.port.InpFlow;
import org.junit.Assert;
import org.junit.Test;
import reactor.core.publisher.Flux;
public class FluxSubscriberTest {
@Test
public void test10() {
FluxSubscriber subscriber = new FluxSubscriber();
Flux.range(1, 10).subscribe(subscriber.inp);
subscriber.start();
boolean ok = subscriber.blockingAwait(5000);
Assert.assertTrue(ok);
}
static class FluxSubscriber extends Actor {
InpFlow<Integer> inp = new InpFlow<>(this, 5);
int count = 0;
@Override
protected void runAction() throws Throwable {
if (inp.isCompleted()) {
System.out.println("input stream completed");
complete();
return;
}
Integer value = inp.remove();
System.out.println("value=" + value);
if (++count == 5) {
count = 0;
System.out.println("pause:");
delay(1000);
}
}
}
}
In fact, it reads 5 items first, and then one by one after each call to `inp.remove()`. If this is not exactly what you want, then you can extend class `InpFlow` to modify the policy when it invokes `Subscription.request()`.
Source codes are available at [https://github.com/akaigoro/df4j][1] (yes I am the author).
英文:
then you need a full-fledged asynchronous subscriber object, not just a chain of methods.
// use maven dependency 'org.df4j:df4j-core:8.3'
import org.df4j.core.dataflow.Actor;
import org.df4j.core.port.InpFlow;
import org.junit.Assert;
import org.junit.Test;
import reactor.core.publisher.Flux;
public class FluxSubscriberTest {
@Test
public void test10() {
FluxSubscriber subscriber = new FluxSubscriber();
Flux.range(1, 10).subscribe(subscriber.inp);
subscriber.start();
boolean ok = subscriber.blockingAwait(5000);
Assert.assertTrue(ok);
}
static class FluxSubscriber extends Actor {
InpFlow<Integer> inp = new InpFlow<>(this, 5);
int count = 0;
@Override
protected void runAction() throws Throwable {
if (inp.isCompleted()) {
System.out.println("input stream completed");
complete();
return;
}
Integer value = inp.remove();
System.out.println("value="+value);
if (++count==5) {
count = 0;
System.out.println("pause:");
delay(1000);
}
}
}
}
In fact, it reads 5 items first, and then one by one after each call to inp.remove(). If this is not exactly what you want, then you can extend class InpFlow to modify the policy when it invokes Subscription.request().
Source codes are avalable at https://github.com/akaigoro/df4j (yes I am the author).
答案2
得分: 0
buffer 是你要寻找的关键字
Flux.range(1, 10)
.buffer(5)
.doOnNext(System.out::println)
.subscribe();
输出:
[1, 2, 3, 4, 5]
[6, 7, 8, 9, 10]
链接:https://projectreactor.io/docs/core/release/api/reactor/core/publisher/Flux.html#buffer--
英文:
buffer is the keyword you're looking for
Flux.range(1, 10)
.buffer(5)
.doOnNext(System.out::println)
.subscribe();
output:
[1, 2, 3, 4, 5]
[6, 7, 8, 9, 10]
https://projectreactor.io/docs/core/release/api/reactor/core/publisher/Flux.html#buffer--
通过集体智慧和协作来改善编程学习和解决问题的方式。致力于成为全球开发者共同参与的知识库,让每个人都能够通过互相帮助和分享经验来进步。


评论