Queue up same observables to go in order in Angular 14 with rxjs

huangapple go评论71阅读模式
英文:

Queue up same observables to go in order in Angular 14 with rxjs

问题

目前我有这样的代码:

testReports() {
  this.dataSeries.forEach((x, index) => {
    setTimeout(() => {
      x.Status = FileStatus.PENDING;

      this._service.validateReport(x.Location).subscribe({
        next: y => this.convertResponseToGridView(y, x),
        error: () => console.error('Issues in validation')
      });
    }, index * 1500)
  });
}

我想知道是否有更好的方法来使用rxjs来完成这个操作。我不知道从'this.dataSeries'获取到的observable的数量,因为它们实际上是来自最终用户输入的位置字符串,可能会更改。它们被获取后,'this._service.validateReport(x.Location).subscribe...'被触发以获取数据。虽然它都能正常工作,但1500毫秒的数量更像是一个猜测,有时线程会更进一步,有时不会。如果你不知道observables的数量,是否有更好的方法来执行某些操作?

我查找了这个答案:https://stackoverflow.com/questions/30519645/how-to-make-one-observable-sequence-wait-for-another-to-complete-before-emitting,其中concat似乎是一个有希望的方法,但我不确定是否可以在执行之前使用某个预定方法来创建observable数组。非常感谢任何帮助。

英文:

Currently I have code like this:

testReports() {
  this.dataSeries.forEach((x, index) => {
    setTimeout(() => {
      x.Status = FileStatus.PENDING;

      this._service.validateReport(x.Location).subscribe({
        next: y => this.convertResponseToGridView(y, x),
        error: () => console.error('Issues in validation')
      });
    }, index * 1500)
  });
}

I was curious if there was a better way to do this with rxjs. I do not know know the number of observables obtained from 'this.dataSeries' as they essentially strings of locations that may change from an end user input. They are obtained and then the 'this._service.validateReport(x.Location).subscribe...' kicks off to get data. It all works but number of 1500 milliseconds is more of a guess and sometimes the thread is further along or not. Is there a better way to do some operation if you do NOT know the observables statically?

I looked up this answer: https://stackoverflow.com/questions/30519645/how-to-make-one-observable-sequence-wait-for-another-to-complete-before-emitting and concat seemed promising but I was not sure if you could use that with some method predetermined to make an array of observables before executing it. Any help is much appreciated.

答案1

得分: 1

以下是翻译好的部分:

你可以将数据系列的数组映射到相应请求的数组,然后使用 concat 依次执行它们。

testReports() {
  // 将数组的每个元素映射到请求 observable
  const requests = this.dataSeries.map((x) =>
    this._service.validateReport(x.Location).pipe(
      tap({
        subscribe: () => x.Status = FileStatus.PENDING,
        next: y => this.convertResponseToGridView(y, x),
        error: () => console.error('Issues in validation')
      })
    )
  );

  // 依次执行它们
  concat(...requests).subscribe();
}
英文:

You can map your array of dataSeries to an array of the corresponding requests and then use concat to execute them all sequentialy.

testReports() {
  //map each element of the array to the request observable
  const requests = this.dataSeries.map((x) => 
    this._service.validateReport(x.Location).pipe(
      tap({
        subscribe: () => x.Status = FileStatus.PENDING,
        next: y => this.convertResponseToGridView(y, x),
        error: () => console.error('Issues in validation')
      })
    )
  );
  
  //execute them all sequentially
  concat(...requests).subscribe();
}

答案2

得分: 1

尽管一开始可能会感到“可怕”,但拥有高阶可观察对象(用来表示你有一个可观察的可观察对象)实际上通常是处理这些情况的最佳方法。

你的代码中另一个问题是你混合了命令式和响应式编程。在方法中使用 subscribe 总是一个代码异味。如果该函数被调用多次,你可能会遇到竞态条件。

所以,首先要做的是将你的方法转换成一个主题(Subject)。这样,你可以从该可观察对象中发出一个事件,告诉它,请测试报告。

public testReports$$ = new Subject<void>();

