Is this a correct way of using forkJoin and mergeMap?
# Question
I need to make all requests in parallel whenever it's possible. Here is the simplified version of my code:
```typescript
const arr = [
  'https://jsonplaceholder.typicode.com/users/1',
  'https://jsonplaceholder.typicode.com/users/2',
  'https://jsonplaceholder.typicode.com/users/3',
];

let observables = [];
for (let url of arr) {
  observables.push(this.http.get(url));
}

const obs$ = this.http.get('http://localhost:3000/users').pipe(
  map((data: any) => {
    // an array of urls like https://jsonplaceholder.typicode.com/users/5
    let urls = data.data;
    let observables = [];
    for (let url of urls) {
      observables.push(this.http.get(url));
    }
    return observables;
  }),
  mergeMap((data) => forkJoin([...data]))
);

forkJoin([...observables, obs$]).subscribe({
  next: (data) => console.log(data, 'from fork'),
  error: (err) => console.log(err)
});
```
So, I have in this case:
1. An array of observables `observables`
2. An observable `obs$` that fetches some urls, and upon fetching the urls, I need to make requests to those urls in parallel as well.
I don't care about the results; I only care whether something errors out. This example code seems to work properly, but I have a couple of questions since I'm trying to wrap my head around RxJS.
1. Is this a proper way of doing things in order to achieve the goals I described above?
2. `mergeMap((data) => forkJoin([...data]))` -> mergeMap here basically only helps me get an Observable out of the nested Observable, right? So I could technically use concatMap or switchMap with the same effect?
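# Answer 1
**Score**: 2
You don't actually need `forkJoin`. You can create an observable that emits individual URLs, then use `mergeMap` to request each URL and emit the result.
Since you have two different URL sources, we can define them separately, then use `merge` to create a single observable from both:
```typescript
import { from, merge, Observable } from 'rxjs';
import { mergeMap } from 'rxjs/operators';

// URLs known up front, emitted one at a time
const urls_1$: Observable<string> = from(arr);
// URLs fetched from the server (response shape taken from the question: { data: string[] })
const urls_2$: Observable<string> = this.http.get<{ data: string[] }>('http://localhost:3000/users').pipe(
  mergeMap(response => response.data)
);
// Request every URL as soon as either source emits it
const response$ = merge(urls_1$, urls_2$).pipe(
  mergeMap(url => this.http.get(url))
);
response$.subscribe({
  next: data => console.log(data),
  error: err => console.log(err)
});
```
- `urls_1$`: we use `from` to create an observable that emits the array elements individually.
- `urls_2$`: we make the HTTP call, then use `mergeMap` to emit the received array elements individually. This has the same effect as `from` above (in fact, `mergeMap` uses `from` internally). You are correct that you could use any of the higher-order mapping operators here, because the HTTP call emits only once.
- `response$`: here we use `merge` to create a single observable from both of our sources. This means that whenever `urls_1$` or `urls_2$` emits a value, the merged observable emits it. We then use `mergeMap` to make the HTTP call and emit its result. Unlike the previous `mergeMap`, this one cannot be replaced by `switchMap` or `concatMap`, because you want the requests to be made in parallel. The result is that `response$` emits the response of each request.
  - If you want to limit the number of concurrent requests, pass a concurrency limit as the second argument (`concurrent`) to `mergeMap`:
```typescript
mergeMap(url => this.http.get(url), 5) // <-- limit to 5 active requests
```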
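To see what the concurrency limit does without a real backend, here is a minimal sketch. `fakeGet`, `respond`, `started`, and `pending` are hypothetical stand-ins for `this.http.get` (they are not part of Angular or RxJS); each fake request stays open until it is completed by hand, which makes it possible to observe how many requests `mergeMap` has started.

```typescript
import { defer, from, Observable, Subject } from 'rxjs';
import { mergeMap } from 'rxjs/operators';

// Hypothetical stand-in for this.http.get: records when a "request" starts
// and keeps it open until its Subject is completed manually.
const started: string[] = [];
const pending = new Map<string, Subject<string>>();

function fakeGet(url: string): Observable<string> {
  return defer(() => {
    started.push(url);
    const request$ = new Subject<string>();
    pending.set(url, request$);
    return request$;
  });
}

// Hypothetical helper: finish one open request with a response.
function respond(url: string): void {
  const request$ = pending.get(url);
  if (request$) {
    request$.next(`response for ${url}`);
    request$.complete();
  }
}

const responses: string[] = [];

from(['/users/1', '/users/2', '/users/3'])
  .pipe(mergeMap(url => fakeGet(url), 2)) // at most 2 requests in flight
  .subscribe(response => responses.push(response));

// Only two requests have started; mergeMap is holding back the third.
const startedBeforeResponse = [...started];

// Completing one request frees a slot, so the third request starts.
respond('/users/1');
const startedAfterResponse = [...started];
```

With a limit of 2, `startedBeforeResponse` holds the first two URLs and `startedAfterResponse` holds all three. Leaving out the second argument starts all three immediately.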
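# Answer 2
**Score**: 1
1. Is this a proper way of doing things in order to achieve the goals I described above?

Yes, it is.

2. `mergeMap((data) => forkJoin([...data]))` -> mergeMap here basically only helps me get an Observable out of the nested Observable, right? So I could technically use concatMap or switchMap with the same effect?

Given what you are trying to achieve, `mergeMap` is the most sensible choice. Because `this.http.get` emits a single value, `concatMap` and `switchMap` would give the same result here, but they carry extra logic that you don't need:
- `concatMap`: projects each source value to an Observable that is merged into the output Observable in a serialized fashion, waiting for each one to complete before merging the next.
- `switchMap`: projects each source value to an Observable that is merged into the output Observable, emitting values only from the most recently projected Observable.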
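The difference between the three operators only shows up when the source emits more than once. The sketch below illustrates this under stated assumptions: `run`, `Key`, and `Flatten` are hypothetical names introduced for the demonstration, and manually controlled `Subject`s stand in for HTTP requests so that the order of events is explicit.

```typescript
import { defer, Observable, OperatorFunction, Subject } from 'rxjs';
import { concatMap, mergeMap, switchMap } from 'rxjs/operators';

type Key = 'a' | 'b';
type Flatten = (
  project: (key: Key) => Observable<string>
) => OperatorFunction<Key, string>;

// Hypothetical helper: pushes two source values through the given operator,
// then lets inner observable "a" finish first and "b" second.
function run(flatten: Flatten) {
  const source$ = new Subject<Key>();
  const inner = { a: new Subject<string>(), b: new Subject<string>() };
  const subscribed: Key[] = [];
  const emitted: string[] = [];

  source$
    .pipe(
      flatten(key =>
        defer(() => {
          subscribed.push(key); // record when the inner observable is subscribed
          return inner[key];
        })
      )
    )
    .subscribe(value => emitted.push(value));

  source$.next('a');
  source$.next('b');
  const subscribedEarly = [...subscribed]; // before any inner observable finished

  inner.a.next('A');
  inner.a.complete();
  inner.b.next('B');
  inner.b.complete();

  return { subscribedEarly, emitted };
}

// mergeMap: both inner observables run at once, both results arrive.
const merged = run(project => mergeMap(project));
// concatMap: "b" is not subscribed until "a" completes, both results arrive.
const concatenated = run(project => concatMap(project));
// switchMap: "a" is dropped as soon as "b" arrives, only "B" is emitted.
const switched = run(project => switchMap(project));
```

In the question's `obs$`, the outer `this.http.get` emits once, so none of these differences come into play there.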
# Answer 3
**Score**: 1
Off topic, but note that you could rewrite:
```typescript
const arr = [
  'https://jsonplaceholder.typicode.com/users/1',
  'https://jsonplaceholder.typicode.com/users/2',
  'https://jsonplaceholder.typicode.com/users/3',
];
let observables = [];
for (let url of arr) {
  observables.push(this.http.get(url));
}
```
as:
```typescript
const observables = [
  'https://jsonplaceholder.typicode.com/users/1',
  'https://jsonplaceholder.typicode.com/users/2',
  'https://jsonplaceholder.typicode.com/users/3',
].map(url => this.http.get(url));
```
You're also shadowing the `observables` variable by declaring it twice (once at the top, and once inside your `map` callback). Use different names (`observables1` and `observables2` would do the trick).
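Applying the same idea to the rest of the question's code could look like the sketch below. It is only an illustration: `fakeGet`, `USERS_ENDPOINT`, `UrlList`, `staticRequests`, and `dynamicRequests$` are hypothetical names, and `fakeGet` replaces `this.http.get` so the snippet runs without Angular. The response shape `{ data: string[] }` is assumed from the question.

```typescript
import { forkJoin, Observable, of } from 'rxjs';
import { mergeMap } from 'rxjs/operators';

interface UrlList {
  data: string[];
}

const USERS_ENDPOINT = 'http://localhost:3000/users';

// Hypothetical stand-in for this.http.get; it answers synchronously.
function fakeGet(url: string): Observable<unknown> {
  if (url === USERS_ENDPOINT) {
    const list: UrlList = { data: ['https://jsonplaceholder.typicode.com/users/5'] };
    return of(list);
  }
  return of({ url });
}

const staticRequests = [
  'https://jsonplaceholder.typicode.com/users/1',
  'https://jsonplaceholder.typicode.com/users/2',
].map(url => fakeGet(url));

// No intermediate array and no shadowed name: the inner requests are
// built directly inside mergeMap.
const dynamicRequests$ = fakeGet(USERS_ENDPOINT).pipe(
  mergeMap(response =>
    forkJoin((response as UrlList).data.map(url => fakeGet(url)))
  )
);

let result: unknown[] = [];
forkJoin([...staticRequests, dynamicRequests$]).subscribe({
  next: data => (result = data),
  error: err => console.log(err),
});
```

One caveat worth knowing: `forkJoin` given an empty array completes without emitting, so if the endpoint returns no URLs the outer `forkJoin` also completes without calling `next`. That is harmless when only errors matter, as in the question.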