响应式编程(Reactor):为什么主线程被阻塞?

huangapple go评论74阅读模式
英文:

Reactive programming (Reactor) : Why main thread is stuck?

问题

我正在学习使用project-reactor进行响应式编程。

我有以下的测试案例:

@Test
public void createAFlux_just() {
    Flux<String> fruitFlux = Flux.just("apple", "orange");
    fruitFlux.subscribe(f -> {
        try {
            Thread.sleep(5000);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
        System.out.println(f);
    });
    System.out.println("hello main thread");
}

通过执行测试,似乎主线程被阻塞了5秒。

我预期订阅的消费者应该在自己的线程中异步运行,也就是说,subscribe方法应该立即返回到主线程,因此hello main thread应该立即打印出来。

英文:

I'm learning Reactive programming with project-reactor.

I have the following test case:

@Test
public void createAFlux_just() {
	Flux&lt;String&gt; fruitFlux = Flux.just(&quot;apple&quot;, &quot;orange&quot;);
	fruitFlux.subscribe(f -&gt; {
		try {
			Thread.sleep(5000);
		} catch (InterruptedException e) {
			e.printStackTrace();
		}
		System.out.println(f);
	});
	System.out.println(&quot;hello main thread&quot;);
}

By executing the test it seems that the main thread is stuck for 5 seconds.

I would expect that the subscribed consumer should run asynchronously in its own thread, that is, the subscribe invoke should return immediately to the main thread and consequently the hello main thread should print instantly.

答案1

得分: 2

主线程被卡住,因为订阅发生在 main 线程上。如果你想要它以异步方式运行,你需要将订阅发生在除 main 之外的线程上。你可以这样做:

@Test
public void createAFlux_just() {
    Flux<String> fruitFlux = Flux.just("apple", "orange");
    fruitFlux.subscribeOn(Schedulers.parallel()).subscribe(f -> {
        try {
            Thread.sleep(5000);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
        System.out.println(f);
    });
    System.out.println("hello main thread");
}

注意:我使用了 parallel 线程池。你可以使用任何你喜欢的线程池。Reactor 的管道默认在调用线程上执行(与 CompletableFuture<T> 不同,默认在 ForkJoin 线程池上运行)。

英文:

The main thread is stuck because the subscription happens on the main thread. If you want it to run asynchronously, you need to the subscription to happen on a thread other than main. You could do this as:

 @Test
public void createAFlux_just() {
    Flux&lt;String&gt; fruitFlux = Flux.just(&quot;apple&quot;, &quot;orange&quot;);
    fruitFlux.subscribeOn(Schedulers.parallel()).subscribe(f -&gt; {
        try {
            Thread.sleep(5000);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
        System.out.println(f);
    });
    System.out.println(&quot;hello main thread&quot;);
}

Note: I have used the parallel thread pool. You could use whatever pool you like. Reactor's pipelines are executed on the calling thread by default (unlike CompletableFuture&lt;T&gt; which runs in the ForkJoin pool by default).

答案2

得分: 1

这种行为会发生在您拥有一个异步的可观察对象(Flux)的情况下。您选择使用一个包含两个即时可用值的Flux,通过使用"just"方法来实现。由于这些值是立即可用的,它们会立即传递给订阅对象。

英文:

This behavior would be the case if you had an observable (Flux) that was asynchronous. You chose to use a Flux with two readily available values by using the just method. They were passed to the subscription object right away since they were immediately available.

答案3

得分: 1

来自 spring.io [documentation][1]

&gt; **线程模型**
Reactor 操作符通常与并发无关:它们不会强加特定的线程模型,只是在调用其 onNext 方法的线程上运行。
&gt; 
&gt; **调度器抽象**
在 Reactor 中,调度器是一个抽象,它使用户能够控制线程。调度器可以生成 Worker,它们在概念上是线程,但不一定由线程支持(稍后我们将看到一个示例)。调度器还包括时钟的概念,而 Worker 纯粹与任务调度有关。


因此,您应该通过 `subscribeOn` 方法在不同的线程上订阅,并且 `Thread.sleep(5000)` 将使调度程序的线程休眠。您可以在文档中找到更多类似这个示例的内容。

Flux.just("hello")
.doOnNext(v -> System.out.println("just " + Thread.currentThread().getName()))
.publishOn(Scheduler.boundedElastic())
.doOnNext(v -> System.out.println("publish " + Thread.currentThread().getName()))
.delayElements(Duration.ofMillis(500))
.subscribeOn(Schedulers.elastic())
.subscribe(v -> System.out.println(v + " delayed " + Thread.currentThread().getName()));


  [1]: https://spring.io/blog/2019/12/13/flight-of-the-flux-3-hopping-threads-and-schedulers
英文:

from spring.io documentation

> The Threading Model
Reactor operators generally are concurrent agnostic: they don’t impose a particular threading model and just run on the Thread on which their onNext method was invoked.
>
> The Scheduler abstraction
In Reactor, a Scheduler is an abstraction that gives the user control about threading. A Scheduler can spawn Worker which are conceptually Threads, but are not necessarily backed by a Thread (we’ll see an example of that later). A Scheduler also includes the notion of a clock, whereas the Worker is purely about scheduling tasks.

so you should subscribe on different thread by subscribeOn method and the Thread.sleep(5000) will sleep thread of the scheduler. You can see more examples like this one in the documentation.

Flux.just(&quot;hello&quot;)
    .doOnNext(v -&gt; System.out.println(&quot;just &quot; + Thread.currentThread().getName()))
    .publishOn(Scheduler.boundedElastic())
    .doOnNext(v -&gt; System.out.println(&quot;publish &quot; + Thread.currentThread().getName()))
    .delayElements(Duration.ofMillis(500))
    .subscribeOn(Schedulers.elastic())
    .subscribe(v -&gt; System.out.println(v + &quot; delayed &quot; + Thread.currentThread().getName()));

huangapple
  • 本文由 发表于 2020年9月28日 14:43:06
  • 转载请务必保留本文链接:https://go.coder-hub.com/64097135.html
匿名

发表评论

匿名网友

:?: :razz: :sad: :evil: :!: :smile: :oops: :grin: :eek: :shock: :???: :cool: :lol: :mad: :twisted: :roll: :wink: :idea: :arrow: :neutral: :cry: :mrgreen:

确定