英文:
Can I have blocking remote call in Flux.generate state generator
问题
我正在从一系列阻塞的 REST API 调用中生成 Flux,每个调用都依赖于前一个调用的结果。即:
Result r1 = call(fromRow = 0);
Result r2 = call(fromRow = 0 + r1.size());
Result r3 = call(fromRow = 0 + r1.size() + r2.size());
...
这是我尝试的简化版本:
Flux.generate(() -> 0, (i, sink) -> {
Result r = slowRemoteCall(i);
if (r == null) {
sink.complete();
} else {
sink.next(r)
}
return i + r.size();
}, state -> {});
只是想知道状态生成器内部的阻塞调用 slowRemoteCall
会成为问题吗?
提前感谢你的帮助!
英文:
I am generating a Flux from a series of blocking REST API calls, each call depends on the result of previous call. i.e.
Result r1 = call(fromRow = 0);
Result r2 = call(fromRow = 0 + r1.size());
Result r3 = call(fromRow = 0 + r1.size() + r2.size());
...
Here is a simplified version of what I am trying with:
Flux.generate(() -> 0, (i, sink) -> {
Result r = slowRemoteCall(i);
if (r == null) {
sink.complete();
} else {
sink.next(r)
}
return i + r.size();
}, state -> {});
Just wonder will the blocking call slowRemoteCall
inside the state generator become a problem?
Thank you for your help in advance!
答案1
得分: 2
使用expand
操作符和响应式远程客户端(例如:Spring WebClient),您可以以响应式非阻塞的方式实现这一点:
slowRemoteCall(0)
.expand(result -> {
if (result.size() == 0) { // 停止条件
return Mono.empty();
} else {
return slowRemoteCall(result.startIndex() + result.size()); // 保持状态
}
})
Mono<Result> slowRemoteCall(int startIndex) {
// 模拟延迟(这里可以是一个WebClient调用)
return Mono.delay(Duration.ofMillis(200)).thenReturn(new Result(startIndex));
}
受到此博文的启发。
英文:
With expand
operator and a reactive remote client (e.g.: Spring WebClient) you can implement this in a reactive non-blocking way:
slowRemoteCall(0)
.expand(result -> {
if (result.size() == 0) { // stop condition
return Mono.empty();
} else {
return slowRemoteCall(result.startIndex() + result.size()); // maintain state
}
})
Mono<Result> slowRemoteCall(int startIndex) {
// simulate latency (could be a WebClient call here)
return Mono.delay(Duration.ofMillis(200)).thenReturn(new Result(startIndex));
}
Inspired by this blog post.
答案2
得分: 1
这可能会成为一个问题!如果您有任何阻塞调用,您可以使用schedulers
来通过特定的线程池完成任务。
Flux.generate(.........)
.subscribeOn(Schedulers.boundedElastic())
更多信息:
https://www.vinsguru.com/reactive-programming-schedulers/
英文:
It may become a problem! If you have any blocking call, you can use schedulers
to get the task done by specific thread pools.
Flux.generate(.........)
.subscribeOn(Schedulers.boundedElastic())
More info:
通过集体智慧和协作来改善编程学习和解决问题的方式。致力于成为全球开发者共同参与的知识库,让每个人都能够通过互相帮助和分享经验来进步。
评论