RxJava中断并行执行,当找到第一个结果时停止

huangapple go评论73阅读模式
英文:

RxJava interrupt parallel execution when first result found

问题

我正在运行一些具有并行延迟的服务。任务是不等待所有服务都执行完毕。

以下是您的代码的翻译:

Observable.just(2, 3, 5)
    .map(delay -> serviceReturningSingleWithDelay(delay))
    .toList()
    .flatMap(list ->
        Single.zip(list, output ->
            Arrays.stream(output)
                .map(delay -> (Integer) delay)
                .filter(delay -> delay == 3)
                .findFirst()
                .orElse(0)
        ))
    .subscribe(System.out::println);
private Single<Integer> serviceReturningSingleWithDelay(Integer delay) {
    return Single.just(delay)
        .delay(delay, TimeUnit.SECONDS)
        .doOnSuccess(s -> System.out.printf("延迟 %d: 线程 : %s \n", delay, Thread.currentThread().getName()));
}

现在我的输出是:

延迟 2: 线程 : RxComputationThreadPool-1
延迟 3: 线程 : RxComputationThreadPool-2
延迟 5: 线程 : RxComputationThreadPool-3
3

期望的结果是在 RxComputationThreadPool-3 线程执行完毕之前获得过滤值 3。感谢任何想法。

英文:

I am running some service working with delay in parallel. The task is to not wait while all services have finished execution.

    Observable.just(2, 3, 5)
                .map(delay -&gt; serviceReturningSingleWithDelay(delay))
                .toList()
                .flatMap(list -&gt;
                        Single.zip(list, output -&gt; Arrays.stream(output)
                                .map(delay -&gt; (Integer) delay)
                                .filter(delay -&gt; delay == 3)
                                .findFirst()
                                .orElse(0)
                        ))
                .subscribe(System.out::println);
 private Single&lt;Integer&gt; serviceReturningSingleWithDelay(Integer delay) {
        return Single.just(delay)
                .delay(delay, TimeUnit.SECONDS)
                .doOnSuccess(s -&gt; System.out.printf(&quot;Delay %d: Thread : %s \n&quot;, delay, Thread.currentThread().getName()));
    }

Now my output is:

Delay 2: Thread : RxComputationThreadPool-1 
Delay 3: Thread : RxComputationThreadPool-2 
Delay 5: Thread : RxComputationThreadPool-3 
3

The desired result is to obtain filtered value - 3 before RxComputationThreadPool-3 thread finished execution.
I will be thankful for any ideas.

答案1

得分: 1

如果您想要并行运行它们,并在收到值3时退出,您不需要使用zip。而是使用takeWhile来中断您的可观察对象,就像以下示例一样:

Observable.just(2, 3, 5)
          .flatMapSingle(this::serviceReturningSingleWithDelay)
          .takeWhile(e -> e != 3)
          .subscribe(System.out::println);

如果您想要获取值3,请改用takeUntil(e -> e == 3),而不是takeWhile(e -> e != 3)

英文:

If you want to run them all in parallel and exit when you receive value 3, you don't need to use zip. Rather use takeWhile to interrupt your observable like the following :

Observable.just(2, 3, 5)
          .flatMapSingle(this::serviceReturningSingleWithDelay)
          .takeWhile(e -&gt; e != 3)
          .subscribe(System.out::println);

And if you want the 3 value use takeUntil(e -&gt; e == 3) instead of takeWhile(e -&gt; e != 3)

答案2

得分: 0

另一种基于 @bubbles 回答的方法。如果你想保留 flatmap 并且在 Kotlin 中:

fun main(args: Array<String>) {
    Observable.just(5L, 8L, 10L)
        .flatMap {
            serviceReturningSingleWithDelay(it).toObservable()
        }
        .takeWhile { delay -> delay != 8L }
        .subscribeBy(
            onNext = { println("延迟时间 $it")}
        )

    Thread.sleep(10000)
}

private fun serviceReturningSingleWithDelay(delay: Long): Single<Long> {
    return Single.just(delay)
        .delay(delay, TimeUnit.SECONDS)
        .doOnSuccess {
            println("延迟 $it 线程 ${Thread.currentThread().name}")
        }
}
英文:

Another way based on @bubbles answer. If you want to keep the flatmap and in Kotlin

fun main(args: Array&lt;String&gt;) {
    Observable.just(5L, 8L, 10L)
            .flatMap {
                serviceReturningSingleWithDelay(it).toObservable()
            }
            .takeWhile { delay -&gt; delay != 8L }
            .subscribeBy(
                    onNext = { println(&quot;Delay period $it&quot;)}
            )

    Thread.sleep(10000)
}

private fun serviceReturningSingleWithDelay(delay: Long): Single&lt;Long&gt; {
    return Single.just(delay)
            .delay(delay, TimeUnit.SECONDS)
            .doOnSuccess {
                println(&quot;Delay $it Thread ${Thread.currentThread().name}&quot;)
            }
}

huangapple
  • 本文由 发表于 2020年9月18日 20:42:31
  • 转载请务必保留本文链接:https://go.coder-hub.com/63956012.html
匿名

发表评论

匿名网友

:?: :razz: :sad: :evil: :!: :smile: :oops: :grin: :eek: :shock: :???: :cool: :lol: :mad: :twisted: :roll: :wink: :idea: :arrow: :neutral: :cry: :mrgreen:

确定