英文:
rxjs: merge 2 streams, complete stream when a specific input emits
问题
I understand, here is the translation:
我正在编写一个Angular应用程序,并设置了一个系统,其中一个NGRX效果将向一个服务请求数据。
在底层,这个服务将执行两件事情:
- 检查本地缓存(sqlite)以获取所请求的数据,以及
- 联系API以获取最新的数据
我想定义这个服务的一个observable响应,它将发出API的响应(如果是首次/本地缓存未命中的情况)或者按顺序先本地然后是API的数据。我已经在下面形象地展示了这两种情况:
情景1(API首次/缓存未命中)
API ---1
CACHE ---------2
MERGE ---1
情景2(缓存命中)
API ---------1
CACHE ---2
MERGE ---2-----1
我考虑创建一个Subject,然后同时触发两个调用,并在API响应时完成Subject(无论是成功还是出错)。
伪代码:
const subject = new Subject();
this.cache.getData().then(res => subject.next(res)).catch(err => subject.error(err));
this.api.getData().then(res => subject.next(res)).catch(err => subject.error(err)).finally(() => subject.complete());
return subject.asObservable();
但我想知道是否有一个更清晰的解决方案来实现我在这里尝试的目标?非常感谢您的任何建议。
英文:
I'm writing an Angular app and setting up a system where an NGRX effect will request data to a service.
Underlyingly, this service will do two things:
- Check local cache (sqlite) for the requested data, and
- Contact the API to get the most up to date data
I would like to define an observable response of this service which would emit either only the API's response (in case it was first/local cache miss) or local and then API in that order. I've visualised these 2 scenarios below:
Scenario 1 (API is first/cache miss)
API ---1
CACHE ---------2
MERGE ---1
Scenario 2 (Cache hit)
API ---------1
CACHE ---2
MERGE ---2-----1
I'm thinking of creating a Subject, then firing off both calls and completing the Subject when the API responds (either successfully or with an error).
Pseudo code:
const subject = new Subject();
this.cache.getData().then(res => subject.next(res)).catch(err => subject.error(err));
this.api.getData().then(res => subject.next(res)).catch(err => subject.error(err)).finally(() => subject.complete());
return subject.asObservable();
But I'm wondering if there's a cleaner solution to what I'm trying to achieve here? Any input greatly appreciated.
答案1
得分: 2
如果我理解正确,您希望触发两个调用,A)API,B)缓存。如果A首先返回结果,请发出该结果,但不要来自B的响应。如果B首先返回,请发出它,但在A的响应到达时也发出来自A的响应。
基本上,您总是希望来自A的响应。只有当B在A之前收到时,您才希望来自B的响应。
如果是这种情况,您可以定义两个源可观察对象并使用merge
将它们合并。
private apiData$ = defer(() => this.api.getData()).pipe(share());
private cachedData$ = defer(() => this.cache.getData()).pipe(
filter(existing => !!existing),
takeUntil(apiData$)
);
data$ = merge(this.cachedData$, this.apiData$);
在这里,我们使用defer
在订阅时将Promise转换为Observable。
apiData$
使用share()
,因为我们将最终有两个订阅,我们不希望每个订阅都触发单独的调用。
cachedData$
使用filter
来抑制您不想要的发射(当数据不存在于缓存中时)。
takeUntil
用于在api$
发出任何值时完成cachedData$
可观察对象,因此在apiData$
之后,cachedData$
将不再发出值。
最后,merge
用于将两个源合并为一个单一的可观察对象,因此data$
将在任何一个源发出时都发出值。
这里有一个StackBlitz链接。
英文:
If I understand correctly, you want to fire two calls, A) API, B) Cache. If A returns first emit that result, but not response from B. If B returns first, emit that, but then also emit response from A when it arrives.
Essentially, you always want response from A. You only want response from B if it is received before A.
If that's the case you could define two source observables and merge
them.
private apiData$ = defer(() => this.api.getData()).pipe(share());
private cachedData$ = defer(() => this.cache.getData()).pipe(
filter(existing => !!existing),
takeUntil(apiData$)
);
data$ = merge(this.cachedData$, this.apiData$);
Here we use defer
to turn your Promise into Observable upon subscription.
apiData$
is using share()
because we are going to end up with two subscriptions and we don't want fire separate calls for each subscription.
cachedData$
uses filter
to suppress emissions you don't want (when data doesn't exist in cache).
takeUntil
is used to complete the cachedData$
observable when api$
emits any value, therefore cachedData$
will never emit after apiData$
Finally, merge
is used to combine both sources into a single observable, so data$
will emit whenever either source emits.
Here's a StackBlitz.
通过集体智慧和协作来改善编程学习和解决问题的方式。致力于成为全球开发者共同参与的知识库,让每个人都能够通过互相帮助和分享经验来进步。
评论