RxJS:使用mergeMap并发时,Observable永不完成。

huangapple go评论72阅读模式
英文:

RxJS: Observable never completes when using mergeMap with concurrent

问题

我正在使用 RxJS 来调用数组的每个元素的异步函数。
我希望只有在元素满足特定条件时才调用异步函数,并且我还希望限制并发执行的数量。

所以我使用了带有 concurrent 参数的 mergeMap 操作符,但是我遇到了奇怪的行为。

最小可重现的示例是:

import { from, timer, mergeMap, EMPTY } from "rxjs";

from(Array(1000).keys()).pipe(
  mergeMap(n => n < 100 ? timer(10) : EMPTY, 10)
).subscribe({ complete: () => console.log('Complete') });

假设 Array(1000).keys() 是数组,n < 100 是条件,timer(10) 是异步函数。

我预期观察对象会完成,但它没有。

奇怪的是,如果未指定 concurrent 参数,或者异步函数在没有条件的情况下对所有元素进行调用,观察对象将会完成。

// 没有 concurrent 参数
mergeMap(n => n < 100 ? timer(10) : EMPTY)
// 或者没有条件
mergeMap(n => timer(10), 10)
// 或者
mergeMap(n => EMPTY, 10)

我还通过以下代码观察了 next 值。(请注意,我使用了 timer(10).pipe(map(() => n)) 来获取输入值)

import { from, timer, mergeMap, EMPTY, map } from "rxjs";

from(Array(1000).keys()).pipe(
  mergeMap(n => n < 100 ? timer(10).pipe(map(() => n)) : EMPTY, 10),
).subscribe({ next: console.log, complete: () => console.log('Complete') });

输出值在 90 停止。

...
86
87
88
89
90

我想知道为什么会出现这种行为。

英文:

I'm using RxJS to call an asynchronous function for each element of the array.
I want to call the asynchronous function only if the element satisfies certain conditions, and I also want to limit the number of concurrent executions.

So I used the mergeMap operator with the concurrent parameter, but I have experienced strange behavior.

The minimum reproducible example is:

import { from, timer, mergeMap, EMPTY } from &quot;rxjs&quot;;

