Angular拦截器队列

huangapple go评论60阅读模式
英文:

Angular interceptor queue

问题

我想在Angular中创建一个队列机制。在将数据收集到数组中后,我想按顺序发送请求,并在成功后发送下一个请求。

我做的但没有成功的代码如下:

private requestQueue: HttpRequest<any>[] = [];
isQueueActive: boolean = false;
intercept(request: HttpRequest<any>, next: HttpHandler): any {

    this.requestQueue.push(request);
    if (!this.isQueueActive) {
        this.isQueueActive = true;
        return this.processQueue(next);
    }
    return new Observable<HttpEvent<any>>();
}

private processQueue(next: HttpHandler): Observable<HttpEvent<any>> {
    const request = this.requestQueue.shift();

    if (request) {
        return next.handle(request).pipe(
            tap((data: any) => {
                this.processQueue(next);
            }),
        );
    } else {
        this.isQueueActive = false;
        return new Observable<HttpEvent<any>>();
    }
}

我的代码是这样的,我也可以接受其他建议,如npm包等。如果您能帮助我,我将非常高兴 Angular拦截器队列

英文:

I want to create a queue mechanism in Angular. After collecting the data in an array, I want to throw the requests in order and throw the next one when successful.

The code that I did but did not work is below

<!-- begin snippet: js hide: false console: true babel: false -->

<!-- language: lang-js -->

private requestQueue: HttpRequest&lt;any&gt;[] = [];
  isQueueActive: boolean = false;
  intercept(request: HttpRequest&lt;any&gt;, next: HttpHandler): any {
 
      this.requestQueue.push(request);
      if (!this.isQueueActive) {
        this.isQueueActive = true;
        return this.processQueue(next);
 }
      return new Observable&lt;HttpEvent&lt;any&gt;&gt;();
    } 


  private processQueue(next: HttpHandler): Observable&lt;HttpEvent&lt;any&gt;&gt; {
    const request = this.requestQueue.shift(); 

    if (request) {
      return next.handle(request).pipe(
        tap((data: any) =&gt; {
          this.processQueue(next);
        }),
      ); 
    } else {
      this.isQueueActive = false;
      return new Observable&lt;HttpEvent&lt;any&gt;&gt;(); 
    }
  }

<!-- end snippet -->

my code in this way, I am open to other suggestions, npm package etc. I would be very happy if you help me Angular拦截器队列

答案1

得分: 3

I think the reason you're struggling here is because you're trying to mix an imperative and a reactive approach. It's usually a better idea to stick to one only. When it gets to managing complex async operations, sticking to reactive programming is probably your best bet.

Here's how I'd approach it:

@Injectable()
export class QueueRequestsInterceptor implements HttpInterceptor {
  private intercepted$$ = new Subject<Intercepted>();

  private queue$ = this.intercepted$$.pipe(
    concatMap(({ request, next }) =>
      next.handle(request).pipe(
        materialize(),
        map((materializedResponse) => ({
          request,
          materializedResponse,
        }))
      )
    ),
    share()
  );

  public intercept(
    request: HttpRequest<unknown>,
    next: HttpHandler
  ): Observable<HttpEvent<unknown>> {
    return merge(
      this.queue$.pipe(
        filter((res) => res.request === request),
        first(),
        map(({ materializedResponse }) => materializedResponse),
        dematerialize()
      ),
      defer(() => {
        this.intercepted$$.next({ request, next });
        return EMPTY;
      })
    );
  }
}

First of all, I create a subject where we'll push our requests in queue.

Then we create the queue and use concatMap to make sure that the requests happen sequentially, in order.

If one request fails (if an HTTP call returns an error), we want to make sure that we don't make the entire queue error, or the queue would be lost/stopped, we use materialize. Whether it's a success or a throw, it'll wrap the observable result into an object. Meaning it'll never throw anymore. Then, we map to keep both the request as we'll want to filter on it later on and the materialized response. Finally for that block, we share otherwise each request would create a new subscription to the queue and nothing would be queued up.

Now, in the intercept block, we'll come back to the merge and defer, let's look at our queue first. We want to listen to the queue and filter for the current request only. Which we can do as using the map we discussed above, we returned the exact request so we can compare that directly. With the filter done, the first thing to make sure of is to avoid a potential memory leak by keeping the observable open! Because the queue will remain open, so we use first. Now, if the original HTTP call failed, we still want our app to receive the original behavior for that HTTP call and not the wrapped object (that we got because of materialize to not kill the queue). Therefore we call dematerialize which does the exact opposite and will throw for the current request if there was an error (but our queue will still be alive for other requests!).

Now let's get back to the merge. We could have had the following code:

this.intercepted$$.next({ request, next });

return this.queue$.pipe(
  filter((res) => res.request === request),
  map(({ materializedResponse }) => materializedResponse),
  dematerialize()
);

