如何解决涉及BehaviorSubject和异步函数的竞态条件?

huangapple go评论52阅读模式
英文:

how to solve race condition involving behavior subject and async function?

问题

我有一个异步函数,同时被多次调用,我假设在Behavior Subject上调用.next(true)也是异步的,因此当同时多次调用这个函数时,后续调用中await firstValueFrom(this.isBusy$);返回的值仍然为false,尽管通过初始函数调用调用了.next(true)

如何才能确保后续的调用会正确反映isBusy$的值?

我制作了一个StackBlitz示例来说明我的问题:
https://stackblitz.com/edit/rxjs-wrw3h6?file=index.ts

另外,我应该注意,尽管我在StackBlitz中没有涉及Angular,但实际上这段代码来自Angular的HTTP拦截器,其中会执行以下操作:

intercept(request: HttpRequest<unknown>, next: HttpHandler): Observable<HttpEvent<unknown>> {

  let headers: AuthorizationHeaders;
  const getAuthorizationHeader = async () => {
    headers = await this.authService.headers();
  };

  return from(getAuthorizationHeader()).pipe(
    switchMap(() => next.handle(request.clone({
      setHeaders: headers,
    })))
  );
}

问题是,由于它从未知道授权调用是否繁忙,所以在页面上有多个组件同时进行API调用时,它会重复调用以获取刷新令牌。

因此,问题是,如何确保进行大量API调用,但只有一个调用用于刷新令牌,所有其他调用在令牌调用完成之前都等待,然后返回授权标头?

英文:

I have an async function that is called multiple times simultaneously, and I am assuming calling .next(true) on a behavior subject is also asynchronous, therefore when multiple calls to this function happen simultaneously the value returned from await firstValueFrom(this.isBusy$); in the subsequent calls are still false, even though .next(true) was called via the initial function caller.

How can I make it so that subsequent calls will properly reflect the correct isBusy$ value?

I have made a stackblitz to illustrate my problem:
https://stackblitz.com/edit/rxjs-wrw3h6?file=index.ts

Also I should note, although I made the stackblitz not involve Angular, the code is actually from an Angular HTTP Interceptor, where it would be doing:

intercept(request: HttpRequest&lt;unknown&gt;, next: HttpHandler): Observable&lt;HttpEvent&lt;unknown&gt;&gt; {

  let headers: AuthorizationHeaders;
  const getAuthorizationHeader = async () =&gt; {
    headers = await this.authService.headers();
  };

  return from(getAuthorizationHeader()).pipe(
    switchMap(() =&gt; next.handle(request.clone({
      setHeaders: headers,
    })))
  );
}

And the problem is, because it's never knowing that the auth call is busy, it's calling to get a refresh token repeatedly when there are multiple components on the page making api calls simultaneously.

So the question is, how can I make it so that a bunch of api calls are made, but only one call to refresh a token is made, and all other ones wait until the token call is completed before it returns the authorization header?

答案1

得分: 1

不要混合使用 promises 和 RxJs。将头部暴露为可观察对象。

headers$ = isBusy$.pipe(
  filter(isBusy => !isBusy),
  switchMap(() => {
    if (this.isTokenExpired) {
      this.isBusySubject.next(true);
      return getTokenCall().pipe(
        tap(token => {
          this.token = token;
          this.isBusySubject.next(false);
        }),
        map(token => ({ Authorization: `Bearer ${this.token}` }))
      );
    } else {
      return of({ Authorization: `Bearer ${this.token}` });
    }
  })
)

然后,你只需在拦截器中从服务返回可观察对象。

关于行为主题完全同步。

const { BehaviorSubject } = rxjs;

const bs$ = new BehaviorSubject(0);

bs$.subscribe(val => { console.log('Emitting ' + val); });

console.log('Calling next on 1');
bs$.next(1);
console.log('Calling next on 2');
bs$.next(2);
console.log('Calling next on 3');
bs$.next(3);

将延迟应用于它们会使它们异步,并排队执行。

const { BehaviorSubject, delay } = rxjs;

const bs$ = new BehaviorSubject(0);

bs$.pipe(delay()).subscribe(val => { console.log('Emitting ' + val); });

console.log('Calling next on 1');
bs$.next(1);
console.log('Calling next on 2');
bs$.next(2);
console.log('Calling next on 3');
bs$.next(3);
英文:

Don't mix promises and RxJs. Expose the header as an observable.

headers$ = isBusy$.pipe(
  filter(isBusy =&gt; !isBusy),
  switchMap(() =&gt; {
    if (this.isTokenExpired) {
      this.isBusySubject.next(true);
      return getTokenCall().pipe(
        tap(token) =&gt; {
          this.token = token;
          this.isBusySubject.next(false);
        }),
        map(token =&gt; (({ Authorization: `Bearer ${this.token}` })))
      );
    } else {
      return of({ Authorization: `Bearer ${this.token}` });
    }
  })
)

Then you just return the observable from the service in your interceptor.

About behavior subjects being completely synchronous.

<!-- begin snippet: js hide: false console: true babel: false -->

<!-- language: lang-js -->

const { BehaviorSubject } = rxjs;

const bs$ = new BehaviorSubject(0);

bs$.subscribe(val =&gt; { console.log(&#39;Emitting &#39; + val); });

console.log(&#39;Calling next on 1&#39;);
bs$.next(1);
console.log(&#39;Calling next on 2&#39;);
bs$.next(2);
console.log(&#39;Calling next on 3&#39;);
bs$.next(3);

<!-- language: lang-html -->

&lt;script src=&quot;https://cdnjs.cloudflare.com/ajax/libs/rxjs/7.8.0/rxjs.umd.min.js&quot;&gt;&lt;/script&gt;

<!-- end snippet -->

putting a delay on them makes the asynchronous and will queue them for execution

<!-- begin snippet: js hide: false console: true babel: false -->

<!-- language: lang-js -->

const { BehaviorSubject, delay } = rxjs;

const bs$ = new BehaviorSubject(0);

bs$.pipe(delay()).subscribe(val =&gt; { console.log(&#39;Emitting &#39; + val); });

console.log(&#39;Calling next on 1&#39;);
bs$.next(1);
console.log(&#39;Calling next on 2&#39;);
bs$.next(2);
console.log(&#39;Calling next on 3&#39;);
bs$.next(3);

<!-- language: lang-html -->

&lt;script src=&quot;https://cdnjs.cloudflare.com/ajax/libs/rxjs/7.8.0/rxjs.umd.min.js&quot;&gt;&lt;/script&gt;

<!-- end snippet -->

答案2

得分: 0

BehaviorSubject以同步方式工作。而不是使用 await firstValueFrom(this.isBusy$),请使用 this.isBusySubject.value

英文:

BehaviorSubject works in synchronous fashion. Instead of using await firstValueFrom(this.isBusy$) use this.isBusySubject.value.

huangapple
  • 本文由 发表于 2023年2月24日 10:56:00
  • 转载请务必保留本文链接:https://go.coder-hub.com/75552236.html
匿名

发表评论

匿名网友

:?: :razz: :sad: :evil: :!: :smile: :oops: :grin: :eek: :shock: :???: :cool: :lol: :mad: :twisted: :roll: :wink: :idea: :arrow: :neutral: :cry: :mrgreen:

确定