反应器跳到错误的调度程序?

huangapple go评论58阅读模式
英文:

Reactor jumping to the wrong Scheduler?

问题

我有一个使用flatMaps逐步处理的数据流,同时使用Mono.fromCallable()调用阻塞代码。当查看运行日志时,重试操作在并行池中运行,原因是什么?如何使它们在组所在的调度程序上执行?基本上,以下是工作流程,从消息流开始,按某个键分组,然后对于每个组,我想在其自己的调度程序上运行它们,最大并发数为6(在这种情况下)。我做错了什么?

以下是您提供的代码段:

messageStream
    .groupBy(this::grouper)
    .flatMap(group -> {
        Scheduler scheduler = Schedulers.newBoundedElastic(6, 1, "ChildPool:" + group.key());
        return group
            .publishOn(scheduler)
            .flatMap(t1 -> Mono.fromCallable(() -> chevronOne(t1))
                .retryBackoff(MAX_RETRIES, Duration.ofSeconds(1), Duration.ofSeconds(60))
                .onErrorResume(e -> doErrorChevronOne(m, e, t1))
            )
            .flatMap(t2 -> Mono.fromCallable(() -> chevronTwo(t2))
                .retryBackoff(MAX_RETRIES, Duration.ofSeconds(1), Duration.ofSeconds(60))
                .onErrorResume(e -> doErrorChevronOne(m, e, t2))
            );
    })

以下是日志中显示调度程序跳转的几行示例:

2020-08-12 15:33:20.289 INFO [ChildPool:1004-21] e.u.t.t.f.l.transfers.ITestTransfers:582
2020-08-12 15:33:20.889 INFO [parallel-2] e.u.t.t.f.l.s.TransfersService:475 - ***** DOING chevronTwo ****

希望这对您有所帮助。

英文:

I've got a flux I'm processing in steps using flatMaps and calling blocking code with a Mono.fromCallable(). When looking in the run logs, the retires are running in the parallel pool for some reason. Why is that, and how can I get them to execute in the scheduler that the group is running on? This is basically the workflow below, starting with a stream of messages, group by some key, then for each group, I want to run them in their own Scheduler with a max concurrency of 6 (in this case). What am I doing wrong?

messageStream
    .groupBy(this::grouper)
    .flatMap(group ->
        Scheduler scheduler = Schedulers.newBoundedElastic(6, 1, "ChildPool:"+group.key());
        return group
              .publishOn(scheduler)
              .flatMap(t1 -> Mono.fromCallable(() -> chevronOne(t1))
                   .retryBackoff(MAX_RETRIES, Duration.ofSeconds(1), Duration.ofSeconds(60))
                    .onErrorResume(e -> doErrorChevronOne(m, e, t1))
              )
              .flatMap(t2 -> Mono.fromCallable(() -> chevronTwo(t2))
                   .retryBackoff(MAX_RETRIES, Duration.ofSeconds(1), Duration.ofSeconds(60))
                   .onErrorResume(e -> doErrorChevronOne(m, e, t2))
               )

Here are a couple of lines from the logs showing the Scheduler jumping:

2020-08-12 15:33:20.289 INFO [ChildPool:1004-21] e.u.t.t.f.l.transfers.ITestTransfers:582
2020-08-12 15:33:20.889 INFO [parallel-2] e.u.t.t.f.l.s.TransfersService:475 - ***** DOING chevronTwo ****

答案1

得分: 1

publishOn只保证后续操作将使用给定的SchedulerretryBackoff默认在parallel调度器上进行重试,因此结果可能如此。

如果需要在特定的Scheduler上处理任务,您必须始终明确指定调度方式。

英文:

publishOn only guarantees that given Scheduler will be used by the following operator. retryBackoff schedules the retry on the parallel scheduler by default, hence the result.

You must always be explicit about the scheduling if you need to handle something on a specific Scheduler.

huangapple
  • 本文由 发表于 2020年8月13日 04:27:25
  • 转载请务必保留本文链接:https://go.coder-hub.com/63384283.html
匿名

发表评论

匿名网友

:?: :razz: :sad: :evil: :!: :smile: :oops: :grin: :eek: :shock: :???: :cool: :lol: :mad: :twisted: :roll: :wink: :idea: :arrow: :neutral: :cry: :mrgreen:

确定