RxJS: chain dependent observables sequentially, but get each emission for a progress bar

Question

I'm facing a problem and have been trying to find a solution using RxJS, but I can't seem to find one that fits.

  • I have three different REST requests that will be called sequentially, and each of them needs the response of the previous one as an argument
  • I want to implement a progress bar that increments as the requests succeed

Here is what I thought:

  • I am going to use pipes and concatMap() to avoid nested subscriptions and subscribe to each request when the previous one is done.

Consider this very simplified version. Assume that each of() call represents a whole, successful REST request (I will handle errors later), and that I will do work not shown here with the n parameter:

import { of } from 'rxjs';
import { concatMap, delay, tap } from 'rxjs/operators';

const request1 = of('success 1').pipe(
  delay(500),
  tap(n => console.log('received ' + n)),
);

const request2 = (n) => of('success 2').pipe(
  delay(1000),
  tap(n => console.log('received ' + n))
);

const request3 = (n) => of('success 3').pipe(
  delay(400),
  tap(n => console.log('received ' + n))
);

request1.pipe(
  concatMap(n => request2(n).pipe(
    concatMap(n => request3(n))
  ))
)

However, when I subscribe to the last piece of code, I only get the response of the last request, which is expected since that is what the pipe resolves to.

So with concatMap(), I can chain my dependent REST calls correctly, but I can't follow the progress.

I could follow the progress quite easily with nested subscriptions, but I am trying hard to avoid that and do this the best-practice way.

How can I chain my dependent REST calls but still be able to do something each time a call succeeds?

Answer 1

Score: 4

This is a more general solution, though not as simple. It makes the progress observable while still avoiding the share operator, which can introduce unexpected statefulness if used incorrectly.

import * as Rx from 'rxjs';
import * as op from 'rxjs/operators';

const chainRequests = (firstRequestFn, ...otherRequestFns) => (
  initialParams
) => {
  return otherRequestFns.reduce(
    (chain, nextRequestFn) =>
      chain.pipe(op.concatMap((response) => nextRequestFn(response))),
    firstRequestFn(initialParams)
  );
};

chainRequests takes a variable number of functions and returns a function that accepts initial parameters and returns an observable that chains the functions together with concatMap, as done manually in the question. It does this by reducing over the request functions, with an accumulator that happens to be an observable: each step pipes the chain built so far into a concatMap that calls the next function with the previous response.

Remember, RxJS leads us out of callback hell if we know the path.

const chainRequestsWithProgress = (...requestFns) => (initialParams) =>  {
  const progress$ = new Rx.BehaviorSubject(0);
  const wrappedFns = requestFns.map((fn, i) => (...args) =>
    fn(...args).pipe(op.tap(() => progress$.next((i + 1) / requestFns.length)))
  );
  const chain$ = Rx.defer(() => {
    progress$.next(0);
    return chainRequests(...wrappedFns)(initialParams);
  });
  return [chain$, progress$];
};

chainRequestsWithProgress returns two observables - the one that eventually emits the last response, and one that emits progress values when the first observable is subscribed to. We do this by creating a BehaviorSubject to serve as our stream of progress values, and wrapping each of our request functions to return the same observable they normally would, but we also pipe it to tap so it can push a new progress value to the BehaviorSubject.

The progress is zeroed out upon each subscription to the first observable.

If you wanted to return a single observable that produced the progress state as well as the eventual result value, you could have chainRequestsWithProgress instead return:

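// The combineLatest operator is deprecated; the static Rx.combineLatest does the same job.
Rx.combineLatest([chain$.pipe(op.startWith(null)), progress$]).pipe(
  op.map(([result, progress]) => ({ result, progress }))
)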

and you'll have an observable that emits an object representing the progress toward the eventual result, then that result itself. Food for thought - does progress$ have to emit just numbers?

Caveat

This assumes request observables emit exactly one value.

Answer 2

Score: 3

The simplest solution would be to have a progress counter variable that is updated from a tap when each response comes back.

    let progressCounter = 0;
   
    request1.pipe(
      tap(_ => progressCounter = 0.33),
      concatMap(n => request2(n).pipe(
        tap(_ => progressCounter = 0.66),
        concatMap(n => request3(n)
          .pipe(tap(_ => progressCounter = 1)))
      ))
    );

If you want the progress itself to be observable, then you want to share the request observables (so as not to make duplicate requests) and then combine them to get the progress.

An example of how you may want to approach that can be found at: https://www.learnrxjs.io/recipes/progressbar.html

huangapple
  • Published on January 3, 2020 at 23:28:54
  • When reposting, please keep the link to this article: https://go.coder-hub.com/59581191.html