Using rxjs how can I queue HTTP requests such that it waits for an existing request to complete and then sends only the most recent request?

huangapple go评论119阅读模式
英文:

Using rxjs how can I queue HTTP requests such that it waits for an existing request to complete and then sends only the most recent request?

问题

我有一个可观察对象,用于跟踪表单的更改。当表单更新时,它应该通过向服务器发送HTTP PUT请求来自动保存。我希望的行为是,当进行更改时,首次发送HTTP请求。对于后续的更改,只有在前一个请求完成后才发送请求。当前一个请求完成后,我希望仅发送最近的更新。

我考虑过使用exhaustMap,例如 formChanges$.pipe(exhaustMap(data => sendHttpUpdate(data)));,因为这会给我想要的行为,即在请求进行时忽略所有更新。然而,在进行中会丢失所有数据,因此我需要重新触发formChanges$可观察对象以发送最新的更新,否则它将被忽略。

我还考虑过使用concatMap,例如 formChanges$.pipe(concatMap(data => sendHttpUpdate(data)));,因为它等待前一个请求完成,但按顺序发送所有更新,而不仅仅是最新的更新。

我想使用纯rxjs解决方案,即使用Observables、Subjects和操作符,而不使用变量设置状态。我是否忽略了一个明显的操作符或操作符组合,可以实现这一目标?

这是我迄今为止的最佳尝试:

const $send = new Subject();

formChanges$.pipe(
    sample($send),
    switchMap(this.fakeHttpRequest),
  )
  .subscribe(handleResponse);

formChanges$.pipe(first())
  .subscribe(() => $send.next());

其中fakeHttpRequest是一个函数,它接受formChanges$的值并返回HTTP请求的Observable。

这并不令人满意,因为我必须在第一次表单更改后手动触发第一次发送,并且我觉得它并没有很好地传达代码的意图。

英文:

I have a observable which tracks changes to a form. When the form is updated it should autosave by sending an HTTP PUT request to the server. I want the behaviour so that when changes are made the first HTTP request is sent. For any subsequent changes no requests should be sent until the previous request has completed. When the previous request has completed I want to send only the most recent update.

I have thought about using exhaustMap e.g. formChanges$.pipe(exhaustMap(data => sendHttpUpdate(data))); as this would give me the behaviour that would ignore all updates while a request was in progress. However, all data is lost while in progress so I would need to retrigger the formChanges$ observable to send the latest update or it would be ignored.

I also considered concatMap e.g. formChanges$.pipe(concatMap(data => sendHttpUpdate(data))); as this waits for previous request to complete but sends all updates in order rather than just the lastest one.

I would like to use a pure rxjs solution i.e using Observables, Subjects and operators without setting state using a variable. Am I missing an obvious operator or combination of operators that could achieve this?

Here is my best attempt so far:

    const $send = new Subject();

    formChanges$.pipe(
        sample($send),
        switchMap(this.fakeHttpRequest),
      )
      .subscribe(handleResponse);



    formChanges$.pipe(first())
      .subscribe(() => $send.next());

where fakeHttpRequest is a function which takes the value formChanges$ and returns an Observable of the HTTP request.

This is unsatisfactory as I have to manually trigger the first send after the first form change and I feel it doesn't convey the intention of the code very well.

答案1

得分: 2

I can provide the translated content as requested:

"我简直无法相信,花了我9个月的时间才弄清楚这个问题,但如果还有其他人为此而苦恼的话:

正如我意识到的那样,map操作符(concatMap、exhaustMap)都不能完全满足要求,所以你需要自己处理映射以及哪些值被订阅。解决方案是将每个表单值映射成一个HTTP请求的可观察对象(但尚未订阅,因此没有请求)。然后将这个可观察对象作为参数传递给throttle。Throttle接受一个可观察对象,它会订阅它,并在原始源的值完成之前忽略这些值。Throttle还接受一个配置参数,允许你定义前导值和尾随值的行为,使其非常灵活。在这种情况下,我们都要,因为我们想知道第一次请求的结果以及最后一个缓冲请求的结果。

从中我们现在有一个包含所有我们想要的源值的可观察对象,即请求数据。然而,我们想要的是响应数据!简单地再次订阅将导致重复的HTTP调用(可能具有不同的结果)。不用担心,这就是shareReplay派上用场的地方。如果我们通过shareReplay将原始请求可观察对象传递给它,那么对可观察对象的后续订阅将只获得原始响应,而不会发出新的请求。由于我们有一个可观察对象的可观察对象,我们将要使用mergeAll操作符将其扁平化。现在我们可以订阅它以获取我们的HTTP请求的结果。

