Putting a blocking call inside a reactive chain seems to work in a different thread

Question

I am fairly new to Reactor, so please excuse any lack of research or depth in this question.

In my reactive project, the resource API endpoint returns Mono<Resource>. At one stage I have to poll an external API for a completion status, use that status for further processing, and finally construct the resource and return it.

In another question I asked whether I could use .repeatWhen(), but it did not work the way I expected. User @amanin suggested .retryWhen(), which I had initially been apprehensive about (that is why I tried .repeatWhen() first). .retryWhen() worked: it made the desired number of retry calls and then stopped. Below is my adaptation of the getStatus() method.

public Mono<AppContext> getStatus(AppContext ctx) {
    externalclient.getStatus() // external API call, client returns Mono<Response>
    .map(resp -> {
        if(resp.isCompleted()) {
            ctx.setCompleted(true);
            return resp;
        }
        throw new RetryException();
    })
    .retryWhen(Retry.backoff(maxAttempts).filter(err -> err instanceof RetryException))
    .subscribe();
    return Mono.just(ctx);
}

Queries:

  1. Is this the right approach? It looks like an anti-pattern to me, since I am controlling the flow with an exception.
  2. The code above executes, but my FTs seem to get the response while retries are still happening, meaning the main API thread and this retry thread are not connected. How do I fix it?

Answer 1

Score: 2

With

externalclient.getStatus() // external API call, client returns 
// ...
.subscribe();
return Mono.just(ctx);

you immediately return Mono.just(ctx), which has nothing to do with the external client call or the rest of that logic. Do not call subscribe() manually; instead, return the result of applying all of that logic to the original Mono from the external client, like this:

public Mono<AppContext> getStatus(AppContext ctx) {
    return externalclient.getStatus() // external API call, client returns Mono<Response>
            .map(resp -> {
                if (resp.isCompleted()) {
                    return resp;
                }
                throw new RetryException();
            })
            // Retry.backoff also takes a minimum backoff Duration;
            // minBackoff is an assumed field, like maxAttempts.
            .retryWhen(Retry.backoff(maxAttempts, minBackoff).filter(err -> err instanceof RetryException))
            .map(resp -> {
                // Runs only after a completed response has come through.
                ctx.setCompleted(true);
                return ctx;
            });
}

The subscription is performed by spring-webflux under the hood.
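To see that the returned chain does nothing until it is subscribed, and that the context is only marked completed after the retries finish, here is a minimal self-contained sketch. FakeClient, the nested Response, AppContext and RetryException classes, and the 10 ms backoff are hypothetical stand-ins for the types in the question, not the real ones. It also assumes that resubscribing to the client's Mono issues a new request, which is how the fake client behaves here; block() is used only to drive the demo outside WebFlux.

```java
import java.time.Duration;
import java.util.concurrent.atomic.AtomicInteger;

import reactor.core.publisher.Mono;
import reactor.util.retry.Retry;

public class PollingDemo {

    // Hypothetical stand-ins for the question's types.
    static class RetryException extends RuntimeException {}

    static class Response {
        private final boolean completed;
        Response(boolean completed) { this.completed = completed; }
        boolean isCompleted() { return completed; }
    }

    static class AppContext {
        private boolean completed;
        void setCompleted(boolean completed) { this.completed = completed; }
        boolean isCompleted() { return completed; }
    }

    // Fake external client: reports "completed" only from the third call onward.
    static class FakeClient {
        final AtomicInteger calls = new AtomicInteger();

        Mono<Response> getStatus() {
            // fromSupplier is lazy, so every (re)subscription performs a new "call".
            return Mono.fromSupplier(() -> new Response(calls.incrementAndGet() >= 3));
        }
    }

    static Mono<AppContext> getStatus(FakeClient client, AppContext ctx, long maxAttempts) {
        return client.getStatus()
                .map(resp -> {
                    if (resp.isCompleted()) {
                        return resp;
                    }
                    throw new RetryException(); // turned into an error signal by map
                })
                .retryWhen(Retry.backoff(maxAttempts, Duration.ofMillis(10))
                        .filter(err -> err instanceof RetryException))
                .map(resp -> {
                    ctx.setCompleted(true);
                    return ctx;
                });
    }

    public static void main(String[] args) {
        FakeClient client = new FakeClient();
        Mono<AppContext> result = getStatus(client, new AppContext(), 5);

        // Building the chain has not called the client yet.
        System.out.println("calls before subscribe: " + client.calls.get());

        // In a WebFlux controller you would return 'result' instead of blocking.
        AppContext ctx = result.block();
        System.out.println("completed=" + ctx.isCompleted() + ", calls=" + client.calls.get());
    }
}
```

Because the caller receives the same Mono that carries the retries, it cannot get a response while retries are still running, which is the disconnect described in the second query.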

For me, the only debatable part of controlling the flow with an exception is the instanceof check, because retryWhen is designed to handle Monos that fail with an exception. Imagine, for example, a network issue that makes externalclient.getStatus() return a Mono that fails with an exception raised under the hood. So maybe you do not need .filter(err -> ...) at all, since you would want to retry on any error.
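As a rough illustration of dropping the filter, the sketch below retries on any error signal. flakyCall, withRetry, and the 10 ms backoff are made-up names and values for the demo; the simulated failure is a plain IllegalStateException standing in for a network problem. It also shows that once the attempts run out, the chain ends with an error that Exceptions.isRetryExhausted recognizes, so the caller still gets a definite outcome.

```java
import java.time.Duration;
import java.util.concurrent.atomic.AtomicInteger;

import reactor.core.Exceptions;
import reactor.core.publisher.Mono;
import reactor.util.retry.Retry;

public class RetryAnyErrorDemo {

    // Simulated call: fails twice with a transient error, then succeeds.
    static Mono<String> flakyCall(AtomicInteger calls) {
        return Mono.fromSupplier(() -> {
            if (calls.incrementAndGet() < 3) {
                throw new IllegalStateException("simulated network issue");
            }
            return "COMPLETED";
        });
    }

    // No .filter(...): every error signal triggers a retry, up to maxAttempts retries.
    static Mono<String> withRetry(Mono<String> source, long maxAttempts) {
        return source.retryWhen(Retry.backoff(maxAttempts, Duration.ofMillis(10)));
    }

    public static void main(String[] args) {
        AtomicInteger calls = new AtomicInteger();
        String status = withRetry(flakyCall(calls), 5).block();
        System.out.println(status + " after " + calls.get() + " calls");

        // With too few attempts the chain terminates with a "retries exhausted" error.
        try {
            withRetry(flakyCall(new AtomicInteger()), 1).block();
        } catch (RuntimeException e) {
            System.out.println("exhausted=" + Exceptions.isRetryExhausted(e));
        }
    }
}
```

Whether retrying on every error is appropriate depends on the external API; keeping a filter still makes sense if some failures (for example, a rejected request) should not be retried.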

huangapple
  • Published on July 13, 2023, 16:33:55
  • When reposting, please keep the link to this article: https://go.coder-hub.com/76677404.html