反应器基础的轮询未终止。

huangapple go评论88阅读模式
英文:

Reactor based polling is not terminating

问题

以下是已翻译的代码部分:

意图是轮询若干次直到收到特定响应以下是我编译的代码片段

Mono.defer(() -> webClient.getResponse())
    // 每500毫秒重复进行GET调用
    .repeatWhen(repeat -> repeat.delayElements(Duration.ofMillis(500)))
    .repeat(4) // 最大重复尝试次数
    .takeUntil(response -> response.equals("1"))
    //.retryWhen(Retry.max(3))
    .log()
    .subscribe(response -> {
        System.out.println("收到响应:" + response);
    });

但是出现了问题它根本不停止我希望它运行4次然后退出

12:54:54.527 [parallel-7] INFO  reactor.Flux.TakeUntil.1 - onNext(2)
收到响应2
12:54:55.034 [parallel-8] INFO  reactor.Flux.TakeUntil.1 - onNext(2)
收到响应2
12:54:55.540 [parallel-9] INFO  reactor.Flux.TakeUntil.1 - onNext(2)
收到响应2
12:54:56.045 [parallel-10] INFO  reactor.Flux.TakeUntil.1 - onNext(2)
收到响应2
12:54:56.551 [parallel-1] INFO  reactor.Flux.TakeUntil.1 - onNext(2)
收到响应2
12:54:57.057 [parallel-2] INFO  reactor.Flux.TakeUntil.1 - onNext(2)
收到响应2
12:54:57.563 [parallel-3] INFO  reactor.Flux.TakeUntil.1 - onNext(2)
收到响应2
.......

请注意,代码部分已经翻译,其他内容已被删除,如您所要求。如果您需要更多帮助,请随时提出。

英文:

Intention is to poll a certain number of times until I receive a particular response. Here is the piece of code I have compiled:

Mono.defer(() -> webClient.getResponse())
// repeat GET call after every 500 ms
.repeatWhen(repeat -> repeat.delayElements(Duration.ofMillis(500)))
.repeat(4) // max repeat attempts
.takeUntil(response -> response.equals("1"))
//.retryWhen(Retry.max(3))
.log()
.subscribe(response -> {
System.out.println("response received: " + response);
});

But, there's a problem. It is not stopping at all. I was expecting it to run 4 times and exit.

12:54:54.527 [parallel-7] INFO  reactor.Flux.TakeUntil.1 - onNext(2)
response received: 2
12:54:55.034 [parallel-8] INFO  reactor.Flux.TakeUntil.1 - onNext(2)
response received: 2
12:54:55.540 [parallel-9] INFO  reactor.Flux.TakeUntil.1 - onNext(2)
response received: 2
12:54:56.045 [parallel-10] INFO  reactor.Flux.TakeUntil.1 - onNext(2)
response received: 2
12:54:56.551 [parallel-1] INFO  reactor.Flux.TakeUntil.1 - onNext(2)
response received: 2
12:54:57.057 [parallel-2] INFO  reactor.Flux.TakeUntil.1 - onNext(2)
response received: 2
12:54:57.563 [parallel-3] INFO  reactor.Flux.TakeUntil.1 - onNext(2)
response received: 2
.......

答案1

得分: 1

repeat(int n) 运行 flux n 次。但在你的情况下,在此之前使用了 repeatWhen,它产生了一个无限的 flux。这就是为什么你的 flux 不再结束的原因。

如果你想要在收到正确值之前重试订阅,你可以使用 retryWhen 函数。它接受一个重试配置作为输入,以限制重试次数,指定重试之间的延迟等。在你的情况下,你可以:

  1. 如果没有收到正确值,使流失败
  2. 添加一个 retryWhen 操作来重试订阅。

让我们用一个示例来说明:

import reactor.core.publisher.Mono;
import reactor.util.retry.Retry;

import java.time.Duration;
import java.util.concurrent.atomic.AtomicInteger;

import static java.lang.System.nanoTime;

public class RetryWhen {

