英文:
how to solve race condition involving behavior subject and async function?
问题
我有一个异步函数,同时被多次调用,我假设在Behavior Subject上调用.next(true)
也是异步的,因此当同时多次调用这个函数时,后续调用中await firstValueFrom(this.isBusy$);
返回的值仍然为false,尽管通过初始函数调用调用了.next(true)
。
如何才能确保后续的调用会正确反映isBusy$
的值?
我制作了一个StackBlitz示例来说明我的问题:
https://stackblitz.com/edit/rxjs-wrw3h6?file=index.ts
另外,我应该注意,尽管我在StackBlitz中没有涉及Angular,但实际上这段代码来自Angular的HTTP拦截器,其中会执行以下操作:
intercept(request: HttpRequest<unknown>, next: HttpHandler): Observable<HttpEvent<unknown>> {
let headers: AuthorizationHeaders;
const getAuthorizationHeader = async () => {
headers = await this.authService.headers();
};
return from(getAuthorizationHeader()).pipe(
switchMap(() => next.handle(request.clone({
setHeaders: headers,
})))
);
}
问题是,由于它从未知道授权调用是否繁忙,所以在页面上有多个组件同时进行API调用时,它会重复调用以获取刷新令牌。
因此,问题是,如何确保进行大量API调用,但只有一个调用用于刷新令牌,所有其他调用在令牌调用完成之前都等待,然后返回授权标头?
英文:
I have an async function that is called multiple times simultaneously, and I am assuming calling .next(true)
on a behavior subject is also asynchronous, therefore when multiple calls to this function happen simultaneously the value returned from await firstValueFrom(this.isBusy$);
in the subsequent calls are still false, even though .next(true)
was called via the initial function caller.
How can I make it so that subsequent calls will properly reflect the correct isBusy$
value?
I have made a stackblitz to illustrate my problem:
https://stackblitz.com/edit/rxjs-wrw3h6?file=index.ts
Also I should note, although I made the stackblitz not involve Angular, the code is actually from an Angular HTTP Interceptor, where it would be doing:
intercept(request: HttpRequest<unknown>, next: HttpHandler): Observable<HttpEvent<unknown>> {
let headers: AuthorizationHeaders;
const getAuthorizationHeader = async () => {
headers = await this.authService.headers();
};
return from(getAuthorizationHeader()).pipe(
switchMap(() => next.handle(request.clone({
setHeaders: headers,
})))
);
}
And the problem is, because it's never knowing that the auth call is busy, it's calling to get a refresh token repeatedly when there are multiple components on the page making api calls simultaneously.
So the question is, how can I make it so that a bunch of api calls are made, but only one call to refresh a token is made, and all other ones wait until the token call is completed before it returns the authorization header?
答案1
得分: 1
不要混合使用 promises 和 RxJs。将头部暴露为可观察对象。
headers$ = isBusy$.pipe(
filter(isBusy => !isBusy),
switchMap(() => {
if (this.isTokenExpired) {
this.isBusySubject.next(true);
return getTokenCall().pipe(
tap(token => {
this.token = token;
this.isBusySubject.next(false);
}),
map(token => ({ Authorization: `Bearer ${this.token}` }))
);
} else {
return of({ Authorization: `Bearer ${this.token}` });
}
})
)
然后,你只需在拦截器中从服务返回可观察对象。
关于行为主题完全同步。
const { BehaviorSubject } = rxjs;
const bs$ = new BehaviorSubject(0);
bs$.subscribe(val => { console.log('Emitting ' + val); });
console.log('Calling next on 1');
bs$.next(1);
console.log('Calling next on 2');
bs$.next(2);
console.log('Calling next on 3');
bs$.next(3);
将延迟应用于它们会使它们异步,并排队执行。
const { BehaviorSubject, delay } = rxjs;
const bs$ = new BehaviorSubject(0);
bs$.pipe(delay()).subscribe(val => { console.log('Emitting ' + val); });
console.log('Calling next on 1');
bs$.next(1);
console.log('Calling next on 2');
bs$.next(2);
console.log('Calling next on 3');
bs$.next(3);
英文:
Don't mix promises and RxJs. Expose the header as an observable.
headers$ = isBusy$.pipe(
filter(isBusy => !isBusy),
switchMap(() => {
if (this.isTokenExpired) {
this.isBusySubject.next(true);
return getTokenCall().pipe(
tap(token) => {
this.token = token;
this.isBusySubject.next(false);
}),
map(token => (({ Authorization: `Bearer ${this.token}` })))
);
} else {
return of({ Authorization: `Bearer ${this.token}` });
}
})
)
Then you just return the observable from the service in your interceptor.
About behavior subjects being completely synchronous.
<!-- begin snippet: js hide: false console: true babel: false -->
<!-- language: lang-js -->
const { BehaviorSubject } = rxjs;
const bs$ = new BehaviorSubject(0);
bs$.subscribe(val => { console.log('Emitting ' + val); });
console.log('Calling next on 1');
bs$.next(1);
console.log('Calling next on 2');
bs$.next(2);
console.log('Calling next on 3');
bs$.next(3);
<!-- language: lang-html -->
<script src="https://cdnjs.cloudflare.com/ajax/libs/rxjs/7.8.0/rxjs.umd.min.js"></script>
<!-- end snippet -->
putting a delay on them makes the asynchronous and will queue them for execution
<!-- begin snippet: js hide: false console: true babel: false -->
<!-- language: lang-js -->
const { BehaviorSubject, delay } = rxjs;
const bs$ = new BehaviorSubject(0);
bs$.pipe(delay()).subscribe(val => { console.log('Emitting ' + val); });
console.log('Calling next on 1');
bs$.next(1);
console.log('Calling next on 2');
bs$.next(2);
console.log('Calling next on 3');
bs$.next(3);
<!-- language: lang-html -->
<script src="https://cdnjs.cloudflare.com/ajax/libs/rxjs/7.8.0/rxjs.umd.min.js"></script>
<!-- end snippet -->
答案2
得分: 0
BehaviorSubject以同步方式工作。而不是使用 await firstValueFrom(this.isBusy$)
,请使用 this.isBusySubject.value
。
英文:
BehaviorSubject works in synchronous fashion. Instead of using await firstValueFrom(this.isBusy$)
use this.isBusySubject.value
.
通过集体智慧和协作来改善编程学习和解决问题的方式。致力于成为全球开发者共同参与的知识库,让每个人都能够通过互相帮助和分享经验来进步。
评论