Combine multiple observables into a single RxJS stream

Question

I don't know which RxJS operator to use to solve the following problem:

In my music application, I have a submission page (this is like a music album). To load the submission, I use the following query:

    this.submissionId = parseInt(params['album']);
    if (this.submissionId) {
      this.submissionGQL.watch({
        id: this.submissionId
      }).valueChanges.subscribe((submission) => {
        // submission loaded here!
      });
    }

Easy enough! However, once I've loaded the submission, I have to load some auxiliary information such as the current user (to check if they are the artist of the submission) and comments. In order to avoid nested subscriptions, I can modify the above query to use switchMap to switch the query stream to user and comments observables once the submission resolves:

    // stream to query for the submission and then switch query to user
    this.submissionGQL.watch({
      id: this.submissionId
    }).valueChanges.pipe(
      switchMap(submission => {
        this.submission = submission;
        return this.auth.user$
      })
    ).subscribe((user) => {
      // needs value of submission here
      if (user.id == this.submission.user.id) {
        // user is owner of submission
      }
    })

    // stream to query for the submission and then switch query to comments
    this.submissionGQL.watch({
      id: this.submissionId
    }).valueChanges.pipe(
      switchMap(submission => {
        this.comments$ = this.commentsGQL.watch({
          submissionId: submission.id // needs submission response here
        })
        return this.comments$.valueChanges
      })
    ).subscribe((comments) => {
      this.comments = comments;
    })

Great! I've avoided the nested subscription issue, but now the first part of each submission request is identical. Basically, once the submission is queried, I want to launch two parallel queries:

  • a query for the user
  • a query for the comments

Which RxJS operator can perform such an operation? I suppose the subscribe at the end would emit an array response like:

    .subscribe(([user, comments]) => {
      // check if user == submission.user.id here
      // also assign comments to component variable here
    })

I believe mergeMap is sort of what I need but I'm not sure how to implement that properly. Or is this a case where I should share() the submission query and then build off my parallel queries separately? I'm very curious! Please let me know, thanks!

Answer 1

Score: 1

You can use the RxJS forkJoin operator for this scenario. As stated in the documentation,

> When all observables complete, emit the last emitted value from each.

    import { forkJoin } from 'rxjs';
    import { switchMap } from 'rxjs/operators';

    // stream to query for the submission and then switch query to user
    const userQuery$ = this.submissionGQL.watch({
      id: this.submissionId
    }).valueChanges.pipe(
      switchMap(submission => {
        this.submission = submission;
        return this.auth.user$
      })
    )

    // stream to query for the submission and then switch query to comments
    const commentsQuery$ = this.submissionGQL.watch({
      id: this.submissionId
    }).valueChanges.pipe(
      switchMap(submission => {
        this.comments$ = this.commentsGQL.watch({
          submissionId: submission.id // needs submission response here
        })
        return this.comments$.valueChanges
      })
    )

    // forkJoin takes an array of sources and emits once, after all of them complete.
    // If these streams stay open, add take(1) to each or use combineLatest instead.
    forkJoin([userQuery$, commentsQuery$]).subscribe(([user, comments]) => {
      // check if user == submission.user.id here
      // also assign comments to component variable here
    })
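
One caveat is worth seeing in isolation: forkJoin emits only once, after every source has completed, so a stream that stays open keeps it silent. The sketch below uses two BehaviorSubject instances, user$ and comments$, as hypothetical stand-ins for long-lived streams (they are assumptions for illustration, not the asker's actual services) and shows the difference that take(1) makes.

```typescript
import { BehaviorSubject, forkJoin } from 'rxjs';
import { take } from 'rxjs/operators';

// Hypothetical stand-ins for long-lived streams that never complete.
const user$ = new BehaviorSubject({ id: 1 });
const comments$ = new BehaviorSubject(['first comment']);

const withoutTake: unknown[] = [];
const withTake: number[][] = [];

// Neither source completes, so forkJoin never emits here.
const pending = forkJoin([user$, comments$]).subscribe(v => withoutTake.push(v));

// take(1) completes each source after its first value, so forkJoin emits once.
forkJoin([user$.pipe(take(1)), comments$.pipe(take(1))])
  .subscribe(([user, comments]) => withTake.push([user.id, comments.length]));

pending.unsubscribe();

console.log(withoutTake.length); // 0
console.log(withTake.length);    // 1, the single entry is [1, 1]
```

If later emissions from either stream matter (for example, a comment list that refreshes), take(1) would discard them, and combineLatest is probably the better fit.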

Answer 2

Score: 0

Try:

    import { combineLatest } from 'rxjs';
    import { switchMap } from 'rxjs/operators';

    this.submissionGQL.watch({
      id: this.submissionId
    }).valueChanges.pipe(
      switchMap(submission => {
        this.submission = submission;
        const user$ = this.auth.user$;
        this.comments$ = this.commentsGQL.watch({
          submissionId: submission.id
        });
        // combine the observables, not the query reference itself
        return combineLatest([user$, this.comments$.valueChanges]);
      }),
      // maybe put a takeUntil to remove subscription and not cause memory leaks
    ).subscribe(([user, comments]) => {
      // check if user == submission.user.id here
      // also assign comments to component variable here
    });
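
To make the shape of the emitted value concrete, here is a minimal, self-contained sketch of the same switchMap plus combineLatest pattern. The names submission$, user$ and commentsFor are hypothetical stand-ins built with of(), not the real submissionGQL, auth or commentsGQL services. The inner map, which keeps the submission in scope rather than storing it on the component, is a variation assumed for this example and not part of the answer itself.

```typescript
import { combineLatest, of, Observable } from 'rxjs';
import { map, switchMap } from 'rxjs/operators';

interface User { id: number; }
interface Submission { id: number; user: User; }

// Hypothetical stand-ins for the submission query, auth.user$ and the comments query.
const submission$: Observable<Submission> = of({ id: 7, user: { id: 1 } });
const user$: Observable<User> = of({ id: 1 });
const commentsFor = (submissionId: number): Observable<string[]> =>
  of([`comment on submission ${submissionId}`]);

const results: { isOwner: boolean; comments: string[] }[] = [];

submission$.pipe(
  // Once the submission arrives, start both dependent queries together.
  switchMap(submission =>
    combineLatest([user$, commentsFor(submission.id)]).pipe(
      // The submission is still in scope here, so no instance variable is needed.
      map(([user, comments]) => ({
        isOwner: user.id === submission.user.id,
        comments,
      }))
    )
  )
).subscribe(result => results.push(result));

console.log(results[0].isOwner);  // true
console.log(results[0].comments); // the single comment for submission 7
```

Because switchMap unsubscribes from the previous inner stream whenever the submission query emits again, the user and comments streams restart for each new submission value.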

Something to consider is eliminating instance variables with the help of Angular's async pipe (https://malcoded.com/posts/angular-async-pipe/). It subscribes to the observable, renders the emitted value in the view, and automatically unsubscribes when the view is destroyed.

So, using that, we can get rid of this.submission = submission by putting:

    submissions$: Observable<ISubmission>; // assuming there is an ISubmission interface; if not, use any
    // then when this.submissionId is defined
    this.submissions$ = this.submissionGQL.watch({
      id: this.submissionId
    }).valueChanges;
    // then when using it in your view you can do {{ submissions$ | async }}

The same goes for this.comments$. All of this is optional, though. I try to minimize instance variables as much as possible when using RxJS because too many of them lead to confusion.

Then you can build on the this.submissions$ observable and subscribe to it for the other main stream.

    this.submissions$.pipe(
      switchMap(submission => ..... // everything else being the same
    )

I chose the combineLatest operator but you can use zip and forkJoin as you see fit. They all have subtle differences (https://scotch.io/tutorials/rxjs-operators-for-dummies-forkjoin-zip-combinelatest-withlatestfrom).
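
As a rough illustration of those differences, the sketch below feeds the same two sources to all three functions and records what each one emits. The Subject instances a$ and b$ are stand-ins invented for this example.

```typescript
import { Subject, combineLatest, forkJoin, zip } from 'rxjs';

const a$ = new Subject<number>();
const b$ = new Subject<string>();

const latest: string[] = [];
const zipped: string[] = [];
const joined: string[] = [];

// combineLatest: emits on every change once each source has emitted at least once.
combineLatest([a$, b$]).subscribe(([a, b]) => latest.push(`${a}${b}`));
// zip: pairs values by position (first with first, second with second).
zip(a$, b$).subscribe(([a, b]) => zipped.push(`${a}${b}`));
// forkJoin: emits the last value of each source, only after all have completed.
forkJoin([a$, b$]).subscribe(([a, b]) => joined.push(`${a}${b}`));

a$.next(1);
b$.next('x');
a$.next(2);
b$.next('y');
a$.next(3);
a$.complete();
b$.complete();

console.log(latest); // ['1x', '2x', '2y', '3y']
console.log(zipped); // ['1x', '2y']
console.log(joined); // ['3y']
```

For a component that should react whenever the user or the comment list changes, the combineLatest behavior is usually the one that matches; forkJoin suits one-shot requests that complete.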

huangapple
  • Posted on 2020-01-04 12:13:01
  • When reposting, please keep the link to this article: https://go.coder-hub.com/59587812.html