It'd work in most cases... But if the request is resolved straight away (unlikely... but could happen for example in a test if you mock the http response using of which would be synchronously handled) OR if the HTTP call fails straight away, which I assume could happen for example when you're not connected to any network and you get an error 0, I suspect this may not even be asynchronous and therefore would be missed as we start listening to the queue too late. To make sure we cover 100% of the cases and not 99%, I've used a merge to make sure we start listening to the queue first, and then subscribe to the defer which triggers the side effect of putting the current request in the queue.

I've made a live demo where I created a second interceptor to mock the responses of HTTP calls.

I've applied a 3s delay on each request and here's the output:

All at the same time we first get:

Putting request http://mocked-request-anyway/0 in queue
Putting request http://mocked-request-anyway/1 in queue

+3s:

Received response for request http://mocked-request-anyway/0
{count: 1}

+3s:

Received response for request http://mocked-request-anyway/1
{count: 2}
英文:

I think the reason you're struggling here is because you're trying to mix an imperative and a reactive approach. It's usually a better idea to stick to one only. When it gets to managing complex async operations, sticking to reactive programming is probably your best bet.

Here's how I'd approach it:

@Injectable()
export class QueueRequestsInterceptor implements HttpInterceptor {
  private intercepted$$ = new Subject&lt;Intercepted&gt;();

  private queue$ = this.intercepted$$.pipe(
    concatMap(({ request, next }) =&gt;
      next.handle(request).pipe(
        materialize(),
        map((materializedResponse) =&gt; ({
          request,
          materializedResponse,
        }))
      )
    ),
    share()
  );

  public intercept(
    request: HttpRequest&lt;unknown&gt;,
    next: HttpHandler
  ): Observable&lt;HttpEvent&lt;unknown&gt;&gt; {
    return merge(
      this.queue$.pipe(
        filter((res) =&gt; res.request === request),
        first(),
        map(({ materializedResponse }) =&gt; materializedResponse),
        dematerialize()
      ),
      defer(() =&gt; {
        this.intercepted$$.next({ request, next });
        return EMPTY;
      })
    );
  }
}

First of all, I create a subject where we'll push our requests in queue.

Then we create the queue and use concatMap to make sure that the requests happen sequentially, in order.

If one request fails (if an HTTP call returns an error), we want to make sure that we don't make the entire queue error, or the queue would be lost/stopped, we use materialize. Whether it's a success or a throw, it'll wrap the observable result into an object. Meaning it'll never throw anymore. Then, we map to keep both the request as we'll want to filter on it later on and the materialized response. Finally for that block, we share otherwise each request would create a new subscription to the queue and nothing would be queued up.

Now, in the intercept block, we'll come back to the merge and defer, let's look at our queue first. We want to listen to the queue and filter for the current request only. Which we can do as using the map we discussed above, we returned the exact request so we can compare that directly. With the filter done, the first thing to make sure of is to avoid a potential memory leak by keeping the observable open! Because the queue will remain open, so we use first. Now, if the original HTTP call failed, we still want our app to receive the original behavior for that HTTP call and not the wrapped object (that we got because of materialize to not kill the queue). Therefore we call dematerialize which does the exact opposite and will throw for the current request if there was an error (but our queue will still be alive for other requests!).

Now let's get back to the merge. We could have had the following code:

this.intercepted$$.next({ request, next });

return this.queue$.pipe(
  filter((res) =&gt; res.request === request),
  map(({ materializedResponse }) =&gt; materializedResponse),
  dematerialize()
);

It'd work in most cases... But if the request is resolved straight away (unlikely... but could happen for example in a test if you mock the http response using of which would be synchronously handled) OR if the HTTP call fails straight away, which I assume could happen for example when you're not connected to any network and you get an error 0, I suspect this may not even be asynchronous and therefore would be missed as we start listening to the queue too late. To make sure we cover 100% of the cases and not 99%, I've used a merge to make sure we start listening to the queue first, and then subscribe to the defer which triggers the side effect of putting the current request in the queue.

I've made a live demo where I created a second interceptor to mock the responses of HTTP calls.

I've applied a 3s delay on each request and here's the output:

All at the same time we first get:

Putting request http://mocked-request-anyway/0 in queue
Putting request http://mocked-request-anyway/1 in queue

+3s:

Received response for request http://mocked-request-anyway/0
{count: 1}

+3s:

Received response for request http://mocked-request-anyway/1
{count: 2}

huangapple
  • 本文由 发表于 2023年6月22日 04:23:47
  • 转载请务必保留本文链接:https://go.coder-hub.com/76526882.html
匿名

发表评论

匿名网友

:?: :razz: :sad: :evil: :!: :smile: :oops: :grin: :eek: :shock: :???: :cool: :lol: :mad: :twisted: :roll: :wink: :idea: :arrow: :neutral: :cry: :mrgreen:

确定