Angular interceptor queue
Question
I want to create a queue mechanism in Angular. After collecting the requests in an array, I want to send them in order, sending the next one only after the previous one succeeds.
The code I wrote, which does not work, is below:
    private requestQueue: HttpRequest<any>[] = [];
    isQueueActive: boolean = false;

    intercept(request: HttpRequest<any>, next: HttpHandler): any {
      this.requestQueue.push(request);
      if (!this.isQueueActive) {
        this.isQueueActive = true;
        return this.processQueue(next);
      }
      return new Observable<HttpEvent<any>>();
    }

    private processQueue(next: HttpHandler): Observable<HttpEvent<any>> {
      const request = this.requestQueue.shift();
      if (request) {
        return next.handle(request).pipe(
          tap((data: any) => {
            this.processQueue(next);
          }),
        );
      } else {
        this.isQueueActive = false;
        return new Observable<HttpEvent<any>>();
      }
    }
That is my code as it stands. I am open to other suggestions, such as an npm package. I would be very grateful for any help.
Answer 1
Score: 3
I think you're struggling here because you're mixing an imperative and a reactive approach. It's usually better to stick to one. When it comes to managing complex async operations, reactive programming is probably your best bet.
Here's how I'd approach it:
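    import { Injectable } from '@angular/core';
    import { HttpEvent, HttpHandler, HttpInterceptor, HttpRequest } from '@angular/common/http';
    import { EMPTY, Observable, Subject, defer, merge } from 'rxjs';
    import { concatMap, dematerialize, filter, first, map, materialize, share } from 'rxjs/operators';

    // Not shown in the original answer; shape inferred from how it is used below.
    interface Intercepted {
      request: HttpRequest<unknown>;
      next: HttpHandler;
    }

    @Injectable()
    export class QueueRequestsInterceptor implements HttpInterceptor {
      // Every intercepted request is pushed into this subject.
      private intercepted$$ = new Subject<Intercepted>();

      // The queue: concatMap runs one request at a time, in arrival order.
      private queue$ = this.intercepted$$.pipe(
        concatMap(({ request, next }) =>
          next.handle(request).pipe(
            // Wrap values, errors and completion into notifications
            // so that a failed call cannot kill the queue.
            materialize(),
            map((materializedResponse) => ({
              request,
              materializedResponse,
            }))
          )
        ),
        share()
      );

      public intercept(
        request: HttpRequest<unknown>,
        next: HttpHandler
      ): Observable<HttpEvent<unknown>> {
        return merge(
          // Subscribed first: listen for the result of this exact request.
          this.queue$.pipe(
            filter((res) => res.request === request),
            first(),
            map(({ materializedResponse }) => materializedResponse),
            dematerialize()
          ),
          // Subscribed second: side effect that puts the request in the queue.
          defer(() => {
            this.intercepted$$.next({ request, next });
            return EMPTY;
          })
        );
      }
    }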
First, I create a subject, `intercepted$$`, into which we push each request to be queued.
Then we build the queue and use `concatMap` to make sure the requests run sequentially, in order.
If one request fails (an HTTP call returns an error), we don't want the whole queue to error, or it would stop and every pending request would be lost. So we use `materialize`: whether the call succeeds or throws, it wraps the result in a notification object, so the inner observable never errors. Then we `map` to keep both the request (we'll filter on it later) and the materialized response. Finally, for that block, we `share`; otherwise each request would create its own subscription to the queue and nothing would actually be queued.
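To make the role of `materialize` and `dematerialize` concrete, here is a minimal, dependency-free sketch of the idea. It is a hand-written model, not the RxJS implementation: the `Note` type and the `materializeCall`, `dematerializeNote`, and `runQueue` helpers are hypothetical names made up for this illustration. It shows a failing task being turned into a plain value so that the loop standing in for the queue keeps going, and the error being rethrown only for the caller that owns that task.

```typescript
// A simplified notification: a value ('N') or an error ('E').
// Hypothetical model for illustration, not the RxJS Notification type.
type Note<T> =
  | { kind: 'N'; value: T }
  | { kind: 'E'; error: unknown };

// "materialize": run a task and capture a throw as data instead of letting it propagate.
function materializeCall<T>(task: () => T): Note<T> {
  try {
    return { kind: 'N', value: task() };
  } catch (error) {
    return { kind: 'E', error };
  }
}

// "dematerialize": turn the captured outcome back into a normal return or a throw.
function dematerializeNote<T>(note: Note<T>): T {
  if (note.kind === 'E') {
    throw note.error;
  }
  return note.value;
}

// Stand-in for the queue: every task runs, in order, even if an earlier one failed.
function runQueue<T>(tasks: Array<() => T>): Note<T>[] {
  return tasks.map((task) => materializeCall(task));
}

const results = runQueue<number>([
  () => 1,
  () => {
    throw new Error('request failed');
  },
  () => 3,
]);

// The queue survived the failure: all three tasks produced a notification.
console.log(results.map((note) => note.kind)); // [ 'N', 'E', 'N' ]

// Only the caller of the failing task sees the error again.
let rethrown = '';
try {
  dematerializeNote(results[1]);
} catch (error) {
  rethrown = (error as Error).message;
}
console.log(rethrown); // request failed
```

In the interceptor, the same split happens across operators: `materialize` sits inside the queue, and `dematerialize` sits in the per-request branch inside `intercept`.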
Now for the `intercept` block. We'll come back to the `merge` and `defer`; let's look at the queue first. We want to listen to the queue and filter for the current request only. We can do that because the `map` discussed above passes the exact request object through, so we can compare by reference. Once the filter matches, we need to avoid a potential memory leak: the queue itself never completes, so without further care this subscription would stay open forever. That's why we use `first`. Finally, if the original HTTP call failed, we still want the app to see the original behavior for that call, not the wrapped object we got from `materialize` (which was only there to keep the queue alive). So we call `dematerialize`, which does the exact opposite and throws for the current request if there was an error, while the queue stays alive for other requests.
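One caveat worth checking in your own setup: `first()` keeps only the first notification for a request. That is fine when the handler emits a single event, as with a mocked `of(response)`. As far as I know, though, Angular's default XHR backend emits a `Sent` event before the final `HttpResponse`, in which case `first()` would let only the `Sent` event through and the caller would never see the response. The sketch below models that with plain arrays rather than real observables; `Note`, `takeFirst`, `takeUntilDone`, and `emittedValues` are hypothetical helpers written for this illustration, and the event names are simplified strings.

```typescript
// Simplified notification stream for ONE request, modeled as an array.
// Hypothetical model for illustration; real code would use RxJS notifications.
type Note =
  | { kind: 'N'; value: string } // an emitted event
  | { kind: 'E'; error: string } // the request failed
  | { kind: 'C' };               // the request completed

// What a backend that emits more than one event might produce for a successful call.
const notifications: Note[] = [
  { kind: 'N', value: 'Sent' },
  { kind: 'N', value: 'Response' },
  { kind: 'C' },
];

// Model of first(): keep a single notification, whatever it is.
function takeFirst(notes: Note[]): Note[] {
  return notes.slice(0, 1);
}

// Alternative: keep everything up to and including the terminal notification.
function takeUntilDone(notes: Note[]): Note[] {
  const end = notes.findIndex((note) => note.kind !== 'N');
  return end === -1 ? notes : notes.slice(0, end + 1);
}

// Model of dematerialize(): collect emitted values, throw on an error notification.
function emittedValues(notes: Note[]): string[] {
  const values: string[] = [];
  for (const note of notes) {
    if (note.kind === 'E') throw new Error(note.error);
    if (note.kind === 'N') values.push(note.value);
  }
  return values;
}

console.log(emittedValues(takeFirst(notifications)));     // [ 'Sent' ]
console.log(emittedValues(takeUntilDone(notifications))); // [ 'Sent', 'Response' ]
```

If that applies to your app, one option (not exercised by the demo) would be to replace `first()` with an operator that stops at the terminal notification, for example `takeWhile` with its `inclusive` flag set, so the subscription still ends and the leak is still avoided.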
Now let's get back to the merge. We could have had the following code:
    this.intercepted$$.next({ request, next });
    return this.queue$.pipe(
      filter((res) => res.request === request),
      map(({ materializedResponse }) => materializedResponse),
      dematerialize()
    );
It would work in most cases. But the response could arrive synchronously: for example in a test where you mock the HTTP response using `of`, or (I assume) when an HTTP call fails straight away, such as an error 0 when you're not connected to any network, which I suspect may not even be asynchronous. In those cases the response would be missed, because we'd start listening to the queue too late. To cover 100% of the cases and not 99%, I've used `merge` to make sure we start listening to the queue first, and only then subscribe to the `defer`, which triggers the side effect of putting the current request in the queue.
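The ordering issue can be reproduced without Angular or RxJS. Below is a minimal sketch using a hand-written synchronous subject; `TinySubject` and its methods are hypothetical names for this illustration and only mimic the one property that matters here: a plain subject does not replay values to late subscribers.

```typescript
// Minimal synchronous subject: delivers each value only to the listeners
// that are subscribed at the moment next() is called (no replay).
// Hypothetical helper for illustration, not the RxJS Subject.
class TinySubject<T> {
  private listeners: Array<(value: T) => void> = [];

  subscribe(listener: (value: T) => void): void {
    this.listeners.push(listener);
  }

  next(value: T): void {
    for (const listener of this.listeners) {
      listener(value);
    }
  }
}

// Emit first, subscribe second: a synchronously delivered response is lost.
const emitFirst = new TinySubject<string>();
const receivedLate: string[] = [];
emitFirst.next('response');
emitFirst.subscribe((value) => receivedLate.push(value));

// Subscribe first, emit second: the order that merge + defer enforces.
const subscribeFirst = new TinySubject<string>();
const receivedInTime: string[] = [];
subscribeFirst.subscribe((value) => receivedInTime.push(value));
subscribeFirst.next('response');

console.log(receivedLate);   // []
console.log(receivedInTime); // [ 'response' ]
```

`merge` subscribes to its sources in the order they are listed, which is why the queue listener comes first and the `defer` that pushes the request comes second.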
I've made a live demo where I created a second interceptor to mock the responses of HTTP calls.
I've applied a 3s delay on each request and here's the output:
All at the same time we first get:
Putting request http://mocked-request-anyway/0 in queue
Putting request http://mocked-request-anyway/1 in queue
+3s:
Received response for request http://mocked-request-anyway/0
{count: 1}
+3s:
Received response for request http://mocked-request-anyway/1
{count: 2}