如何解决涉及BehaviorSubject和异步函数的竞态条件?

huangapple go评论84阅读模式
英文:

how to solve race condition involving behavior subject and async function?

问题

我有一个异步函数,同时被多次调用,我假设在Behavior Subject上调用.next(true)也是异步的,因此当同时多次调用这个函数时,后续调用中await firstValueFrom(this.isBusy$);返回的值仍然为false,尽管通过初始函数调用调用了.next(true)

如何才能确保后续的调用会正确反映isBusy$的值?

我制作了一个StackBlitz示例来说明我的问题:
https://stackblitz.com/edit/rxjs-wrw3h6?file=index.ts

另外,我应该注意,尽管我在StackBlitz中没有涉及Angular,但实际上这段代码来自Angular的HTTP拦截器,其中会执行以下操作:

  1. intercept(request: HttpRequest<unknown>, next: HttpHandler): Observable<HttpEvent<unknown>> {
  2. let headers: AuthorizationHeaders;
  3. const getAuthorizationHeader = async () => {
  4. headers = await this.authService.headers();
  5. };
  6. return from(getAuthorizationHeader()).pipe(
  7. switchMap(() => next.handle(request.clone({
  8. setHeaders: headers,
  9. })))
  10. );
  11. }

问题是,由于它从未知道授权调用是否繁忙,所以在页面上有多个组件同时进行API调用时,它会重复调用以获取刷新令牌。

因此,问题是,如何确保进行大量API调用,但只有一个调用用于刷新令牌,所有其他调用在令牌调用完成之前都等待,然后返回授权标头?

英文:

I have an async function that is called multiple times simultaneously, and I am assuming calling .next(true) on a behavior subject is also asynchronous, therefore when multiple calls to this function happen simultaneously the value returned from await firstValueFrom(this.isBusy$); in the subsequent calls are still false, even though .next(true) was called via the initial function caller.

How can I make it so that subsequent calls will properly reflect the correct isBusy$ value?

I have made a stackblitz to illustrate my problem:
https://stackblitz.com/edit/rxjs-wrw3h6?file=index.ts

Also I should note, although I made the stackblitz not involve Angular, the code is actually from an Angular HTTP Interceptor, where it would be doing:

  1. intercept(request: HttpRequest&lt;unknown&gt;, next: HttpHandler): Observable&lt;HttpEvent&lt;unknown&gt;&gt; {
  2. let headers: AuthorizationHeaders;
  3. const getAuthorizationHeader = async () =&gt; {
  4. headers = await this.authService.headers();
  5. };
  6. return from(getAuthorizationHeader()).pipe(
  7. switchMap(() =&gt; next.handle(request.clone({
  8. setHeaders: headers,
  9. })))
  10. );
  11. }

And the problem is, because it's never knowing that the auth call is busy, it's calling to get a refresh token repeatedly when there are multiple components on the page making api calls simultaneously.

So the question is, how can I make it so that a bunch of api calls are made, but only one call to refresh a token is made, and all other ones wait until the token call is completed before it returns the authorization header?

答案1

得分: 1

不要混合使用 promises 和 RxJs。将头部暴露为可观察对象。

  1. headers$ = isBusy$.pipe(
  2. filter(isBusy => !isBusy),
  3. switchMap(() => {
  4. if (this.isTokenExpired) {
  5. this.isBusySubject.next(true);
  6. return getTokenCall().pipe(
  7. tap(token => {
  8. this.token = token;
  9. this.isBusySubject.next(false);
  10. }),
  11. map(token => ({ Authorization: `Bearer ${this.token}` }))
  12. );
  13. } else {
  14. return of({ Authorization: `Bearer ${this.token}` });
  15. }
  16. })
  17. )

然后,你只需在拦截器中从服务返回可观察对象。

