Combine multiple observables into a single RxJS stream

Question

I don't know which RxJS operator to use to solve the following problem:

In my music application, I have a submission page (this is like a music album). To load the submission, I use the following query:

this.submissionId = parseInt(params['album']);

if (this.submissionId) {
  this.submissionGQL.watch({
    id: this.submissionId
  }).valueChanges.subscribe((submission) => {
      //submission loaded here!
  });
}

Easy enough! However, once I've loaded the submission, I have to load some auxiliary information such as the current user (to check if they are the artist of the submission) and comments. In order to avoid nested subscriptions, I can modify the above query to use switchMap to switch the query stream to user and comments observables once the submission resolves:

// stream to query for the submission and then switch query to user
this.submissionGQL.watch({
    id: this.submissionId
  }).valueChanges.pipe(
  switchMap(submission => {
    this.submission = submission;
    return this.auth.user$
  })
).subscribe((user) => {
  // needs value of submission here
  if (user.id == this.submission.user.id) {
    //user is owner of submission
  }
})

// stream to query for the submission and then switch query to comments
this.submissionGQL.watch({
    id: this.submissionId
  }).valueChanges.pipe(
  switchMap(submission => {
    this.comments$ = this.commentsGQL.watch({
      submissionId: submission.id //needs submission response here
    })
    return this.comments$.valueChanges
  })
).subscribe((comments) => {
  this.comments = comments;
})

Great! I've avoided the nested subscription issue, but now the first part of each submission request is identical. Basically, once the submission has been queried, I want to launch two parallel queries:

  • a query for the user
  • a query for the comments

Which RxJS operator can perform such an operation? I suppose the subscribe at the end would emit an array response like:

.subscribe(([user, comments]) => {
    // check if user == submission.user.id here
    // also assign comments to component variable here
})

I believe mergeMap is sort of what I need but I'm not sure how to implement that properly. Or is this a case where I should share() the submission query and then build off my parallel queries separately? I'm very curious! Please let me know, thanks!

Answer 1

Score: 1

You can use the RxJS forkJoin operator for this scenario. As stated in the documentation:

> When all observables complete, emit the last emitted value from each.

const userQuery$ = this.submissionGQL.watch({
    id: this.submissionId
  }).valueChanges.pipe(
  switchMap(submission => {
    this.submission = submission;
    return this.auth.user$
  })
)

// stream to query for the submission and then switch query to comments
const commentsQuery$ = this.submissionGQL.watch({
    id: this.submissionId
  }).valueChanges.pipe(
  switchMap(submission => {
    this.comments$ = this.commentsGQL.watch({
      submissionId: submission.id //needs submission response here
    })
    return this.comments$.valueChanges
  })
)

forkJoin([userQuery$, commentsQuery$]).subscribe(([user, comments]) => {
  // check if user == submission.user.id here
  // also assign comments to component variable here
})
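
One caveat worth checking before adopting this: as the quoted documentation says, forkJoin emits only when all sources complete, and long-lived streams (a watched query's valueChanges or a user$ stream usually are) may never complete. The sketch below is only an illustration under assumptions: the BehaviorSubjects and the names user$ and comments$ are stand-ins, not the asker's real services. It shows forkJoin staying silent on open streams, and take(1) as one possible way to make each source complete after its first value.

```typescript
import { BehaviorSubject, forkJoin } from 'rxjs';
import { take } from 'rxjs/operators';

// Stand-ins for long-lived streams such as auth.user$ or valueChanges (assumed shapes).
const user$ = new BehaviorSubject({ id: 1 });
const comments$ = new BehaviorSubject(['first comment']);

// Neither source completes, so forkJoin never emits here.
const withoutTake: unknown[] = [];
forkJoin([user$, comments$]).subscribe(value => withoutTake.push(value));

// take(1) completes each source after one value, so forkJoin emits exactly once.
const withTake: Array<[{ id: number }, string[]]> = [];
forkJoin([user$.pipe(take(1)), comments$.pipe(take(1))])
  .subscribe(([user, comments]) => withTake.push([user, comments]));
```

With take(1) you get a one-shot result and lose later updates, so this fits only if a single snapshot of the user and comments is enough.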

Answer 2

Score: 0

Try:

  import { combineLatest } from 'rxjs';
  import { switchMap } from 'rxjs/operators';

  this.submissionGQL.watch({
    id: this.submissionId
  }).valueChanges.pipe(
    switchMap(submission => {
      this.submission = submission;
      const user$ = this.auth.user$;
      this.comments$ = this.commentsGQL.watch({
        submissionId: submission.id
      });
      // combine the observables (valueChanges), not the query ref itself
      return combineLatest([user$, this.comments$.valueChanges]);
    }),
    // maybe put a takeUntil here to end the subscription and avoid memory leaks
  ).subscribe(([user, comments]) => {
    // check if user == submission.user.id here
    // also assign comments to component variable here
  });
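
To make the pattern above easier to try outside the component, here is a minimal, self-contained sketch. Everything in it is an assumption for illustration: submission$, currentUser$, commentsFor and destroy$ are hypothetical stand-ins for submissionGQL, auth.user$, commentsGQL and a component-destroy signal. It also shows the takeUntil idea from the comment, and maps the submission into the result so the subscriber does not need this.submission.

```typescript
import { BehaviorSubject, Subject, combineLatest, of } from 'rxjs';
import { map, switchMap, takeUntil } from 'rxjs/operators';

interface User { id: number; }
interface Submission { id: number; user: User; }

// Hypothetical stand-ins for submissionGQL, auth.user$ and commentsGQL.
const submission: Submission = { id: 7, user: { id: 1 } };
const submission$ = of(submission);
const currentUser$ = new BehaviorSubject<User>({ id: 1 });
const commentsFor = (submissionId: number) =>
  of([`comment on submission ${submissionId}`]);

// Emits once when the (hypothetical) component is destroyed.
const destroy$ = new Subject<void>();
const results: Array<{ isOwner: boolean; comments: string[] }> = [];

submission$.pipe(
  switchMap(sub =>
    combineLatest([currentUser$, commentsFor(sub.id)]).pipe(
      // Carry the ownership check along with the comments.
      map(([user, comments]) => ({
        isOwner: user.id === sub.user.id,
        comments,
      }))
    )
  ),
  takeUntil(destroy$)
).subscribe(result => results.push(result));

currentUser$.next({ id: 2 }); // a new user re-runs the ownership check
destroy$.next();              // tear the subscription down
currentUser$.next({ id: 1 }); // ignored: the pipeline is already unsubscribed
```

In an Angular component, destroy$.next() would typically be called from ngOnDestroy.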

Something you should consider is eliminating instance variables with the help of Angular's async pipe (https://malcoded.com/posts/angular-async-pipe/).
It subscribes to the observable, renders the emitted value in the view, and automatically unsubscribes when the view is destroyed.

So, using that, we can get rid of this.submission = submission by putting:

submissions$: Observable<ISubmission>; // assuming there is an ISubmission interface; if not, use any

// then when this.submissionId is defined
this.submissions$ = this.submissionGQL.watch({
  id: this.submissionId
}).valueChanges;

// then when using it in your view you can do {{ this.submissions$ | async }}

The same goes for this.comments$. All of this is optional, though. I try to minimize instance variables as much as possible when using RxJS, because too many of them lead to confusion.

Then you can build on the this.submissions$ observable and subscribe to it for the other main stream.

 this.submissions$.pipe(
  switchMap(submission => ..... // everything else being the same
 )
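
One thing to watch when both the async pipe and a separate pipeline subscribe to the same observable: a cold source runs once per subscriber. A possible way around that, and this is only a sketch under assumptions rather than something from the answer itself, is shareReplay. Here rawSubmission$ is a hypothetical cold source standing in for the submission query, and the counter exists just to show how often it is subscribed.

```typescript
import { Observable } from 'rxjs';
import { map, shareReplay } from 'rxjs/operators';

let sourceSubscriptions = 0;

// Hypothetical cold source standing in for the submission query.
const rawSubmission$ = new Observable<{ id: number }>(subscriber => {
  sourceSubscriptions += 1;
  subscriber.next({ id: 7 });
});

// Replay the latest submission to every subscriber; refCount releases the
// source subscription once nobody is listening any more.
const sharedSubmission$ = rawSubmission$.pipe(
  shareReplay({ bufferSize: 1, refCount: true })
);

const seenIds: number[] = [];
// First consumer, e.g. the view.
sharedSubmission$.subscribe(s => seenIds.push(s.id));
// Second consumer, e.g. the pipeline that loads the user and comments.
sharedSubmission$.pipe(map(s => s.id)).subscribe(id => seenIds.push(id));
```

Both consumers receive the submission, while the source is subscribed only once. Whether this is needed depends on how the underlying query client caches results.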

I chose the combineLatest operator but you can use zip and forkJoin as you see fit. They all have subtle differences (https://scotch.io/tutorials/rxjs-operators-for-dummies-forkjoin-zip-combinelatest-withlatestfrom).
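
To get a feel for those differences, here is a small sketch with two synchronous sources (letters$ and numbers$ are made-up example streams). Because of() emits everything immediately on subscription, the combineLatest result below reflects that timing; with asynchronous sources the pairs would interleave differently.

```typescript
import { combineLatest, forkJoin, of, zip } from 'rxjs';

const letters$ = of('a', 'b', 'c');
const numbers$ = of(1, 2);

const combined: Array<[string, number]> = [];
const zipped: Array<[string, number]> = [];
const joined: Array<[string, number]> = [];

// combineLatest: emits the latest value of each source whenever any source emits,
// once every source has emitted at least once -> ['c', 1], ['c', 2]
combineLatest([letters$, numbers$]).subscribe(([l, n]) => combined.push([l, n]));

// zip: pairs values by index -> ['a', 1], ['b', 2]
zip(letters$, numbers$).subscribe(([l, n]) => zipped.push([l, n]));

// forkJoin: waits for completion, then emits the last values once -> ['c', 2]
forkJoin([letters$, numbers$]).subscribe(([l, n]) => joined.push([l, n]));
```

For streams that keep emitting (such as a user$ that changes on login), combineLatest keeps the result up to date, whereas forkJoin fits one-shot requests that complete.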

huangapple
  • Posted on January 4, 2020, 12:13:01
  • When reposting, please keep the link to this article: https://go.coder-hub.com/59587812.html