from(Array(1000).keys()).pipe(
  mergeMap(n =&gt; n &lt; 100 ? timer(10) : EMPTY, 10)
).subscribe({ complete: () =&gt; console.log(&#39;Complete&#39;) });

Suppose that Array(1000).keys() is the array, n &lt; 100 is the condition, and timer(10) is the asynchronous function.

I expected the observable to complete, but it didn't.

Strangely, the observable completes if the concurrent parameter is not specified, or the asynchronous function is called for all of the elements without the conditions.

// without concurrent parameter
mergeMap(n =&gt; n &lt; 100 ? timer(10) : EMPTY)
// or without condition
mergeMap(n =&gt; timer(10), 10)
// or
mergeMap(n =&gt; EMPTY, 10)

I also observed the next values by the following code. (Note that I used timer(10).pipe(map(() =&gt; n)) to get the input values)

import { from, timer, mergeMap, EMPTY, map } from &quot;rxjs&quot;;

from(Array(1000).keys()).pipe(
  mergeMap(n =&gt; n &lt; 100 ? timer(10).pipe(map(() =&gt; n)) : EMPTY, 10),
).subscribe({ next: console.log, complete: () =&gt; console.log(&#39;Complete&#39;) });

The output values stopped at 90.

...
86
87
88
89
90

I want to know why this behavior occurs.

答案1

得分: 3

以下是您要翻译的内容:

当未指定concurrentmergeMap的工作原理

当未指定时,concurrent的值为Infinity

mergeMap接收到新值时,它将基于project函数(即提供给mergeMap的函数)创建一个新的Observable(也称为inner Observable)并订阅它。

由于concurrent选项的值为Infinity,可以有尽可能多的传入值,mergeMap将为每个值创建一个inner Observable,而不会出现任何问题。

当指定concurrentmergeMap的工作原理

当指定了这个选项时,mergeMap将使用一个buffer,它将保存超出的值:

const outerNext = (value: T) => (active < concurrent ? doInnerSub(value) : buffer.push(value));

在上述代码中中,我们可以看到当活动的内部Observables的数量超过了允许的并发数量时,缓冲区将被填充新值。当mergeMap接收到值时,例如来自Array(1000)的[0, 999],将调用outerNext函数。

为什么Observable不会完成

这段代码

import { from, timer, mergeMap, EMPTY } from "rxjs";

from(Array(1000).keys()).pipe(
  mergeMap(n => n < 100 ? timer(10) : EMPTY, 10)
).subscribe({ complete: () => console.log('Complete') });

由于在doInnerSub()函数中使用了实现细节,它不会完成(从outerNext()调用的函数)。

关键是它是一个递归函数。这是它所做的伪代码实现(你可以在这里找到真正的实现):

const doInnerSub = value => {
  // 增加活动连接的数量。
  active++;

  // 从提供给`mergeMap`的投影函数创建。
  createInnerObservable({
    // 在拆卸阶段(也在调用`finalize`时)。
    afterComplete: () => {
      if (hasInnerObservableCompleted) {
        // 为新连接腾出空间。
        active--;

        while (buffer.length && active < concurrent) {
          bufferedValue = buffer.shift()!;
          doInnerSub(bufferedValue);
        }
      }
    }
  });
}

在所有使用timer(10)创建的Observables完成之后,只会创建EMPTY Observables。这种Observables会立即完成,因为它们被同步订阅。这导致在第90个timer(10) Observable(90 = 100 - 10)之后,以下行:

while (buffer.length && active < concurrent) {
  bufferedValue = buffer.shift()!;
  doInnerSub(bufferedValue);
}

将引发此错误:

RxJS:使用mergeMap并发时,Observable永不完成。

这里还有一个截图,显示了调用栈,似乎证明了doInnerSub()被同步(和递归地)调用了太多次。

RxJS:使用mergeMap并发时,Observable永不完成。

为什么在省略concurrent时Observable会完成

答案是因为doInnerSub()不再需要调用自身(因为不再使用缓冲区)。基本上,将会创建1000个内部Observables。

为什么在不使用条件时Observable会完成

对于这种情况:

mergeMap(n => timer(10), 10)

调用栈不会继续增加,因为timer(10)涉及异步操作(即setInterval())。

对于这种情况:

mergeMap(n => EMPTY, 10)

由于EMPTY会在被订阅时立即完成,缓冲区根本不会被填充。因此,不会发生调用栈超限错误。

RxJS:使用mergeMap并发时,Observable永不完成。

在上面的图像中,那个断点根本不会被触发。

为什么值停在90

至于为什么它在90处停止,原因是concurrent = 10,所以在{100(来自mergeMap条件) - concurrentValue(10)}个timer(10) Observable之后,调用栈将继续增加,因为第一个EMPTY Observable从缓冲区中取出,然后从那里开始无限递归。

如果你使用了,例如,11,最后打印的值将是89:

mergeMap(n => (n < 100 ? timer(10) : EMPTY), 11),

RxJS:使用mergeMap并发时,Observable永不完成。

英文:

How mergeMap works when concurrent is not specified

When not specified, the concurrent's value is Infinity.

When a new value is received by mergeMap, it will create a new Observable(a.k.a inner Observable) based on the project function(i.e. the function provided to mergeMap) and it will subscribe to it.

Since the concurrent option has the value of Infinity, there can be as many incoming values as possible, mergeMap will create an inner Observable for each of them without any issues.

How mergeMap works when concurrent is specified

When this option is specified, mergeMap will employ a buffer which will hold the exceeding values:

const outerNext = (value: T) =&gt; (active &lt; concurrent ? doInnerSub(value) : buffer.push(value));

In the code above, we see that the buffer is populated with new values whenever the number of active inner Observables exceeds the number of concurrent allowed. The outerNext function is called when mergeMap receives values, such as [0, 999] from Array(1000).

Why the Observable does not complete

This code

import { from, timer, mergeMap, EMPTY } from &quot;rxjs&quot;;

from(Array(1000).keys()).pipe(
  mergeMap(n =&gt; n &lt; 100 ? timer(10) : EMPTY, 10)
).subscribe({ complete: () =&gt; console.log(&#39;Complete&#39;) });

will not complete due to an implementation detail that is used in the doInnerSub() function(the function called from outerNext()).

The gist is that it is a recursive function. This is a pseudo-code implementation of what it does(you can find the real implementation here):

const doInnerSub = value =&gt; {
  // Increase number of active connections.
  active++;

  // Creating from the projection function provided to `mergeMap`.
  createInnerObservable({
    // During the teardown phase(also when `finalize` is called).
    afterComplete: () =&gt; {
      if (hasInnerObservableCompleted) {
        // Making space for a new connection.
        active--;

        while (buffer.length &amp;&amp; active &lt; concurrent) {
          bufferedValue = buffer.shift()!;
          doInnerSub(bufferedValue);
        }
      }
    }
  });
}

After all the Observables created with timer(10) complete, only EMPTY Observables will be created. Such Observables completed immediately as they're subscribed. In other words, they complete synchronously. The result of this is that, after the 90th timer(10) Observable(90 = 100 - 10), these lines:

while (buffer.length &amp;&amp; active &lt; concurrent) {
  bufferedValue = buffer.shift()!;
  doInnerSub(bufferedValue);
}

will cause this error to be thrown:

RxJS:使用mergeMap并发时,Observable永不完成。

Here's also a screenshot with the callstack that seems to prove that doInnerSub() is being called synchronously(and recursively) too many times.

RxJS:使用mergeMap并发时,Observable永不完成。

Why the Observable completes when concurrent is omitted

The answer is because doInnerSub() does not have to call itself anymore(because the buffer is no longer used). Basically, there will be 1000 inner Observables created.

Why the Observable completes when no condition is used

For this case:

mergeMap(n =&gt; timer(10), 10)

the callstack will not keep on increasing because timer(10) involves an asynchronous action(i.e. setInterval()).

For this case:

mergeMap(n =&gt; EMPTY, 10)

Since EMPTY completes immediately as it is being subscribed, the buffer won't be populated at all. So, the callstack exceeded error can't occur.

RxJS:使用mergeMap并发时,Observable永不完成。

In the above image, that breakpoint won't be hit at all.

Why the values stop at 90

As to why it stops as 90, the reason is because concurrent = 10, so after the {100(from mergeMap condition) - concurrentValue(10)} timer(10) Observable, the callstack will keep on increasing because the first EMPTY Observable is taken from the buffer then from there the infinite recursion starts.

If you were to use, for instance, 11, the last value printed would be 89:

mergeMap((n) =&gt; (n &lt; 100 ? timer(10) : EMPTY), 11),

RxJS:使用mergeMap并发时,Observable永不完成。

huangapple
  • 本文由 发表于 2023年7月23日 23:34:10
  • 转载请务必保留本文链接:https://go.coder-hub.com/76749060.html
匿名

发表评论

匿名网友

:?: :razz: :sad: :evil: :!: :smile: :oops: :grin: :eek: :shock: :???: :cool: :lol: :mad: :twisted: :roll: :wink: :idea: :arrow: :neutral: :cry: :mrgreen:

确定