然后从视图中可以像这样使用:

<button (click)="testReports$$.next()">Test report</button>

现在,我们有了一种在发生事件时进行警告的方法,接下来我们需要对此进行反应。为此,我们将在构造函数中订阅我们的主题。

constructor() {
  this.testReports$$.subscribe()
}

在这里首先要考虑的是,对于任何订阅,我们必须确保不要忘记取消订阅以避免内存泄漏。为了以响应式的方式使其工作,你可以利用最近的 inject 函数和 DestroyRef 令牌来创建一个可重复使用的函数,如下所示:

export function untilDestroyed() {
  const subject = new Subject();

  inject(DestroyRef).onDestroy(() => {
    subject.next(true);
    subject.complete();
  });

  return <T>(obs: Observable<T>) =>
    obs.pipe(takeUntil<T>(subject.asObservable()));
}

将其放在一个共享位置,因为你将能够重复使用它。

然后,我们可以更新我们刚刚订阅的流:

constructor() {
  this.testReports$$.pipe(untilDestroyed()).subscribe()
}

到目前为止,我们知道我们会保持事件流处于打开状态,直到组件被销毁,然后关闭它。

接下来,我们知道我们将要处理一个可观察对象,所以我们可以在 switchMapmergeMapconcatMapexhaustMap 中选择。我猜测,如果用户在检查仍在运行时两次单击按钮,我们应该重新执行所有操作。这就是 switchMap 的作用:

constructor() {
  this.testReports$$.pipe(
    switchMap(() => TODO),
    untilDestroyed()
  ).subscribe()
}

然后,让我们关注主要的逻辑部分。我将分享代码并解释它,跟着我走:

from(
  this.dataSeries.map((dataSerie) =>
    validateReport(dataSerie.Location).pipe(
      tap((res) => convertResponseToGridView(res, dataSerie)),
      catchError((e) => {
        console.error(`Issues in validation`, e);
        return EMPTY;
      })
    )
  )
).pipe(concatAll())

这里的主要思想是循环遍历数据系列,并为每个条目创建一个冷可观察对象,一旦订阅就准备好验证报告。

然后,我们将所有这些包装在 from 中,它将获取我们刚刚构建的可观察对象数组,并将其转换为可观察的可观察对象,从 Observable<Array<DataSerie>>Observable<Observable<DataSerie>>

最后,我们使用高阶操作符 concatAll 来获取我们的内部 Observable<DataSerie>,并依次订阅它们(在订阅下一个之前等待每个完成)。

这是我们组件的最终代码:

@Component({
  selector: 'app',
  templateUrl: './app.component.html',
  styleUrls: ['./app.component.css'],
  standalone: true,
})
export class AppComponent {
  public testReports$$ = new Subject<void>();

  private dataSeries: DataSerie[] = [
    { Location: { x: 0, y: 0 } },
    { Location: { x: 1, y: 1 } },
    { Location: { x: 2, y: 2 } },
  ];

  constructor() {
    this.testReports$$
      .pipe(
        switchMap(() =>
          from(
            this.dataSeries.map((dataSerie) =>
              validateReport(dataSerie.Location).pipe(
                tap((res) => convertResponseToGridView(res, dataSerie)),
                catchError((e) => {
                  console.error(`Issues in validation`, e);
                  return EMPTY;
                })
              )
            )
          ).pipe(concatAll())
        ),
        untilDestroyed()
      )
      .subscribe();
  }
}

请查看这个在线演示,我已经模拟了数据。打开控制台并单击按钮“Test report”。看到所有的请求都是一个接一个地执行的。

英文:

Despite it being "scarry" at first, having higher order observable (big word to say that you have an observable of observable) is actually often the best approach for those situations.

Another issue with your code is that you're mixing up imperative and reactive programming. Having a subscribe in a method is always a code smell. If that function is called more than once, you could end up with race conditions.

So the first thing here is to actually transform your method, into a subject. This way, you can emit an event from that observable saying hey, please test the reports.

public testReports$$ = new Subject&lt;void&gt;();

And from the view we can have something like this:

