英文:
Combine multiple observables in to a single RxJS stream
问题
抱歉,由于您的要求,我会跳过代码部分,只回答关于RxJS操作符的问题。
您可以使用RxJS的forkJoin
操作符来同时执行多个Observable并等待它们全部完成。在您的情况下,您可以使用forkJoin
来同时查询用户和评论,然后在它们都完成后进行处理,就像这样:
import { forkJoin } from 'rxjs';
// 在查询 submission 后使用 forkJoin 同时查询用户和评论
this.submissionGQL.watch({
id: this.submissionId
}).valueChanges.pipe(
switchMap(submission => {
this.submission = submission;
const userQuery = this.auth.user$;
const commentsQuery = this.commentsGQL.watch({
submissionId: submission.id
}).valueChanges;
// 使用 forkJoin 同时执行 userQuery 和 commentsQuery
return forkJoin([userQuery, commentsQuery]);
})
).subscribe(([user, comments]) => {
// 在这里处理 user 和 comments
if (user.id === this.submission.user.id) {
// 用户是提交者的所有者
}
this.comments = comments;
});
使用forkJoin
操作符,您可以并行执行多个查询,然后在它们都完成后进行处理,这与您所描述的期望结果相匹配。
英文:
It appears I am lacking knowledge on which RxJS operator to resolve the following problem:
In my music application, I have a submission page (this is like a music album). To load the submission, I use the following query:
this.submissionId = parseInt(params['album']);
if (this.submissionId) {
this.submissionGQL.watch({
id: this.submissionId
}).valueChanges.subscribe((submission) => {
//submission loaded here!
});
}
Easy enough! However, once I've loaded the submission
, I have to load some auxiliary information such as the current user
(to check if they are the artist of the submission) and comments
. In order to avoid nested subscriptions, I can modify the above query to use switchMap
to switch the query stream to user
and comments
observables once the submission
resolves:
// stream to query for the submission and then switch query to user
this.submissionGQL.watch({
id: this.submissionId
}).valueChanges.pipe(
switchMap(submission => {
this.submission = submission;
return this.auth.user$
})
).subscribe((user) => {
// needs value of submission here
if (user.id == this.submission.user.id) {
//user is owner of submission
}
})
// stream to query for the submission and then switch query to comments
this.submissionGQL.watch({
id: this.submissionId
}).valueChanges.pipe(
switchMap(submission => {
this.comments$ = this.commentsGQL.watch({
submissionId: submission.id //needs submission response here
})
return this.comments$.valueChanges
})
).subscribe((comments) => {
this.comments = comments;
})
Great! I've avoided the nested subscription issue BUT now...the first part of each submission
request is identical. Basically, once, the submission
is queried, i want to launch off two parallel queries:
- a query for the user
- a query for the comments
Which RxJS operator can perform such an operation? I suppose the subscribe at the end would emit an array response like:
.subscribe([user, comments] => {
// check if user == submission.user.id here
// also assign comments to component variable here
})
I believe mergeMap
is sort of what I need but I'm not sure how to implement that properly. Or is this a case where I should share()
the submission query and then build off my parallel queries separately? I'm very curious! Please let me know, thanks!
答案1
得分: 1
你可以在这种情况下使用RxJS的forkJoin操作符。如文档所述,
> 当所有 observables 完成时,发出每个 observables 最后发出的值。
const userQuery$ = this.submissionGQL.watch({
id: this.submissionId
}).valueChanges.pipe(
switchMap(submission => {
this.submission = submission;
return this.auth.user$
})
)
// 查询提交并切换到评论的流
const commentsQuery$ = this.submissionGQL.watch({
id: this.submissionId
}).valueChanges.pipe(
switchMap(submission => {
this.comments$ = this.commentsGQL.watch({
submissionId: submission.id // 需要 submission 的响应数据
})
return this.comments$.valueChanges
})
)
forkJoin(userQuery$, commentsQuery$).subscribe([user, comments] => {
// 在这里检查 user 是否等于 submission.user.id
// 同时在这里分配 comments 给组件变量
})
英文:
You can use the RxJS forkJoin operator for this scenario. As stated on the documentation,
> When all observables complete, emit the last emitted value from each.
const userQuery$ = this.submissionGQL.watch({
id: this.submissionId
}).valueChanges.pipe(
switchMap(submission => {
this.submission = submission;
return this.auth.user$
})
)
// stream to query for the submission and then switch query to comments
const commentsQuery$ = this.submissionGQL.watch({
id: this.submissionId
}).valueChanges.pipe(
switchMap(submission => {
this.comments$ = this.commentsGQL.watch({
submissionId: submission.id //needs submission response here
})
return this.comments$.valueChanges
})
)
forkJoin(userQuery$, commentsQuery$).subscribe([user, comments] => {
// check if user == submission.user.id here
// also assign comments to component variable here
})
答案2
得分: 0
尝试:
this.submissionGQL.watch({
id: this.submissionId
}).valueChanges.pipe(
switchMap(submission => {
this.submission = submission;
const user$ = this.auth.user$;
this.comments$ = this.commentsGQL.watch({
submissionId: submission.id
});
return combineLatest(user$, this.comments$);
}),
// 可能要添加 takeUntil 来取消订阅,以避免内存泄漏
).subscribe(([user, comments]) => {
// 在这里检查 user 是否等于 submission.user.id
// 并在这里将 comments 分配给组件变量
});
有一点值得考虑的是,你可以利用 Angular 提供的 async
管道来消除实例变量(https://malcoded.com/posts/angular-async-pipe/)。它会订阅可观察对象,将其呈现到视图中,并在视图销毁时自动取消订阅。
因此,使用它,我们可以通过以下方式摆脱 this.submissions = submission
:
submissions$: Observable<ISubmission>; // 假设有一个 ISubmission 接口,如果没有就不用放
// 然后当 this.submissionId 被定义时
this.submissions$ = this.submissionGQL.watch({
id: this.submissionId
}).valueChanges;
// 然后在视图中使用时,可以这样做:{{ this.submissions$ | async }}
对于 this.comments$
也可以采用相同的方法。这一切都是可选的,但我在使用 RxJS 时尽量减少实例变量,因为太多实例变量会导致混乱。
然后,你可以在 this.submissions$
可观察对象的基础上订阅其他主要流程。
this.submissions$.pipe(
switchMap(submission => ..... // 其余部分保持不变
)
我选择了 combineLatest
操作符,但你可以根据需要使用 zip
和 forkJoin
。它们之间都有微妙的区别(https://scotch.io/tutorials/rxjs-operators-for-dummies-forkjoin-zip-combinelatest-withlatestfrom)。
英文:
Try:
this.submissionGQL.watch({
id: this.submissionId
}).valueChanges.pipe(
switchMap(submission => {
this.submission = submission;
const user$ = this.auth.user$;
this.comments$ = this.commentsGQL.watch({
submissionId: submission.id
});
return combineLatest(user$, this.comments$);
}),
// maybe put a takeUntil to remove subscription and not cause memory leaks
).subscribe(([user, comments]) => {
// check if user == submission.user.id here
// also assign comments to component variable here
});
Something you should consider is eliminating instance variables with the help of the async
pipe given by Angular (https://malcoded.com/posts/angular-async-pipe/).
It will subscribe to the observable, present it into the view and automatically unsubscribe when the view is destroyed.
So, using that, we can get rid of this.submissions = submission
by putting:
submissions$: Observable<ISubmission>; // assuming there is an interface of ISubmission, if not put any
// then when this.submissionId is defined
this.submissions$ = this.submissionGQL.watch({
id: this.submissionId
}).valueChanges;
// then when using it in your view you can do {{ this.submissions$ | async }}
The same thing can go for this.comments$
. All of this is optional though. I try to minimize instance variables as much as possible when using RxJS because too many instance variables leads to confusion.
Then you can lead off of this.submissions$
observable and subscribe for the other main stream.
this.submission$.pipe(
switchMap(submission => ..... // everything else being the same
)
I chose the combineLatest
operator but you can use zip
and forkJoin
as you see fit. They all have subtle differences (https://scotch.io/tutorials/rxjs-operators-for-dummies-forkjoin-zip-combinelatest-withlatestfrom).
通过集体智慧和协作来改善编程学习和解决问题的方式。致力于成为全球开发者共同参与的知识库,让每个人都能够通过互相帮助和分享经验来进步。
评论