英文:
RxJava interrupt parallel execution when first result found
问题
我正在运行一些具有并行延迟的服务。任务是不等待所有服务都执行完毕。
以下是您的代码的翻译:
Observable.just(2, 3, 5)
.map(delay -> serviceReturningSingleWithDelay(delay))
.toList()
.flatMap(list ->
Single.zip(list, output ->
Arrays.stream(output)
.map(delay -> (Integer) delay)
.filter(delay -> delay == 3)
.findFirst()
.orElse(0)
))
.subscribe(System.out::println);
private Single<Integer> serviceReturningSingleWithDelay(Integer delay) {
return Single.just(delay)
.delay(delay, TimeUnit.SECONDS)
.doOnSuccess(s -> System.out.printf("延迟 %d: 线程 : %s \n", delay, Thread.currentThread().getName()));
}
现在我的输出是:
延迟 2: 线程 : RxComputationThreadPool-1
延迟 3: 线程 : RxComputationThreadPool-2
延迟 5: 线程 : RxComputationThreadPool-3
3
期望的结果是在 RxComputationThreadPool-3 线程执行完毕之前获得过滤值 3。感谢任何想法。
英文:
I am running some service working with delay in parallel. The task is to not wait while all services have finished execution.
Observable.just(2, 3, 5)
.map(delay -> serviceReturningSingleWithDelay(delay))
.toList()
.flatMap(list ->
Single.zip(list, output -> Arrays.stream(output)
.map(delay -> (Integer) delay)
.filter(delay -> delay == 3)
.findFirst()
.orElse(0)
))
.subscribe(System.out::println);
private Single<Integer> serviceReturningSingleWithDelay(Integer delay) {
return Single.just(delay)
.delay(delay, TimeUnit.SECONDS)
.doOnSuccess(s -> System.out.printf("Delay %d: Thread : %s \n", delay, Thread.currentThread().getName()));
}
Now my output is:
Delay 2: Thread : RxComputationThreadPool-1
Delay 3: Thread : RxComputationThreadPool-2
Delay 5: Thread : RxComputationThreadPool-3
3
The desired result is to obtain filtered value - 3 before RxComputationThreadPool-3 thread finished execution.
I will be thankful for any ideas.
答案1
得分: 1
如果您想要并行运行它们,并在收到值3时退出,您不需要使用zip
。而是使用takeWhile
来中断您的可观察对象,就像以下示例一样:
Observable.just(2, 3, 5)
.flatMapSingle(this::serviceReturningSingleWithDelay)
.takeWhile(e -> e != 3)
.subscribe(System.out::println);
如果您想要获取值3
,请改用takeUntil(e -> e == 3)
,而不是takeWhile(e -> e != 3)
。
英文:
If you want to run them all in parallel and exit when you receive value 3, you don't need to use zip
. Rather use takeWhile
to interrupt your observable like the following :
Observable.just(2, 3, 5)
.flatMapSingle(this::serviceReturningSingleWithDelay)
.takeWhile(e -> e != 3)
.subscribe(System.out::println);
And if you want the 3
value use takeUntil(e -> e == 3)
instead of takeWhile(e -> e != 3)
答案2
得分: 0
另一种基于 @bubbles 回答的方法。如果你想保留 flatmap
并且在 Kotlin 中:
fun main(args: Array<String>) {
Observable.just(5L, 8L, 10L)
.flatMap {
serviceReturningSingleWithDelay(it).toObservable()
}
.takeWhile { delay -> delay != 8L }
.subscribeBy(
onNext = { println("延迟时间 $it")}
)
Thread.sleep(10000)
}
private fun serviceReturningSingleWithDelay(delay: Long): Single<Long> {
return Single.just(delay)
.delay(delay, TimeUnit.SECONDS)
.doOnSuccess {
println("延迟 $it 线程 ${Thread.currentThread().name}")
}
}
英文:
Another way based on @bubbles answer. If you want to keep the flatmap
and in Kotlin
fun main(args: Array<String>) {
Observable.just(5L, 8L, 10L)
.flatMap {
serviceReturningSingleWithDelay(it).toObservable()
}
.takeWhile { delay -> delay != 8L }
.subscribeBy(
onNext = { println("Delay period $it")}
)
Thread.sleep(10000)
}
private fun serviceReturningSingleWithDelay(delay: Long): Single<Long> {
return Single.just(delay)
.delay(delay, TimeUnit.SECONDS)
.doOnSuccess {
println("Delay $it Thread ${Thread.currentThread().name}")
}
}
通过集体智慧和协作来改善编程学习和解决问题的方式。致力于成为全球开发者共同参与的知识库,让每个人都能够通过互相帮助和分享经验来进步。
评论