将所有这些组合在一起:

formChanges$.pipe(
  map(formValues => this.fakeHttpRequest(formValues).pipe(shareReplay(1))),
  throttle(x => x, {leading: true, trailing: true}),
  mergeAll(),
).subscribe(result => console.log(result));

使用内部知识,我假设你关心结果。如果不是这样,那么这将更简单。我们可以简单地让throttle发送请求,而不必担心重复请求:

formChanges$.pipe(
  map(formValues => this.fakeHttpRequest(formValues))),
  throttle(x => x, {leading: true, trailing: true}),
).subscribe();
英文:

I can't believe it has taken me 9 months to figure this out but in case anyone else is tearing their hair out over this:

As I realised none of the map operators (concatMap, exhaustMap) do exactly what is required so you need to handle the mapping and then which values get subscribed to yourself. The solution is to map each form value into a HTTP request observable (not subscribed yet so no request). Then pass this observable as the parameter to throttle. Throttle takes an observable, which it subscribes to and ignores values from the original source until this completes. Throttle also takes a configuration parameter which allows you to define the behaviour of leading and trailing values - making it very flexible. For this circumstance we want both because we want to know the result of that first request and the result of that last buffered request.

From this we now we have an observable with all the source values we want i.e the request data. However, we want the response data! Simply subscribing again would end up in duplicate HTTP calls (which may have a different result). Fear not as this is where shareReplay comes to the rescue. If we pipe the original request observable through shareReplay then subsequent subscribes to the observable will now just get the original response rather making a new call. Since we have an observable of observables we are going to want to flatten this using the mergeAll operator. We can now subscribe to this to get the results of our HTTP requests.

Putting this all together:

formChanges$.pipe(
  map(formValues => this.fakeHttpRequest(formValues).pipe(shareReplay(1)))
  throttle(x => x, {leading: true, trailing: true}),
  mergeAll(),
).subscribe((result) => console.log(result));

Using insider knowledge I assumed you cared about the result. If this wasn't true then this would be simpler. We could simply let throttle send the requests and we wouldn't have to worry about duplicate requests:

formChanges$.pipe(
  map(formValues => this.fakeHttpRequest(formValues)))
  throttle(x => x, {leading: true, trailing: true}),
).subscribe();

答案2

得分: 0

Here is the translated code snippet:

// 开始代码片段: js 隐藏: false 控制台: true Babel: false

// 语言: lang-js

changeStarted: Subject<boolean>; // 用于跟踪保存是否已开始;
changeComplete: Subject<boolean>; // 用于跟踪保存是否已完成;

constructor() {
  this.changeStarted = new Subject<boolean>(); 
  this.changeComplete = new Subject<boolean>(); 

  formChanges$.pipe(
    bufferToggle(this.changeStarted, () => this.firstCall ? interval(1) : this.changeComplete) // 一旦开始更改,等待更改完成。
  ).subscribe(x => {
    this.changeStarted.next(true); // 信号开始缓冲。
    // 进行 HTTP 请求
    this.fakeHttpRequest.subscribe(() => this.changeComplete.next(true));
  });
}

// 结束代码片段

(Note: I've made corrections to the code to instantiate the Subject objects with new, as it was missing in the original code.)

英文:

hmm if i understand it something like this could work:

<!-- begin snippet: js hide: false console: true babel: false -->

<!-- language: lang-js -->

changeStarted: Subject&lt;boolean&gt;; // Keep track if save started;
changeComplete: Subject&lt;boolean&gt;; // Keep track if save finished;

constructor() {
  this.changeStarted =  Subject&lt;boolean&gt;(); 
  this.changeComplete =  Subject&lt;boolean&gt;(); 
  
  formChanges$.pipe(
    bufferToggle(this.changeStarted, () =&gt; this.firstCall ? interval(1) : this.changeComplete) // once change has started wait till change is completed.
  ).subscribe(x =&gt; {
    this.changeStarted.next(true) //signal to start buffering.
    // Make the http request
    this.fakeHttpRequest.subscribe(() =&gt; this.changeComplete.next(true));

}

<!-- end snippet -->

huangapple
  • 本文由 发表于 2020年1月3日 21:17:51
  • 转载请务必保留本文链接:https://go.coder-hub.com/59579277.html