英文:
Why merged Fluxes do not interleave concurrently?
问题
以下是翻译好的代码部分:
public class Main {
public static void main(String[] args) throws InterruptedException {
Flux.merge(fun1(10), fun2(10)).subscribe(System.out::println);
}
public static Flux<Integer> fun1(int i) {
return Flux.range(1, i).map(integer -> {
try {
Thread.sleep(2000);
} catch (InterruptedException e) {
e.printStackTrace();
}
return integer * 2;
});
}
public static Flux<Integer> fun2(int i) {
return Flux.range(1, i).map(integer -> {
try {
Thread.sleep(3000);
} catch (InterruptedException e) {
e.printStackTrace();
}
return integer * 2 + 1;
});
}
}
英文:
I'm new to the Reactor, so I'd like to know, why is merging in this sample code working not as it's subscribed in the documentation. One flux returns odd numbers with a delay of 3 secs and other returns evens with a delay of 2 secs, but when I subscribe, it prints results of the first flux and then of the second one sequentially, but not interspersed. Return is 2, 4, 6, 8, .... 1, 3, 5, 7 etc, while I'm expecting 2, 4, 3, 6, 8, 5, 10, 7 etc. Other words: why don't two sourse fluces run in parallel?
Here's a sample code
public class Main {
public static void main(String[] args) throws InterruptedException {
Flux.merge(fun1(10), fun2(10)).subscribe(System.out::println);
}
public static Flux<Integer> fun1(int i) {
return Flux.range(1, i).map(integer -> {
try {
Thread.sleep(2000);
} catch (InterruptedException e) {
e.printStackTrace();
}
return integer * 2;
});
}
public static Flux<Integer> fun2(int i) {
return Flux.range(1, i).map(integer -> {
try {
Thread.sleep(3000);
} catch (InterruptedException e) {
e.printStackTrace();
}
return integer * 2 + 1;
});
}
}
答案1
得分: 3
Reactor是并发无关的:默认情况下,所有操作都在主线程上运行。在您的示例中,这意味着`Thread.sleep`会阻塞主线程的进度,这就是为什么第二个`Flux`在第一个之后才开始工作。要解决这个问题...
**1. 您可以使用`delay`操作符,它以非阻塞的方式等待,并在内部切换到不同的线程。**
```java
public static void main(String[] args) {
Flux.merge(fun1(10), fun2(10))
.doOnNext(System.out::println)
.blockLast(); // 阻塞以使主线程等待后台线程完成
}
public static Flux<Integer> fun1(int i) {
return Flux.range(1, i)
.concatMap(integer -> Mono.delay(Duration.ofMillis(2000)).thenReturn(integer * 2));
}
public static Flux<Integer> fun2(int i) {
return Flux.range(1, i)
.concatMap(integer -> Mono.delay(Duration.ofMillis(3000)).thenReturn(integer * 2 + 1));
}
2. 您可以手动切换到不同的线程,使用Scheduler
。在大多数实际用例中,您通常会使用Thread.sleep
的地方,例如阻塞的数据库调用。
public static void main(String[] args) {
Flux.merge(fun1(10), fun2(10))
.doOnNext(System.out::println)
.blockLast(); // 阻塞以使主线程等待后台线程完成
}
public static Flux<Integer> fun1(int i) {
return Flux.range(1, i).map(integer -> {
try {
Thread.sleep(2000);
} catch (InterruptedException e) {
e.printStackTrace();
}
return integer * 2;
}).subscribeOn(Schedulers.boundedElastic());
}
public static Flux<Integer> fun2(int i) {
return Flux.range(1, i).map(integer -> {
try {
Thread.sleep(3000);
} catch (InterruptedException e) {
e.printStackTrace();
}
return integer * 2 + 1;
}).subscribeOn(Schedulers.boundedElastic());
}
需要注意的是,如果您正在使用响应式的非阻塞客户端(例如Spring WebClient,R2DBC等),您无需显式指定任何Scheduler
,因为这些客户端会像delay
操作符的情况一样为您处理好线程调度。
有关文档参考阅读:https://projectreactor.io/docs/core/release/reference/#schedulers
<details>
<summary>英文:</summary>
Reactor is concurrency agnostic: by default everything runs on the main thread. In your example this means that `Thread.sleep` is blocking the progress in the main thread that's why the second `Flux` only starts to work after the first. To fix this...
**1. You can use `delay` operator which waits in a non-blocking manner and internally switches to a different thread.**
```java
public static void main(String[] args) {
Flux.merge(fun1(10), fun2(10))
.doOnNext(System.out::println)
.blockLast(); // block so main thread waits for background threads to finish
}
public static Flux<Integer> fun1(int i) {
return Flux.range(1, i)
.concatMap(integer -> Mono.delay(Duration.ofMillis(2000)).thenReturn(integer * 2));
}
public static Flux<Integer> fun2(int i) {
return Flux.range(1, i)
.concatMap(integer -> Mono.delay(Duration.ofMillis(3000)).thenReturn(integer * 2 + 1));
}
2. You can switch to a different thread manually using a Scheduler
. In most real-world use cases you'll need this as usually instead of Thread.sleep
you have for example a blocking database call.
public static void main(String[] args) {
Flux.merge(fun1(10), fun2(10))
.doOnNext(System.out::println)
.blockLast(); // block so main thread waits for background threads to finish
}
public static Flux<Integer> fun1(int i) {
return Flux.range(1, i).map(integer -> {
try {
Thread.sleep(2000);
} catch (InterruptedException e) {
e.printStackTrace();
}
return integer * 2;
}).subscribeOn(Schedulers.boundedElastic());
}
public static Flux<Integer> fun2(int i) {
return Flux.range(1, i).map(integer -> {
try {
Thread.sleep(3000);
} catch (InterruptedException e) {
e.printStackTrace();
}
return integer * 2 + 1;
}).subscribeOn(Schedulers.boundedElastic());
}
Note that though, if you are using a reactive non-blocking client (Spring WebClient, R2DBC, etc.) you don't need to specify any Scheduler
explicitly as these clients take care of that for you just like in case of the delay
operator.
Useful read in reference docs: https://projectreactor.io/docs/core/release/reference/#schedulers
答案2
得分: 0
默认情况下,subscription
在调用线程上进行,本例中即为 main
线程。它首先进入 fun1(int i)
,并完成内部的第一个 flux
。然后进入 fun2(int i)
,并完成第二个 flux
。Reactor 的流水线默认在调用线程上运行(与默认在 fork-join
池上运行的 CompletableFuture<T>
不同)。您可以在以下代码中看到交织的输出:
Flux.merge(fun1(10).subscribeOn(Schedulers.parallel()), fun2(10).subscribeOn(Schedulers.parallel())).subscribe(System.out::println);
考虑使用 Flux.delay
替代通过使用 Thread.sleep
阻塞线程。
英文:
By default, the subscription
happens on the calling thread, which in this case would be main
. It first enters fun1(int i)
, and completes the inner flux
. Then it enters fun2(int i)
, and finishes the second flux
. Reactor's pipelines run on the calling thread by default (unlike say CompletableFuture<T>
that runs on the fork-join
pool by default). You can see interweaved output in the following code:
Flux.merge(fun1(10).subscribeOn(Schedulers.parallel()), fun2(10).subscribeOn(Schedulers.parallel())).subscribe(System.out::println);
Consider using Flux.delay
instead of blocking the thread by using Thread.sleep
.
通过集体智慧和协作来改善编程学习和解决问题的方式。致力于成为全球开发者共同参与的知识库,让每个人都能够通过互相帮助和分享经验来进步。
评论