关于行为主题完全同步。

  1. const { BehaviorSubject } = rxjs;
  2. const bs$ = new BehaviorSubject(0);
  3. bs$.subscribe(val => { console.log('Emitting ' + val); });
  4. console.log('Calling next on 1');
  5. bs$.next(1);
  6. console.log('Calling next on 2');
  7. bs$.next(2);
  8. console.log('Calling next on 3');
  9. bs$.next(3);

将延迟应用于它们会使它们异步,并排队执行。

  1. const { BehaviorSubject, delay } = rxjs;
  2. const bs$ = new BehaviorSubject(0);
  3. bs$.pipe(delay()).subscribe(val => { console.log('Emitting ' + val); });
  4. console.log('Calling next on 1');
  5. bs$.next(1);
  6. console.log('Calling next on 2');
  7. bs$.next(2);
  8. console.log('Calling next on 3');
  9. bs$.next(3);
英文:

Don't mix promises and RxJs. Expose the header as an observable.

  1. headers$ = isBusy$.pipe(
  2. filter(isBusy =&gt; !isBusy),
  3. switchMap(() =&gt; {
  4. if (this.isTokenExpired) {
  5. this.isBusySubject.next(true);
  6. return getTokenCall().pipe(
  7. tap(token) =&gt; {
  8. this.token = token;
  9. this.isBusySubject.next(false);
  10. }),
  11. map(token =&gt; (({ Authorization: `Bearer ${this.token}` })))
  12. );
  13. } else {
  14. return of({ Authorization: `Bearer ${this.token}` });
  15. }
  16. })
  17. )

Then you just return the observable from the service in your interceptor.

About behavior subjects being completely synchronous.

<!-- begin snippet: js hide: false console: true babel: false -->

<!-- language: lang-js -->

  1. const { BehaviorSubject } = rxjs;
  2. const bs$ = new BehaviorSubject(0);
  3. bs$.subscribe(val =&gt; { console.log(&#39;Emitting &#39; + val); });
  4. console.log(&#39;Calling next on 1&#39;);
  5. bs$.next(1);
  6. console.log(&#39;Calling next on 2&#39;);
  7. bs$.next(2);
  8. console.log(&#39;Calling next on 3&#39;);
  9. bs$.next(3);

<!-- language: lang-html -->

  1. &lt;script src=&quot;https://cdnjs.cloudflare.com/ajax/libs/rxjs/7.8.0/rxjs.umd.min.js&quot;&gt;&lt;/script&gt;

<!-- end snippet -->

putting a delay on them makes the asynchronous and will queue them for execution

<!-- begin snippet: js hide: false console: true babel: false -->

<!-- language: lang-js -->

  1. const { BehaviorSubject, delay } = rxjs;
  2. const bs$ = new BehaviorSubject(0);
  3. bs$.pipe(delay()).subscribe(val =&gt; { console.log(&#39;Emitting &#39; + val); });
  4. console.log(&#39;Calling next on 1&#39;);
  5. bs$.next(1);
  6. console.log(&#39;Calling next on 2&#39;);
  7. bs$.next(2);
  8. console.log(&#39;Calling next on 3&#39;);
  9. bs$.next(3);

<!-- language: lang-html -->

  1. &lt;script src=&quot;https://cdnjs.cloudflare.com/ajax/libs/rxjs/7.8.0/rxjs.umd.min.js&quot;&gt;&lt;/script&gt;

<!-- end snippet -->

答案2

得分: 0

BehaviorSubject以同步方式工作。而不是使用 await firstValueFrom(this.isBusy$),请使用 this.isBusySubject.value

英文:

BehaviorSubject works in synchronous fashion. Instead of using await firstValueFrom(this.isBusy$) use this.isBusySubject.value.

huangapple
  • 本文由 发表于 2023年2月24日 10:56:00
  • 转载请务必保留本文链接:https://go.coder-hub.com/75552236.html
匿名

发表评论

匿名网友

:?: :razz: :sad: :evil: :!: :smile: :oops: :grin: :eek: :shock: :???: :cool: :lol: :mad: :twisted: :roll: :wink: :idea: :arrow: :neutral: :cry: :mrgreen:

确定