英文:
Reactive programming (Reactor) : Why main thread is stuck?
问题
我正在学习使用project-reactor进行响应式编程。
我有以下的测试案例:
@Test
public void createAFlux_just() {
Flux<String> fruitFlux = Flux.just("apple", "orange");
fruitFlux.subscribe(f -> {
try {
Thread.sleep(5000);
} catch (InterruptedException e) {
e.printStackTrace();
}
System.out.println(f);
});
System.out.println("hello main thread");
}
通过执行测试,似乎主线程被阻塞了5秒。
我预期订阅的消费者应该在自己的线程中异步运行,也就是说,subscribe方法应该立即返回到主线程,因此hello main thread
应该立即打印出来。
英文:
I'm learning Reactive programming with project-reactor.
I have the following test case:
@Test
public void createAFlux_just() {
Flux<String> fruitFlux = Flux.just("apple", "orange");
fruitFlux.subscribe(f -> {
try {
Thread.sleep(5000);
} catch (InterruptedException e) {
e.printStackTrace();
}
System.out.println(f);
});
System.out.println("hello main thread");
}
By executing the test it seems that the main thread is stuck for 5 seconds.
I would expect that the subscribed consumer should run asynchronously in its own thread, that is, the subscribe invoke should return immediately to the main thread and consequently the hello main thread
should print instantly.
答案1
得分: 2
主线程被卡住,因为订阅发生在 main
线程上。如果你想要它以异步方式运行,你需要将订阅发生在除 main
之外的线程上。你可以这样做:
@Test
public void createAFlux_just() {
Flux<String> fruitFlux = Flux.just("apple", "orange");
fruitFlux.subscribeOn(Schedulers.parallel()).subscribe(f -> {
try {
Thread.sleep(5000);
} catch (InterruptedException e) {
e.printStackTrace();
}
System.out.println(f);
});
System.out.println("hello main thread");
}
注意:我使用了 parallel
线程池。你可以使用任何你喜欢的线程池。Reactor 的管道默认在调用线程上执行(与 CompletableFuture<T>
不同,默认在 ForkJoin
线程池上运行)。
英文:
The main thread is stuck because the subscription happens on the main
thread. If you want it to run asynchronously, you need to the subscription to happen on a thread other than main
. You could do this as:
@Test
public void createAFlux_just() {
Flux<String> fruitFlux = Flux.just("apple", "orange");
fruitFlux.subscribeOn(Schedulers.parallel()).subscribe(f -> {
try {
Thread.sleep(5000);
} catch (InterruptedException e) {
e.printStackTrace();
}
System.out.println(f);
});
System.out.println("hello main thread");
}
Note: I have used the parallel
thread pool. You could use whatever pool you like. Reactor's pipelines are executed on the calling thread by default (unlike CompletableFuture<T>
which runs in the ForkJoin
pool by default).
答案2
得分: 1
这种行为会发生在您拥有一个异步的可观察对象(Flux)的情况下。您选择使用一个包含两个即时可用值的Flux,通过使用"just"方法来实现。由于这些值是立即可用的,它们会立即传递给订阅对象。
英文:
This behavior would be the case if you had an observable (Flux) that was asynchronous. You chose to use a Flux with two readily available values by using the just method. They were passed to the subscription object right away since they were immediately available.
答案3
得分: 1
来自 spring.io [documentation][1]
> **线程模型**
Reactor 操作符通常与并发无关:它们不会强加特定的线程模型,只是在调用其 onNext 方法的线程上运行。
>
> **调度器抽象**
在 Reactor 中,调度器是一个抽象,它使用户能够控制线程。调度器可以生成 Worker,它们在概念上是线程,但不一定由线程支持(稍后我们将看到一个示例)。调度器还包括时钟的概念,而 Worker 纯粹与任务调度有关。
因此,您应该通过 `subscribeOn` 方法在不同的线程上订阅,并且 `Thread.sleep(5000)` 将使调度程序的线程休眠。您可以在文档中找到更多类似这个示例的内容。
Flux.just("hello")
.doOnNext(v -> System.out.println("just " + Thread.currentThread().getName()))
.publishOn(Scheduler.boundedElastic())
.doOnNext(v -> System.out.println("publish " + Thread.currentThread().getName()))
.delayElements(Duration.ofMillis(500))
.subscribeOn(Schedulers.elastic())
.subscribe(v -> System.out.println(v + " delayed " + Thread.currentThread().getName()));
[1]: https://spring.io/blog/2019/12/13/flight-of-the-flux-3-hopping-threads-and-schedulers
英文:
from spring.io documentation
> The Threading Model
Reactor operators generally are concurrent agnostic: they don’t impose a particular threading model and just run on the Thread on which their onNext method was invoked.
>
> The Scheduler abstraction
In Reactor, a Scheduler is an abstraction that gives the user control about threading. A Scheduler can spawn Worker which are conceptually Threads, but are not necessarily backed by a Thread (we’ll see an example of that later). A Scheduler also includes the notion of a clock, whereas the Worker is purely about scheduling tasks.
so you should subscribe on different thread by subscribeOn
method and the Thread.sleep(5000)
will sleep thread of the scheduler. You can see more examples like this one in the documentation.
Flux.just("hello")
.doOnNext(v -> System.out.println("just " + Thread.currentThread().getName()))
.publishOn(Scheduler.boundedElastic())
.doOnNext(v -> System.out.println("publish " + Thread.currentThread().getName()))
.delayElements(Duration.ofMillis(500))
.subscribeOn(Schedulers.elastic())
.subscribe(v -> System.out.println(v + " delayed " + Thread.currentThread().getName()));
通过集体智慧和协作来改善编程学习和解决问题的方式。致力于成为全球开发者共同参与的知识库,让每个人都能够通过互相帮助和分享经验来进步。
评论