    /**
     * 在每次订阅时发出递减的值
     * @param initial 要发出的第一个值
     */
    static Mono<Integer> countdown(int initial) {
        var current = new AtomicInteger(initial);
        var t0 = nanoTime();
        return Mono.fromCallable(() -> current.getAndDecrement())
                .doOnNext(i -> System.out.printf(
                        "Emit %d at %d ms%n",
                        i, (int)((nanoTime() - t0) * 1e-6)
                ));
    }

    public static void main(String[] args) {
        var source = countdown(4);
        // 如果值不等于 1,则引发错误。
        var oneOrError = source.filter(i -> i == 1)
                .switchIfEmpty(Mono.error(new RuntimeException("bad value")));

        // 最多重试 4 次。每次重试间隔约为 500 毫秒
        var retried = oneOrError.retryWhen(Retry.fixedDelay(4, Duration.ofMillis(500)));

        var result = retried.block();
        System.out.println("Result: "+result);
    }
}

上面的程序产生了以下输出:

Emit 4 at 173 ms
Emit 3 at 696 ms
Emit 2 at 1198 ms
Emit 1 at 1699 ms
Result: 1

注意

  • 在你的情况下,repeatWhen 产生了一个无限流,因为你传递给它的函数从不返回空的发布者。这导致持续订阅你的原始 mono(它返回 "2")。
  • 另外,由于 repeatWhensource/caller 重复值,所以你的 .takeUntil(response -> response.equals("1")) 语句永远不会匹配(它总是接收到 "2")。
英文:

repeat(int n) runs the flux n number of times. But in your case, you've used repeatWhen before that, and it produces an infinite flux. That's why your flux does not ends anymore.

If you want to retry subscription until you receive the right value, you can use retryWhen function. It takes a retry configuration as input, to limit the number of retries, specifies delays between retrys, etc. In your case, you can:

  1. Fail the stream if you do not receive the right value
  2. Append a retryWhen operation to retry subscription.

Let's illustrate it with an example:

import reactor.core.publisher.Mono;
import reactor.util.retry.Retry;

import java.time.Duration;
import java.util.concurrent.atomic.AtomicInteger;

import static java.lang.System.nanoTime;

public class RetryWhen {

    /**
     * Emit a decreased value on each subscription
     * @param initial First value to emit
     */
    static Mono&lt;Integer&gt; countdown(int initial) {
        var current = new AtomicInteger(initial);
        var t0 = nanoTime();
        return Mono.fromCallable(() -&gt; current.getAndDecrement())
                .doOnNext(i -&gt; System.out.printf(
                        &quot;Emit %d at %d ms%n&quot;,
                        i, (int)((nanoTime() - t0) * 1e-6)
                ));
    }

    public static void main(String[] args) {
        var source = countdown(4);
        // raise an error if value is different than one.
        var oneOrError = source.filter(i -&gt; i == 1)
                .switchIfEmpty(Mono.error(new RuntimeException(&quot;bad value&quot;)));

        // Retry 4 times maximum. Each retry is spaced by ~ 500 ms
        var retried = oneOrError.retryWhen(Retry.fixedDelay(4, Duration.ofMillis(500)));

        var result = retried.block();
        System.out.println(&quot;Result: &quot;+result);
    }
}

The above program produces:

Emit 4 at 173 ms
Emit 3 at 696 ms
Emit 2 at 1198 ms
Emit 1 at 1699 ms
Result: 1

Notes :

  • in your case, repeatWhen produces an infinite stream, because the function you passed to it never returns an empty publisher. It causes continuous subscription to your original mono (which returns "2").
  • Also, as repeatWhen repeat values from the source/caller, your .takeUntil(response -&gt; response.equals(&quot;1&quot;)) statement will never be matched (it will always receive "2").

huangapple
  • 本文由 发表于 2023年7月6日 15:27:53
  • 转载请务必保留本文链接:https://go.coder-hub.com/76626438.html
匿名

发表评论

匿名网友

:?: :razz: :sad: :evil: :!: :smile: :oops: :grin: :eek: :shock: :???: :cool: :lol: :mad: :twisted: :roll: :wink: :idea: :arrow: :neutral: :cry: :mrgreen:

确定