WebFlux: execute a request in the background?

Question

I have a method that should return the result or an error message after executing the `confirm` method, and then do some background job. I wrote something like the code below, but I'm not sure about the method invoked in `then`. Am I wrong? If so, how can I run a method in the background using WebFlux?

    public Mono<Void> someMethod(...) {
      return someReactiveApiClient.confirm().onErrorMap(...).then(doSomeBackgroundJob);
    }


# Answer 1
**Score**: 6


As @zlaval mentioned, `publishOn()` can be used to switch execution contexts. The `publishOn` operator determines the thread on which the operators below it in the chain run, until the next `publishOn` appears. Its placement in the chain is therefore significant.
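To make the placement rule concrete, here is a minimal sketch that records the thread name in a `map` step before and after `publishOn`. The class and method names (`PublishOnPlacement`, `threadNames`) are invented for illustration, and the sketch assumes `reactor-core` 3.3 or later on the classpath so that `Schedulers.boundedElastic()` is available.

```java
import java.util.List;
import java.util.concurrent.CopyOnWriteArrayList;
import reactor.core.publisher.Mono;
import reactor.core.scheduler.Schedulers;

class PublishOnPlacement {

    // Returns the names of the threads that ran the map step
    // above publishOn (index 0) and below publishOn (index 1).
    static List<String> threadNames() {
        List<String> names = new CopyOnWriteArrayList<>();
        Mono.just("Hello")
            .map(v -> { names.add(Thread.currentThread().getName()); return v; })
            .publishOn(Schedulers.boundedElastic())
            .map(v -> { names.add(Thread.currentThread().getName()); return v; })
            .block(); // block() is used only to keep the demo synchronous
        return names;
    }

    public static void main(String[] args) {
        List<String> names = threadNames();
        System.out.println("above publishOn: " + names.get(0));
        System.out.println("below publishOn: " + names.get(1));
    }
}
```

The first `map` runs on the subscribing thread, while the second runs on a worker of the bounded elastic scheduler, because only operators below `publishOn` are affected.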

You can write your own scheduler or use one of the existing schedulers from the `Schedulers` class. The Reactor reference documentation covers them in more detail.

Thus, if you have code like this (I am using the elastic scheduler; newer Reactor releases deprecate `Schedulers.elastic()` in favor of `Schedulers.boundedElastic()`):

    Mono.just("Hello").log()
        .onErrorMap(err -> new RuntimeException("my exception"))
        .publishOn(Schedulers.elastic())
        .then(Mono.just("World").log())
        .subscribe();

you will see this printed in the console:

    18:30:11.337 [main] INFO reactor.Mono.Just.1 - | onSubscribe([Synchronous Fuseable] Operators.ScalarSubscription)
    18:30:11.340 [main] INFO reactor.Mono.Just.1 - | request(unbounded)
    18:30:11.340 [main] INFO reactor.Mono.Just.1 - | onNext(Hello)
    18:30:11.344 [main] INFO reactor.Mono.Just.1 - | onComplete()
    18:30:11.344 [elastic-2] INFO reactor.Mono.Just.2 - | onSubscribe([Synchronous Fuseable] Operators.ScalarSubscription)
    18:30:11.344 [elastic-2] INFO reactor.Mono.Just.2 - | request(unbounded)
    18:30:11.344 [elastic-2] INFO reactor.Mono.Just.2 - | onNext(World)
    18:30:11.344 [elastic-2] INFO reactor.Mono.Just.2 - | onComplete()

    Process finished with exit code 0

So the second `Mono`, the one emitting `World`, runs on a different thread named `elastic-2` rather than on the `main` thread.

Note that if your `confirm` method signals an error, the background task will not run; execution stops there.

For example, if you have something like this:

    Mono.error(new ArithmeticException()).log()
        .onErrorMap(err -> new RuntimeException("my exception"))
        .publishOn(Schedulers.elastic())
        .then(Mono.just("World").log())
        .subscribe();

then running it gives this output:

    18:33:50.544 [main] INFO reactor.Mono.Error.1 - onSubscribe([Fuseable] Operators.EmptySubscription)
    18:33:50.546 [main] INFO reactor.Mono.Error.1 - request(unbounded)
    18:33:50.547 [main] ERROR reactor.Mono.Error.1 - onError(java.lang.ArithmeticException)
    18:33:50.549 [main] ERROR reactor.Mono.Error.1 - 
    java.lang.ArithmeticException: null
        at com.example.schooltimetable.Application.main(Application.java:29)

    Process finished with exit code 0
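The same short-circuit can be checked without reading logs. The sketch below is illustrative only: the class name `ErrorSkipsThen` and the flag `jobRan` are made up, and `block()` is used just to surface the error synchronously. It shows that the caller sees the exception produced by `onErrorMap`, while the `Mono` passed to `then` is never subscribed.

```java
import java.util.concurrent.atomic.AtomicBoolean;
import reactor.core.publisher.Mono;

class ErrorSkipsThen {

    // Returns {message of the error seen by the caller, whether the follow-up job ran}.
    static String[] run() {
        AtomicBoolean jobRan = new AtomicBoolean(false);
        String seen = "none";
        try {
            Mono.<String>error(new ArithmeticException("boom"))
                .onErrorMap(err -> new IllegalStateException("my exception", err))
                // Subscribed only if the upstream completes without error.
                .then(Mono.fromRunnable(() -> jobRan.set(true)))
                .block();
        } catch (IllegalStateException e) {
            seen = e.getMessage();
        }
        return new String[] { seen, String.valueOf(jobRan.get()) };
    }

    public static void main(String[] args) {
        String[] outcome = run();
        System.out.println("error seen: " + outcome[0] + ", job ran: " + outcome[1]);
    }
}
```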

If your background job involves I/O or a network call, you need something similar to this:

    public Mono<Void> someMethod(...) {
      return someReactiveApiClient.confirm()
               .onErrorMap(err -> {
                   // Your error handling logic. Remember to return an error
                   })
               .publishOn(Schedulers.elastic())
               .then(doSomeBackgroundJob);
    }
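One caveat: a `Mono` built with `then(job)` completes only after `job` completes, so the caller still waits for the job even though it runs on another thread. If the caller should get the result without waiting, one possible alternative, which is not part of the original answer, is to subscribe to the job separately from a side-effect operator. The sketch below assumes the job can be modelled as a `Mono<Void>`; `FireAndForget`, `confirmThenDetach`, and the latches are hypothetical names used only for the demonstration.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import reactor.core.publisher.Mono;
import reactor.core.scheduler.Schedulers;

class FireAndForget {

    // confirm and job are stand-ins for confirm() and doSomeBackgroundJob.
    static <T> Mono<T> confirmThenDetach(Mono<T> confirm, Mono<Void> job) {
        return confirm.doOnSuccess(result ->
            // Detached subscription: the returned Mono does not wait for it.
            job.subscribeOn(Schedulers.boundedElastic())
               .subscribe(
                   ignored -> { },
                   err -> System.err.println("background job failed: " + err)));
    }

    // Returns {job still pending when the result arrived, job finished after release}.
    static boolean[] demo() {
        CountDownLatch release = new CountDownLatch(1);
        CountDownLatch done = new CountDownLatch(1);
        Mono<Void> job = Mono.fromRunnable(() -> {
            awaitQuietly(release, 5); // simulate slow work until released
            done.countDown();
        });
        confirmThenDetach(Mono.just("confirmed"), job).block();
        boolean pending = done.getCount() == 1;
        release.countDown();
        boolean finished = awaitQuietly(done, 5);
        return new boolean[] { pending, finished };
    }

    static boolean awaitQuietly(CountDownLatch latch, long seconds) {
        try {
            return latch.await(seconds, TimeUnit.SECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return false;
        }
    }

    public static void main(String[] args) {
        boolean[] outcome = demo();
        System.out.println("pending at return: " + outcome[0] + ", finished later: " + outcome[1]);
    }
}
```

The trade-off is that errors from the detached job no longer reach the caller, so they have to be handled in the error consumer passed to `subscribe`.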

# Answer 2
**Score**: 1


The following code returns the result of `confirm()` and, once the original publisher has completed successfully, calls the job. `publishOn` means the job runs on a different thread. Choose the `Schedulers` instance that best fits your requirements.

    public Mono<YourRequiredType> someMethod(...) {
      return someReactiveApiClient.confirm().onErrorMap(...)
            .publishOn(Schedulers.boundedElastic())
            // Mono has no doOnComplete (that is a Flux operator); doOnSuccess is the Mono counterpart.
            // This assumes doSomeBackgroundJob is a Runnable.
            .doOnSuccess(result -> doSomeBackgroundJob.run());
    }
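As a rough check of this approach, the sketch below runs a stand-in job through `publishOn` plus `doOnSuccess` and records which thread executed it. `DoOnSuccessJob`, `confirmThenJob`, and `demo` are hypothetical names, and the job is assumed to be a plain `Runnable`.

```java
import java.util.concurrent.atomic.AtomicReference;
import reactor.core.publisher.Mono;
import reactor.core.scheduler.Schedulers;

class DoOnSuccessJob {

    // confirm and job are stand-ins for confirm() and doSomeBackgroundJob.
    static <T> Mono<T> confirmThenJob(Mono<T> confirm, Runnable job) {
        return confirm
            .publishOn(Schedulers.boundedElastic())
            .doOnSuccess(result -> job.run());
    }

    // Returns {value received by the caller, name of the thread that ran the job}.
    static String[] demo() {
        AtomicReference<String> jobThread = new AtomicReference<>("not run");
        String result = confirmThenJob(
                Mono.just("confirmed"),
                () -> jobThread.set(Thread.currentThread().getName()))
            .block();
        return new String[] { result, jobThread.get() };
    }

    public static void main(String[] args) {
        String[] outcome = demo();
        System.out.println(outcome[0] + " / job ran on " + outcome[1]);
    }
}
```

Keep in mind that `doOnSuccess` runs its callback before the value is passed downstream, so a slow job still delays the caller even though it runs on a scheduler thread.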

huangapple
  • Published on July 29, 2020, 18:59:28
  • When reposting, please keep the link to this article: https://go.coder-hub.com/63152086.html