我能在Flux.generate状态生成器中进行阻塞的远程调用吗?

huangapple go评论84阅读模式
英文:

Can I have blocking remote call in Flux.generate state generator

问题

我正在从一系列阻塞的 REST API 调用中生成 Flux,每个调用都依赖于前一个调用的结果。即:

Result r1 = call(fromRow = 0);
Result r2 = call(fromRow = 0 + r1.size());
Result r3 = call(fromRow = 0 + r1.size() + r2.size());
...

这是我尝试的简化版本:

Flux.generate(() -> 0, (i, sink) -> {
    Result r = slowRemoteCall(i);
    if (r == null) {
        sink.complete();
    } else {
        sink.next(r)
    }
    return i + r.size();
}, state -> {});

只是想知道状态生成器内部的阻塞调用 slowRemoteCall 会成为问题吗?

提前感谢你的帮助!

英文:

I am generating a Flux from a series of blocking REST API calls, each call depends on the result of previous call. i.e.

Result r1 = call(fromRow = 0);
Result r2 = call(fromRow = 0 + r1.size());
Result r3 = call(fromRow = 0 + r1.size() + r2.size());
...

Here is a simplified version of what I am trying with:

Flux.generate(() -> 0, (i, sink) -> {
    Result r = slowRemoteCall(i);
    if (r == null) {
        sink.complete();
    } else {
        sink.next(r)
    }
    return i + r.size();
}, state -> {});

Just wonder will the blocking call slowRemoteCall inside the state generator become a problem?

Thank you for your help in advance!

答案1

得分: 2

使用expand操作符和响应式远程客户端(例如:Spring WebClient),您可以以响应式非阻塞的方式实现这一点:

slowRemoteCall(0)
        .expand(result -> {
            if (result.size() == 0) { // 停止条件
                return Mono.empty();
            } else {
                return slowRemoteCall(result.startIndex() + result.size()); // 保持状态
            }
        })

Mono<Result> slowRemoteCall(int startIndex) {
    // 模拟延迟(这里可以是一个WebClient调用)
    return Mono.delay(Duration.ofMillis(200)).thenReturn(new Result(startIndex));
}

受到此博文的启发。

英文:

With expand operator and a reactive remote client (e.g.: Spring WebClient) you can implement this in a reactive non-blocking way:

slowRemoteCall(0)
        .expand(result -&gt; {
            if (result.size() == 0) { // stop condition
                return Mono.empty();
            } else {
                return slowRemoteCall(result.startIndex() + result.size()); // maintain state
            }
        })

Mono&lt;Result&gt; slowRemoteCall(int startIndex) {
    // simulate latency (could be a WebClient call here)
    return Mono.delay(Duration.ofMillis(200)).thenReturn(new Result(startIndex));
}

Inspired by this blog post.

答案2

得分: 1

这可能会成为一个问题!如果您有任何阻塞调用,您可以使用schedulers来通过特定的线程池完成任务。

Flux.generate(.........)
    .subscribeOn(Schedulers.boundedElastic())

更多信息:

https://www.vinsguru.com/reactive-programming-schedulers/

英文:

It may become a problem! If you have any blocking call, you can use schedulers to get the task done by specific thread pools.

Flux.generate(.........)
    .subscribeOn(Schedulers.boundedElastic())

More info:

https://www.vinsguru.com/reactive-programming-schedulers/

huangapple
  • 本文由 发表于 2020年10月18日 21:08:57
  • 转载请务必保留本文链接:https://go.coder-hub.com/64413712.html
匿名

发表评论

匿名网友

:?: :razz: :sad: :evil: :!: :smile: :oops: :grin: :eek: :shock: :???: :cool: :lol: :mad: :twisted: :roll: :wink: :idea: :arrow: :neutral: :cry: :mrgreen:

确定