How could I merge active reactive streams?

Question

I have just started exploring Project Reactor, and my current understanding is that its main purpose is to gather information from several sources asynchronously. For example, I have one source that produces data quickly and another that produces data fairly slowly. I want to merge these two sources and return information from both of them as soon as I receive it.
I've tried code like this:

@RestController
public class ReactiveController {

    @RequestMapping("/info")
    public Flux<String> getInfoFromServices() {
        return Flux.merge(getDataFromSlowSource(), getDataFromFastSource()).take(10);
    }

    private Flux<String> getDataFromFastSource() {
        return Flux.generate(sink -> {
            sink.next("data from fast source\n");
            try {
                Thread.sleep(5);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        });
    }

    private Flux<String> getDataFromSlowSource() {
        return Flux.generate(sink -> {
            sink.next("data from slow source\n");
            try {
                Thread.sleep(10);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        });
    }
}

I expected something like this in the response from the server:

data from fast source
data from fast source
data from slow source
data from fast source
data from fast source
data from slow source
data from fast source
data from fast source
data from slow source

But I got:

data from slow source
data from slow source
data from slow source
data from slow source
data from slow source
data from slow source
data from slow source
data from slow source
data from slow source
data from slow source

So, can I somehow take data from both sources concurrently, as soon as each source produces it?

Answer 1

Score: 2

Your problem is Thread.sleep, which blocks the current thread. That is why you see that behavior. Instead of using Thread.sleep, use delayElements in the fast and slow sources, and you will see the expected behavior.

// Flux.<String>generate pins the element type; with delayElements chained on,
// the compiler would otherwise infer Flux<Object>, which does not fit Flux<String>.
private Flux<String> getDataFromFastSource() {
    return Flux.<String>generate(sink -> {
        sink.next("data from fast source\n");
    }).delayElements(Duration.ofSeconds(1)); // needs import java.time.Duration
}

private Flux<String> getDataFromSlowSource() {
    return Flux.<String>generate(sink -> {
        sink.next("data from slow source\n");
    }).delayElements(Duration.ofSeconds(2));
}

Note:

Always keep the whole Reactor chain non-blocking, and use appropriate schedulers to achieve that. More information is available here:

http://www.vinsguru.com/reactive-programming-schedulers/
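
To illustrate why a timer-based delay behaves differently from Thread.sleep, here is a minimal plain-JDK sketch. It is only an analogy and does not show how Reactor implements delayElements. The class name TimerMergeSketch, the method firstTen, and the 5 ms and 10 ms periods are assumptions chosen to mirror the question. Both sources are driven by a single timer thread, and because no task ever sleeps on that thread, items from both sources can interleave.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.Executors;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;

// Hypothetical illustration class; not part of Reactor or of the original answer.
class TimerMergeSketch {

    // Collects the first ten items emitted by two timer-driven sources.
    static List<String> firstTen() {
        BlockingQueue<String> merged = new LinkedBlockingQueue<>();
        // One timer thread serves both sources; each task returns immediately.
        ScheduledExecutorService timer = Executors.newSingleThreadScheduledExecutor();
        try {
            timer.scheduleAtFixedRate(
                    () -> merged.add("data from fast source"), 0, 5, TimeUnit.MILLISECONDS);
            timer.scheduleAtFixedRate(
                    () -> merged.add("data from slow source"), 0, 10, TimeUnit.MILLISECONDS);

            List<String> result = new ArrayList<>();
            while (result.size() < 10) {
                // take() blocks only the collecting thread, never the timer thread.
                result.add(merged.take());
            }
            return result;
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException("interrupted while collecting items", e);
        } finally {
            timer.shutdownNow(); // stop both periodic tasks once ten items are collected
        }
    }

    public static void main(String[] args) {
        firstTen().forEach(System.out::println);
    }
}
```

The exact interleaving depends on timing, but both sources contribute to the first ten items because neither one occupies the timer thread while waiting.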

Answer 2

Score: 2

Your problem is that the merge operation, the slow source, and the fast source all run on the same thread, so there is no race between the two sources.

If you modify your code like this, so that the slow and fast sources run on separate threads (schedulers), you will see the expected result:

@RequestMapping("/info")
public Flux<String> getInfoFromServices() {
    return Flux.merge(
      getDataFromSlowSource().subscribeOn(Schedulers.boundedElastic()),
      getDataFromFastSource().subscribeOn(Schedulers.boundedElastic())
    ).take(10);
}

Regarding vins's answer: the delayElements(duration) method also uses a scheduler.
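
The effect of giving each blocking source its own thread can be sketched with plain JDK classes. This is an analogy under stated assumptions, not Reactor code: the class ThreadedMergeSketch and the helpers blockingSource and firstTen are hypothetical names, and a two-thread pool stands in for the scheduler. Each producer still calls Thread.sleep, but because it sleeps on its own worker thread, the other producer keeps emitting.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.LinkedBlockingQueue;

// Hypothetical illustration class; not part of Reactor or of the original answer.
class ThreadedMergeSketch {

    // A blocking producer: emits an item, then sleeps, until it is interrupted.
    static Runnable blockingSource(String item, long sleepMillis, BlockingQueue<String> out) {
        return () -> {
            try {
                while (!Thread.currentThread().isInterrupted()) {
                    out.add(item);
                    Thread.sleep(sleepMillis); // blocks only this worker thread
                }
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt(); // stop quietly when cancelled
            }
        };
    }

    // Runs each source on its own worker thread and takes the first ten items.
    static List<String> firstTen() {
        BlockingQueue<String> merged = new LinkedBlockingQueue<>();
        ExecutorService workers = Executors.newFixedThreadPool(2);
        try {
            workers.submit(blockingSource("data from slow source", 10, merged));
            workers.submit(blockingSource("data from fast source", 5, merged));

            List<String> result = new ArrayList<>();
            while (result.size() < 10) {
                result.add(merged.take());
            }
            return result;
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException("interrupted while collecting items", e);
        } finally {
            workers.shutdownNow(); // interrupts both producers once ten items are collected
        }
    }

    public static void main(String[] args) {
        firstTen().forEach(System.out::println);
    }
}
```

The exact order varies from run to run, but items from both sources appear, which is the race that was missing when everything ran on one thread.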

huangapple
  • Published on July 28, 2020, 22:29:39
  • Please keep this link when reposting: https://go.coder-hub.com/63136493.html