Is this a correct way of using forkJoin and mergeMap?

# Question

I need to make all requests in parallel whenever it's possible. Here is the simplified version of my code:

```typescript
const arr = [
  'https://jsonplaceholder.typicode.com/users/1',
  'https://jsonplaceholder.typicode.com/users/2',
  'https://jsonplaceholder.typicode.com/users/3',
];

let observables = [];
for (let url of arr) {
  observables.push(this.http.get(url));
}

const obs$ = this.http.get('http://localhost:3000/users').pipe(
  map((data: any) => {
    //an array of urls like https://jsonplaceholder.typicode.com/users/5
    let urls = data.data;
    let observables = [];
    for (let url of urls) {
      observables.push(this.http.get(url));
    }
    return observables;
  }),
  mergeMap((data) => forkJoin([...data]))
);

forkJoin([...observables, obs$]).subscribe({
  next: (data) => console.log(data, 'from fork'),
  error: (err) => console.log(err)
});
```

So, I have in this case:

1. An array of observables `observables`
2. An observable `obs$` that fetches some urls, and upon fetching the urls, I need to make requests to those urls in parallel as well.

I don't care about the results; I only care whether something errors out. This example code seems to work properly, but I have a couple of questions since I'm still trying to wrap my head around RxJS.

1. Is this a proper way of doing things in order to achieve goals I described above?
2. `mergeMap((data) => forkJoin([...data]))` -> `mergeMap` here basically only helps me get an Observable out of the nested Observable, right? So I could technically use `concatMap` or `switchMap` with the same effect?



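# Answer 1
**Score**: 2

You don't actually need `forkJoin`. You can create an observable that emits individual URLs, then use `mergeMap` to request each URL and emit the result.

Since you have two different URL sources, we can define them separately and then use `merge` to create a single observable from both: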

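```typescript
import { from, merge, Observable } from 'rxjs';
import { mergeMap } from 'rxjs/operators';

const urls_1$: Observable<string> = from(arr);

// The { data: string[] } response shape is assumed from the question's `data.data`.
const urls_2$: Observable<string> = this.http
  .get<{ data: string[] }>('http://localhost:3000/users')
  .pipe(mergeMap(response => response.data));

const response$ = merge(urls_1$, urls_2$).pipe(
  mergeMap(url => this.http.get(url))
);

response$.subscribe({
  next: data => console.log(data),
  error: err => console.log(err)
});
```

  • `urls_1$`: we use `from` to create an observable that emits the array elements individually.

  • `urls_2$`: we make the HTTP call, then use `mergeMap` to emit the received array elements individually. This has the same effect as `from` above (in fact, `mergeMap` converts the returned array to an observable internally, just as `from` does). You are correct that any of the higher-order mapping operators would work here, because the HTTP call emits only once.

  • `response$`: here we use `merge` to create a single observable from both of our sources. Whenever `urls_1$` or `urls_2$` emits a value, the merged observable emits it. We then use `mergeMap` to make the HTTP call and emit its result. Unlike the previous `mergeMap`, this one cannot be replaced by `switchMap` or `concatMap`, because you want the requests to run in parallel: `concatMap` would run them one at a time, and `switchMap` would cancel the pending request whenever a new URL arrives. As a result, `response$` emits the response of each request.

    • If you want to limit the number of concurrent requests, you can pass the second `concurrent` argument to `mergeMap`:

```typescript
mergeMap(url => this.http.get(url), 5) // <-- limit to 5 active requests
```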
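The effect of the concurrency argument can be observed without a backend. The following is only a minimal sketch, not part of the original answer: `fakeGet`, `pending`, and `resolve` are hypothetical helpers that stand in for `this.http.get`, with each fake request staying open until the example completes it by hand.

```typescript
import { from, Observable, Subject } from 'rxjs';
import { mergeMap } from 'rxjs/operators';

// Hypothetical stand-in for this.http.get(url): every call creates a
// "request" that stays pending until resolve() is called for it.
const pending: Subject<string>[] = [];
function fakeGet(url: string): Observable<string> {
  const request = new Subject<string>();
  pending.push(request);
  return request;
}

// Emit a response for the n-th fake request and complete it.
function resolve(n: number): void {
  pending[n].next(`response ${n}`);
  pending[n].complete();
}

const urls = ['/users/1', '/users/2', '/users/3', '/users/4', '/users/5'];
const responses: string[] = [];

from(urls)
  .pipe(mergeMap(url => fakeGet(url), 2)) // at most 2 requests in flight
  .subscribe({
    next: value => responses.push(value),
    error: err => console.log(err),
  });

// Only two requests have been started; the other URLs are buffered.
const startedInitially = pending.length;

// Completing one request lets mergeMap start the next buffered URL.
resolve(0);
const startedAfterFirstResponse = pending.length;
```

With a limit of 2, `mergeMap` calls `fakeGet` for the third URL only after one of the first two requests completes.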

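# Answer 2
**Score**: 1

  1. Is this a proper way of doing things in order to achieve the goals I described above?

    Yes, it is.

  2. `mergeMap((data) => forkJoin([...data]))` -> mergeMap here basically only helps me getting an Observable out of the nested Observable, right? So I could technically use concatMap or switchMap with the same effect?

    For what you are trying to achieve, `mergeMap` is the sensible choice. `concatMap` and `switchMap` produce the same result here, because the source HTTP call emits only once, but they add logic you don't need:

    • `concatMap`: projects each source value to an Observable which is merged in the output Observable, in a serialized fashion, waiting for each one to complete before merging the next.

    • `switchMap`: projects each source value to an Observable which is merged in the output Observable, emitting values only from the most recently projected Observable.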
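That the three operators give the same result when the source emits only once can be checked with a small experiment. This is a sketch under stated assumptions: `fakeGet` and `fakeGetUrlList` are hypothetical stand-ins for the `this.http.get` calls in the question, and `collect` is an illustrative helper that relies on these fake observables emitting synchronously.

```typescript
import { forkJoin, Observable, of } from 'rxjs';
import { concatMap, mergeMap, switchMap } from 'rxjs/operators';

type UrlList = { data: string[] };

// Hypothetical stand-ins for this.http.get(...): each emits once and completes.
const fakeGet = (url: string): Observable<string> => of(`response for ${url}`);
const fakeGetUrlList = (): Observable<UrlList> =>
  of({ data: ['/users/4', '/users/5'] });

// The projection from the question: request every URL in the list in parallel.
const project = (list: UrlList): Observable<string[]> =>
  forkJoin(list.data.map(url => fakeGet(url)));

// Gather everything a (synchronous) observable emits into an array.
function collect<T>(source$: Observable<T>): T[] {
  const values: T[] = [];
  source$.subscribe(value => values.push(value));
  return values;
}

const withMergeMap = collect(fakeGetUrlList().pipe(mergeMap(project)));
const withConcatMap = collect(fakeGetUrlList().pipe(concatMap(project)));
const withSwitchMap = collect(fakeGetUrlList().pipe(switchMap(project)));
```

All three arrays hold a single emission containing both responses. The operators only start to differ when the source emits several values while earlier inner observables are still running.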


# Answer 3
**Score**: 1

Off topic, but please note that you could rewrite

```typescript
const arr = [
  'https://jsonplaceholder.typicode.com/users/1',
  'https://jsonplaceholder.typicode.com/users/2',
  'https://jsonplaceholder.typicode.com/users/3',
];

let observables = [];
for (let url of arr) {
  observables.push(this.http.get(url));
}
```

to

```typescript
const observables = [
  'https://jsonplaceholder.typicode.com/users/1',
  'https://jsonplaceholder.typicode.com/users/2',
  'https://jsonplaceholder.typicode.com/users/3',
].map(url => this.http.get(url));
```

You are also shadowing the `observables` variable by declaring it twice (once at the top, and once inside your `map` callback). Use different names (`observables1` and `observables2` would do the trick).
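Putting both suggestions together, the question's `forkJoin` approach might look like the sketch below. It is illustrative only: `fakeGet` and `fakeGetUrlList` are hypothetical stand-ins for `this.http.get` so the example runs outside Angular, and the names `staticRequests`, `dynamicRequests$`, and `allRequests` are assumptions rather than anything from the original post.

```typescript
import { forkJoin, Observable, of } from 'rxjs';
import { mergeMap } from 'rxjs/operators';

// Hypothetical stand-ins for this.http.get(...): each emits once and completes.
const fakeGet = (url: string): Observable<string> => of(`response for ${url}`);
const fakeGetUrlList = (): Observable<{ data: string[] }> =>
  of({ data: ['https://jsonplaceholder.typicode.com/users/5'] });

// First batch, built with Array.prototype.map instead of a for loop.
const staticRequests: Observable<string>[] = [
  'https://jsonplaceholder.typicode.com/users/1',
  'https://jsonplaceholder.typicode.com/users/2',
  'https://jsonplaceholder.typicode.com/users/3',
].map(url => fakeGet(url));

// Second batch under its own name, so nothing shadows the outer array.
const dynamicRequests$: Observable<string[]> = fakeGetUrlList().pipe(
  mergeMap(response => forkJoin(response.data.map(url => fakeGet(url))))
);

const allRequests: Observable<string | string[]>[] = [
  ...staticRequests,
  dynamicRequests$,
];

const emissions: (string | string[])[][] = [];
forkJoin(allRequests).subscribe({
  next: data => emissions.push(data),
  error: err => console.log(err),
});
```

`forkJoin` emits once, after every request has completed: three individual responses followed by the array of responses from the second batch.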


huangapple
  • Posted on 2023-06-29 01:16:34
  • When reposting, please keep the link to this article: https://go.coder-hub.com/76575400.html