Mono.just() is still blocking in my use case when the emitted element is pending

Question

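I have tests that use sleep() to simulate an API call that takes time to process, and I want to check whether Mono.just() makes the call non-blocking.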

In my first test, I emit a String directly but block inside map().

    @Test
    public void testMono() throws InterruptedException {

        for (int i = 0; i < 10; i++) {
            Mono.just("Hello World")
                    .map(s -> {
                        System.out.println("Thread:" + Thread.currentThread().getName() + " " + s);
                        try {
                            sleep(2000);
                        } catch (InterruptedException e) {
                            e.printStackTrace();
                        }
                        return s.length();
                    })
                    .map(integer -> {
                        try {
                            sleep(3000);
                        } catch (InterruptedException e) {
                            e.printStackTrace();
                        }
                        return integer.toString();
                    })
                    .subscribeOn(Schedulers.parallel())

                    .subscribe(System.out::println);

            System.out.println("Loop: " + i + " " + "after MONO");
        }

        sleep(10000);
    }

The result is non-blocking, as expected: all of the System.out.println("Loop: " + i + " " + "after MONO"); outputs appear together at the same time.

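However, in the second test I replaced the emitted element "Hello World" with a call to a blocking getString() method. I put sleep() in getString() to simulate a case where obtaining the emitted element takes time.
The result is now blocking: the outputs appear one by one, each only after the emitted element has been obtained.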

    @Test
    public void testMono2() throws InterruptedException {

        for (int i = 0; i < 10; i++) {
            Mono.just(getString())
                    .map(s -> {
                        System.out.println("Thread:" + Thread.currentThread().getName() + " " + s);
                        try {
                            sleep(2000);
                        } catch (InterruptedException e) {
                            e.printStackTrace();
                        }
                        return s.length();
                    })
                    .map(integer -> {
                        try {
                            sleep(3000);
                        } catch (InterruptedException e) {
                            e.printStackTrace();
                        }
                        return integer.toString();
                    })
                    .subscribeOn(Schedulers.parallel())

                    .subscribe(System.out::println);

            System.out.println("Loop: " + i + " " + "after MONO");
        }

        sleep(10000);
    }

    public String getString(){
        try {
            sleep(2000);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
        return "Hello World";
    }

The third test also behaves as non-blocking.

    int count = 0;

    @Test
    public void testFluxSink() throws InterruptedException {
        for (int i = 0; i < 10; i++) {
            System.out.println("Start Loop: " + i);
            int curLoop = count++;
            Flux.create(fluxSink -> {
                try {
                    System.out.println("Loop: " + curLoop + " " + " start sleeping");
                    sleep(5000);
                    System.out.println("Loop: " + curLoop + " " + " end sleeping");
                    fluxSink.next(onNext());
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
            }).subscribeOn(Schedulers.parallel()).subscribe(s -> {
                System.out.println(Thread.currentThread());
                System.out.println("Loop: " + curLoop + " " + "completed receiving" + " " + s);
            });
        }
        sleep(100000);
    }

    public String onNext() throws InterruptedException {
        sleep(5000);
        return "onNext";
    }

Am I misunderstanding the concept of Reactor? What am I doing wrong in the second test?

Answer 1

Score: 1

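First of all, it is crucial to understand that Reactor does not magically convert blocking calls into non-blocking ones. To build an end-to-end non-blocking application, you need to use non-blocking drivers, NIO, and so on. If you wrap blocking code in a Mono or Flux, the thread that runs it is still blocked.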

As for your second example, with sleep in the getString method, there are two points to pay attention to:

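First, you use Mono.just. It is meant for providing a value that is already known. Java evaluates the argument getString() before Mono.just is even called, so the getString computation happens not within the reactor chain but on the main thread, during the assembly phase of the chain. That is why you see "sequential" behavior. If you replace it with Mono.fromCallable, the computation runs inside the reactor chain when it is subscribed (on the parallel scheduler threads, because of subscribeOn), and you will see the same behavior as in examples 1 and 3.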

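Second, note that wrapping getString in Mono.fromCallable does not make your code non-blocking. Your sleep still blocks the threads of the parallel scheduler. In production code, getString will likely be a database call or a call to another service, which should go through a non-blocking driver or an NIO-based library such as Netty. To emulate that, use a non-blocking delay such as delayElement or Mono.delay instead of sleep. Here is an example: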

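    import java.time.Duration;
    import java.time.Instant;
    import java.util.concurrent.TimeUnit;

    import reactor.core.publisher.Mono;
    import reactor.core.scheduler.Schedulers;

    // The class name is illustrative; the original snippet showed only the two methods.
    public class DelayExample {

        public static void main(String[] args) throws InterruptedException {
            for (int i = 0; i < 3; i++) {
                getString()
                    .map(s -> {
                        System.out.println(Instant.now() + " Thread:" + Thread.currentThread().getName() + " " + s);
                        return s.length();
                    })
                    .subscribeOn(Schedulers.parallel())
                    .subscribe();
                // Printed immediately: subscribing does not wait for the delayed value.
                System.out.println(Instant.now() + " Loop: " + i + " " + "after MONO");
            }

            // Keep the JVM alive long enough for the delayed values to arrive.
            TimeUnit.SECONDS.sleep(10);
        }

        // Emits "hello world" after 3 seconds without blocking a thread while waiting.
        public static Mono<String> getString() {
            return Mono.delay(Duration.ofSeconds(3))
                .map(it -> "hello world");
        }
    }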

It prints:

2023-05-17T20:31:12.099464Z Loop: 0 after MONO
2023-05-17T20:31:12.112953Z Loop: 1 after MONO
2023-05-17T20:31:12.113145Z Loop: 2 after MONO
2023-05-17T20:31:15.105750Z Thread:parallel-2 hello world
2023-05-17T20:31:15.117944Z Thread:parallel-5 hello world
2023-05-17T20:31:15.117944Z Thread:parallel-6 hello world
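
The first point above comes down to plain Java semantics: a method argument is evaluated before the method is called. The following is a minimal, library-free sketch of that difference, not Reactor code. The class `EagerVsDeferred`, the helpers `just` and `fromCallable`, and the thread name `worker-1` are all hypothetical stand-ins chosen for illustration; a `java.util.concurrent` executor plays the role of the scheduler.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

public class EagerVsDeferred {
    // Names of the threads that executed getString(), in call order.
    static final List<String> callers = Collections.synchronizedList(new ArrayList<>());

    static String getString() {
        callers.add(Thread.currentThread().getName());
        return "Hello World";
    }

    // Hypothetical stand-in for Mono.just: it only ever sees a finished value.
    static Callable<String> just(String value) {
        return () -> value;
    }

    // Hypothetical stand-in for Mono.fromCallable: it receives the code itself.
    static Callable<String> fromCallable(Callable<String> source) {
        return source;
    }

    static List<String> demo() {
        callers.clear();
        ExecutorService worker =
                Executors.newSingleThreadExecutor(r -> new Thread(r, "worker-1"));
        try {
            // Assembly: getString() is evaluated right here, on the calling thread.
            Callable<String> eager = just(getString());
            // Assembly: nothing is evaluated yet.
            Callable<String> deferred = fromCallable(EagerVsDeferred::getString);

            // "Subscription": only now does the deferred getString() run, on worker-1.
            worker.submit(eager).get();
            worker.submit(deferred).get();
            return new ArrayList<>(callers);
        } catch (Exception e) {
            throw new IllegalStateException(e);
        } finally {
            worker.shutdown();
        }
    }

    public static void main(String[] args) {
        System.out.println("getString() ran on: " + demo());
    }
}
```

The returned list has two entries: the first is the thread that assembled the pipeline (the eager call), the second is `worker-1` (the deferred call). This mirrors why `Mono.just(getString())` stalls the loop in the second test while `Mono.fromCallable` would not.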

huangapple
  • Posted on May 18, 2023, 02:23:26
  • When reposting, please keep the link to this article: https://go.coder-hub.com/76275150.html