Why don't merged Fluxes interleave concurrently?

Question

I'm new to Reactor, and I'd like to know why merging in this sample code doesn't work as described in the documentation. One Flux returns even numbers with a delay of 2 seconds per element, and the other returns odd numbers with a delay of 3 seconds per element. When I subscribe, it prints the results of the first Flux and then those of the second one sequentially, not interleaved. The output is 2, 4, 6, 8, ... followed by 3, 5, 7, ..., while I'm expecting 2, 4, 3, 6, 8, 5, 10, 7, and so on. In other words: why don't the two source Fluxes run in parallel?
Here's the sample code:

```java
import reactor.core.publisher.Flux;

public class Main {
    public static void main(String[] args) throws InterruptedException {
        Flux.merge(fun1(10), fun2(10)).subscribe(System.out::println);
    }

    // Even numbers, blocking for 2 seconds before each element
    public static Flux<Integer> fun1(int i) {
        return Flux.range(1, i).map(integer -> {
            try {
                Thread.sleep(2000);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
            return integer * 2;
        });
    }

    // Odd numbers, blocking for 3 seconds before each element
    public static Flux<Integer> fun2(int i) {
        return Flux.range(1, i).map(integer -> {
            try {
                Thread.sleep(3000);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
            return integer * 2 + 1;
        });
    }
}
```

Answer 1

Score: 3

Reactor is concurrency-agnostic: by default, everything runs on the thread that subscribes, which here is the main thread. In your example, this means that `Thread.sleep` blocks the main thread, so `merge` cannot subscribe to the second `Flux` until the first one has completed. To fix this...

**1. You can use the `delay` operator, which waits in a non-blocking manner and internally switches to a different thread.**

```java
import java.time.Duration;

import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;

public class Main {
    public static void main(String[] args) {
        Flux.merge(fun1(10), fun2(10))
                .doOnNext(System.out::println)
                .blockLast(); // block so main thread waits for background threads to finish
    }

    // Even numbers, each emitted after a non-blocking 2-second delay
    public static Flux<Integer> fun1(int i) {
        return Flux.range(1, i)
                .concatMap(integer -> Mono.delay(Duration.ofMillis(2000)).thenReturn(integer * 2));
    }

    // Odd numbers, each emitted after a non-blocking 3-second delay
    public static Flux<Integer> fun2(int i) {
        return Flux.range(1, i)
                .concatMap(integer -> Mono.delay(Duration.ofMillis(3000)).thenReturn(integer * 2 + 1));
    }
}
```

**2. You can switch to a different thread manually using a `Scheduler`.** In most real-world use cases you'll need this, because instead of `Thread.sleep` you usually have, for example, a blocking database call.

```java
import reactor.core.publisher.Flux;
import reactor.core.scheduler.Schedulers;

public class Main {
    public static void main(String[] args) {
        Flux.merge(fun1(10), fun2(10))
                .doOnNext(System.out::println)
                .blockLast(); // block so main thread waits for background threads to finish
    }

    // The blocking map now runs on a boundedElastic worker instead of the main thread
    public static Flux<Integer> fun1(int i) {
        return Flux.range(1, i).map(integer -> {
            try {
                Thread.sleep(2000);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
            return integer * 2;
        }).subscribeOn(Schedulers.boundedElastic());
    }

    // Same as fun1, on its own boundedElastic worker
    public static Flux<Integer> fun2(int i) {
        return Flux.range(1, i).map(integer -> {
            try {
                Thread.sleep(3000);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
            return integer * 2 + 1;
        }).subscribeOn(Schedulers.boundedElastic());
    }
}
```

Note, though, that if you are using a reactive non-blocking client (Spring WebClient, R2DBC, etc.), you don't need to specify any `Scheduler` explicitly, because these clients take care of that for you, just as the `delay` operator does.

Useful read in reference docs: https://projectreactor.io/docs/core/release/reference/#schedulers
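
The effect described in this answer does not depend on Reactor itself. As a rough plain-JDK analogy (this is not Reactor code; the class `BlockingOrderSketch` and its `source` helper are made up for illustration, and the delays are shortened to milliseconds), the sketch below runs two sleeping producers first on the calling thread and then on two worker threads:

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;

public class BlockingOrderSketch {

    // Hypothetical stand-in for fun1/fun2: sleeps, then adds i * 2 + offset to the sink.
    static Runnable source(int count, long delayMillis, int offset, List<Integer> sink) {
        return () -> {
            for (int i = 1; i <= count; i++) {
                try {
                    Thread.sleep(delayMillis);
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                    return;
                }
                sink.add(i * 2 + offset);
            }
        };
    }

    // Both sources run on the calling thread: the second cannot start
    // until the first has finished all of its sleeps.
    static List<Integer> runOnCallingThread() {
        List<Integer> out = new ArrayList<>();
        source(3, 20, 0, out).run();
        source(3, 30, 1, out).run();
        return out;
    }

    // Each source gets its own worker thread, so their sleeps overlap.
    static List<Integer> runOnSeparateThreads() {
        List<Integer> out = Collections.synchronizedList(new ArrayList<>());
        ExecutorService pool = Executors.newFixedThreadPool(2);
        try {
            Future<?> evens = pool.submit(source(3, 20, 0, out));
            Future<?> odds = pool.submit(source(3, 30, 1, out));
            evens.get(); // wait for both workers before reading the result
            odds.get();
        } catch (InterruptedException | ExecutionException e) {
            throw new IllegalStateException(e);
        } finally {
            pool.shutdown();
        }
        return new ArrayList<>(out);
    }

    public static void main(String[] args) {
        System.out.println("calling thread:   " + runOnCallingThread());
        System.out.println("separate threads: " + runOnSeparateThreads());
    }
}
```

On the calling thread the result is always `[2, 4, 6, 3, 5, 7]`. On separate threads the even and odd values are mixed together, and the exact order depends on timing.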

Answer 2

Score: 0

By default, the subscription happens on the calling thread, which in this case is main. It first subscribes to the Flux returned by `fun1(int i)` and runs it to completion, then subscribes to the Flux returned by `fun2(int i)` and runs that one to completion. Reactor's pipelines run on the calling thread by default (unlike, say, `CompletableFuture<T>`, whose async methods run on the common fork-join pool by default). You can see interleaved output with the following code:

```java
Flux.merge(
        fun1(10).subscribeOn(Schedulers.parallel()),
        fun2(10).subscribeOn(Schedulers.parallel()))
    .doOnNext(System.out::println)
    .blockLast(); // block so main does not exit before the worker threads finish
```

Consider using a non-blocking delay such as `Mono.delay` or `delayElements` instead of blocking the thread with `Thread.sleep`.
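
To make the `CompletableFuture` comparison above concrete, here is a small JDK-only sketch. The class name `DefaultThreadSketch` and its two methods are hypothetical, chosen only for this illustration. It prints the thread a directly called method runs on, and the thread that `CompletableFuture.supplyAsync` picks when no executor is passed:

```java
import java.util.concurrent.CompletableFuture;

public class DefaultThreadSketch {

    // Runs directly on whichever thread calls it, much like a Reactor
    // pipeline without subscribeOn/publishOn runs on the subscribing thread.
    static String nameOnCallingThread() {
        return Thread.currentThread().getName();
    }

    // supplyAsync without an explicit Executor hands the task to the
    // default asynchronous executor (normally ForkJoinPool.commonPool()).
    static String nameOnDefaultAsyncExecutor() {
        return CompletableFuture
                .supplyAsync(() -> Thread.currentThread().getName())
                .join();
    }

    public static void main(String[] args) {
        System.out.println("direct call: " + nameOnCallingThread());
        System.out.println("supplyAsync: " + nameOnDefaultAsyncExecutor());
    }
}
```

The second line names a thread other than the caller, which is the default that Reactor deliberately does not apply.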

huangapple
  • Posted on 2020-10-24 18:18:59
  • Please keep the link to this article when reposting: https://go.coder-hub.com/64512297.html