英文:
WebFlux execute request in background?
问题
我有一个方法,应该在执行`confirm`方法后返回结果或错误消息,然后执行一些后台作业。我写了类似上面的东西,但我不确定在`then`中调用的方法是否正确。如果是的话,我应该如何使用WebFlux在后台运行方法?
public Mono
return someReactiveApiClient.confirm().onErrorMap(...).then(doSomeBackgroundJob);
}
<details>
<summary>英文:</summary>
I have method that should return result or error message after executing `confirm` method and then do some background job. I wrote something similar to above but I'm not sure about method that invoked in `then`. Am i wrong? If yes, How can i run method in background using webflux?
public Mono<Void> someMethod(...){
return someReactiveApiClient.confirm().onErrorMap(...).then(doSomeBackgroundJob);
}
</details>
# 答案1
**得分**: 6
如@zlaval提到的,`publishOn()` 可以用来切换执行上下文。`publishOn` 运算符会影响链中其下运算符的线程上下文,直到出现新的 `publishOn`。因此,`publishOn` 的放置位置很重要。
您可以编写自己的调度程序或使用来自 [此类][1] 的现有调度程序之一。您可以在 [这里][2] 阅读更多信息。
因此,如果您有类似这样的代码(我在使用弹性调度程序):
```java
Mono.just("Hello").log()
.onErrorMap(err -> new RuntimeException("my exception"))
.publishOn(Schedulers.elastic())
.then(Mono.just("World").log())
.subscribe();
您将只会在控制台中看到以下输出:
18:30:11.337 [main] INFO reactor.Mono.Just.1 - | onSubscribe([Synchronous Fuseable] Operators.ScalarSubscription)
18:30:11.340 [main] INFO reactor.Mono.Just.1 - | request(unbounded)
18:30:11.340 [main] INFO reactor.Mono.Just.1 - | onNext(Hello)
18:30:11.344 [main] INFO reactor.Mono.Just.1 - | onComplete()
18:30:11.344 [elastic-2] INFO reactor.Mono.Just.2 - | onSubscribe([Synchronous Fuseable] Operators.ScalarSubscription)
18:30:11.344 [elastic-2] INFO reactor.Mono.Just.2 - | request(unbounded)
18:30:11.344 [elastic-2] INFO reactor.Mono.Just.2 - | onNext(World)
18:30:11.344 [elastic-2] INFO reactor.Mono.Just.2 - | onComplete()
Process finished with exit code 0
因此,拥有 World
的第二个 Mono 在名为 elastic-2
的另一个线程中执行,而不是在 main
线程中。
请注意,如果您的 confirm
方法引发错误,那么后台任务将不会发生。执行会在那里停止。
例如,如果您有类似这样的代码:
Mono.error(new ArithmeticException()).log()
.onErrorMap(err -> new RuntimeException("my exception"))
.publishOn(Schedulers.elastic())
.then(Mono.just("World").log())
.subscribe();
然后运行这个代码,会得到以下输出:
18:33:50.544 [main] INFO reactor.Mono.Error.1 - onSubscribe([Fuseable] Operators.EmptySubscription)
18:33:50.546 [main] INFO reactor.Mono.Error.1 - request(unbounded)
18:33:50.547 [main] ERROR reactor.Mono.Error.1 - onError(java.lang.ArithmeticException)
18:33:50.549 [main] ERROR reactor.Mono.Error.1 -
java.lang.ArithmeticException: null
at com.example.schooltimetable.Application.main(Application.java:29)
Process finished with exit code 0
如果您的后台任务涉及到一些 I/O 或网络调用,您需要类似这样的代码:
public Mono<Void> someMethod(...) {
return someReactiveApiClient.confirm()
.onErrorMap(err -> {
// 您的错误处理逻辑。记得返回一个错误
})
.publishOn(Schedulers.elastic())
.then(doSomeBackgroundJob);
}
英文:
As @zlaval mentioned, publishOn()
can be used to switch execution contexts. The publishOn
operator influences the threading context where the rest of the operators in the chain below it run, up to a new occurrence of publishOn
. So the placement of publishOn
is significant.
You can write your own scheduler or use one of the existing schedulers from this class . You can read more about it here.
Thus, if you have some code like this (I am using elastic scheduler):
Mono.just("Hello").log()
.onErrorMap(err -> new RuntimeException("my exception"))
.publishOn(Schedulers.elastic())
.then(Mono.just("World").log())
.subscribe();
you will see only this printed in console:
18:30:11.337 [main] INFO reactor.Mono.Just.1 - | onSubscribe([Synchronous Fuseable] Operators.ScalarSubscription)
18:30:11.340 [main] INFO reactor.Mono.Just.1 - | request(unbounded)
18:30:11.340 [main] INFO reactor.Mono.Just.1 - | onNext(Hello)
18:30:11.344 [main] INFO reactor.Mono.Just.1 - | onComplete()
18:30:11.344 [elastic-2] INFO reactor.Mono.Just.2 - | onSubscribe([Synchronous Fuseable] Operators.ScalarSubscription)
18:30:11.344 [elastic-2] INFO reactor.Mono.Just.2 - | request(unbounded)
18:30:11.344 [elastic-2] INFO reactor.Mono.Just.2 - | onNext(World)
18:30:11.344 [elastic-2] INFO reactor.Mono.Just.2 - | onComplete()
Process finished with exit code 0
Thus the second mono having World
is being executed in another thread called elastic-2
and not in the main
thread.
Note that, if your confirm
method throws an error, then the background task wont occur. The execution will stop there.
eg. If you have something like this:
Mono.error(new ArithmeticException()).log()
.onErrorMap(err -> new RuntimeException("my exception"))
.publishOn(Schedulers.elastic())
.then(Mono.just("World").log())
.subscribe();
then running this, will give output,
18:33:50.544 [main] INFO reactor.Mono.Error.1 - onSubscribe([Fuseable] Operators.EmptySubscription)
18:33:50.546 [main] INFO reactor.Mono.Error.1 - request(unbounded)
18:33:50.547 [main] ERROR reactor.Mono.Error.1 - onError(java.lang.ArithmeticException)
18:33:50.549 [main] ERROR reactor.Mono.Error.1 -
java.lang.ArithmeticException: null
at com.example.schooltimetable.Application.main(Application.java:29)
Process finished with exit code 0
You need similarly like this if your background job involves some I/O or network call.
public Mono<Void> someMethod(...){
return someReactiveApiClient.confirm()
.onErrorMap(err -> {
//Your error handling logic. Remember to return an error
})
.publishOn(Schedulers.elastic())
.then(doSomeBackgroundJob);
}
答案2
得分: 1
以下是翻译好的部分:
以下代码将返回confirm()
的结果,并在原始发布者成功完成后调用该作业。 publishOn
表示该作业必须在不同的线程上运行。您可能需要选择适合您要求的适当Schedulers
。
public Mono<YourRequiredType> someMethod(...) {
return someReactiveApiClient.confirm().onErrorMap(...)
.publishOn(Schedulers.boundedElastic())
.doOnComplete(doSomeBackgroundJob);
}
英文:
The following code will return with the result of the confirm()
, and after the original publisher is succesfully completed, it will call the job. publishOn
indicates that the job have to run on different thread. You may have to choose the proper Schedulers
, best fits to your requierements
public Mono<YourRequieredType> someMethod(...){
return someReactiveApiClient.confirm().onErrorMap(...)
.publishOn(Schedulers.boundedElastic())
.doOnComplete(doSomeBackgroundJob);
}
通过集体智慧和协作来改善编程学习和解决问题的方式。致力于成为全球开发者共同参与的知识库,让每个人都能够通过互相帮助和分享经验来进步。
评论