&lt;button (click)=&quot;testReports$$.next()&quot;&gt;Test report&lt;/button&gt;

Now that we've got a way to be warn when there's an event, we need to react to this. For that, we will subscribe to our subject from the constructor.

constructor() {
  this.testReports$$.subscribe()
}

The first thing to think of here, is that for any subscribe, we must make sure we don't forget to unsubscribe to avoid memory leaks. To have that working in a reactive way, you can take advantage of the recent inject function and DestroyRef token to create a reusable function that looks like this:

export function untilDestroyed() {
  const subject = new Subject();

  inject(DestroyRef).onDestroy(() =&gt; {
    subject.next(true);
    subject.complete();
  });

  return &lt;T&gt;(obs: Observable&lt;T&gt;) =&gt;
    obs.pipe(takeUntil&lt;T&gt;(subject.asObservable()));
}

Put it somewhere shared as you'll be able to reuse it.

Then we can update our stream on which we just subscribed:

constructor() {
  this.testReports$$.pipe(untilDestroyed()).subscribe()
}

Excellent, so far we know we'll keep our event stream open until the component is destroyed and then shut it down.

Next, we know we'll have to deal with an observable, so we have the choice between switchMap, mergeMap, concatMap, exhaustMap pretty much. I'm guessing here that if a user clicks twice on the button while the checks are still running, we should just redo everything all over again. That's what switchMap is for:

constructor() {
  this.testReports$$.pipe(
    switchMap(() =&gt; TODO),
    untilDestroyed()
  ).subscribe()
}

Then, let's focus on the bulk of the logic. I'll share the code and go over it, bear with me:

from(
  this.dataSeries.map((dataSerie) =&gt;
    validateReport(dataSerie.Location).pipe(
      tap((res) =&gt; convertResponseToGridView(res, dataSerie)),
      catchError((e) =&gt; {
        console.error(`Issues in validation`, e);
        return EMPTY;
      })
    )
  )
).pipe(concatAll())

The main idea here is to loop on the data series and create for each entry a cold observable that will be ready to validate the report once subscribed to.

We then wrap all that up in a from, which will take the array of observables we've just built, and turn it into an observable, of observables. Effectively going from Observable&lt;Array&lt;DataSerie&gt;&gt; to Observable&lt;Observable&lt;DataSerie&gt;&gt;.

Finally, we use the higher order operator concatAll to take our inner Observable&lt;DataSerie&gt; and subscribe to them, one after the other (waiting for each to finish before subscribing to next!).

Here's the final code for our component:

@Component({
  selector: &#39;app&#39;,
  templateUrl: &#39;./app.component.html&#39;,
  styleUrls: [&#39;./app.component.css&#39;],
  standalone: true,
})
export class AppComponent {
  public testReports$$ = new Subject&lt;void&gt;();

  private dataSeries: DataSerie[] = [
    { Location: { x: 0, y: 0 } },
    { Location: { x: 1, y: 1 } },
    { Location: { x: 2, y: 2 } },
  ];

  constructor() {
    this.testReports$$
      .pipe(
        switchMap(() =&gt;
          from(
            this.dataSeries.map((dataSerie) =&gt;
              validateReport(dataSerie.Location).pipe(
                tap((res) =&gt; convertResponseToGridView(res, dataSerie)),
                catchError((e) =&gt; {
                  console.error(`Issues in validation`, e);
                  return EMPTY;
                })
              )
            )
          ).pipe(concatAll())
        ),
        untilDestroyed()
      )
      .subscribe();
  }
}

Have a look to this live demo where I've mocked the data. Open up the console and click on the button "Test report". See that all the requests are done one after another.

huangapple
  • 本文由 发表于 2023年6月6日 01:58:18
  • 转载请务必保留本文链接:https://go.coder-hub.com/76408907.html
匿名

发表评论

匿名网友

:?: :razz: :sad: :evil: :!: :smile: :oops: :grin: :eek: :shock: :???: :cool: :lol: :mad: :twisted: :roll: :wink: :idea: :arrow: :neutral: :cry: :mrgreen:

确定