英文:
RXJS takeUntil and switchMap with a timer seems not to complete inner timer
问题
我有以下代码:
private takeControlTaskRunning$ = new BehaviorSubject
private initTakeControlTask() {
this.takeControlTaskRunning$
.pipe(
takeUntil(this.dependencies.destroyed$),
switchMap((isRunning) => {
if (!isRunning) {
return of(false);
}
return timer(0, this.updatePeriodMilliseconds).pipe(
takeUntil(this.dependencies.destroyed$), <--- 为什么需要这个?
map(() => {
return true;
})
);
}),
filter((isRunning) => isRunning === true),
)
.subscribe(async () => {
await this.syncVideo();
});
}
public takeControl() {
this.takeControlTaskRunning$.next(true);
}
public stopControl() {
this.takeControlTaskRunning$.next(false);
}
当我在一个fakeAsync测试中运行它时,
describe('takeControl', () => {
it('runs the update task periodically', fakeAsync(async () => {
// 给定一个控制器实例
const { controller, dependencies } = await create();
// 当调用takeControl时
await controller.takeControl();
// 并且经过一段时间
jasmine.clock().tick(1);
// 那么更新任务应该正在运行
expect(
dependencies.updateSync as jasmine.Spy
).toHaveBeenCalle();
destroyed$.next();
flush();
}));
});
我得到以下错误:
Error: 1 periodic timer(s) still in the queue.
添加一个内部的takeUntil似乎可以修复它,但我不明白为什么需要它。据我理解,外部的takeUntil应该完成可观察对象,而switchMap应该完成内部的计时器,为什么情况不是这样呢?
英文:
I have the following code:
private takeControlTaskRunning$ = new BehaviorSubject<boolean>(false);
private initTakeControlTask() {
this.takeControlTaskRunning$
.pipe(
takeUntil(this.dependencies.destroyed$),
switchMap((isRunning) => {
if (!isRunning) {
return of(false);
}
return timer(0, this.updatePeriodMilliseconds).pipe(
takeUntil(this.dependencies.destroyed$), <--- Why is this needed?
map(() => {
return true;
})
);
}),
filter((isRunning) => isRunning === true),
)
.subscribe(async () => {
await this.syncVideo();
});
}
public takeControl() {
this.takeControlTaskRunning$.next(true);
}
public stopControl() {
this.takeControlTaskRunning$.next(false);
}
When I run this in an fakeAsync test,
describe('takeControl', () => {
it('runs the update task periodically', fakeAsync(async () => {
// Given an controller instance
const { controller, dependencies } = await create();
// When takeControl is called
await controller.takeControl();
// And some time passes
jasmine.clock().tick(1);
// Then the update task should be running
expect(
dependencies.updateSync as jasmine.Spy
).toHaveBeenCalle();
destroyed$.next();
flush();
}));
});
I get the following error:
Error: 1 periodic timer(s) still in the queue.
Adding an inner takeUntil seems to fix it, but I don't understand why it is required. As far as I understand the outer takeUntil should complete the observable, and the switchMap should complete the inner timer, why is this not the case?
答案1
得分: 4
问题在于操作符的顺序。作为一个通用的经验法则,takeUntil()
应该始终是操作链中的最后一个操作符。
在你的情况下会发生什么是,this.takeControlTaskRunning$
发出,然后通过 switchMap
投影到 timer()
中。但是当 takeUntil()
发出 complete
通知时,它会被仍在等待其内部 Observable(在本例中是 timer()
)完成的 switchMap
阻止。
如果你将 takeUntil()
放在 switchMap()
之后,complete
通知将会到达观察者,从而触发取消订阅,即使 switchMap()
中的内部 Observable 仍然挂起,也会立即取消订阅。
英文:
The problem is order of operators. As a general rule of thumb takeUntil()
should always be the last operator in a chain.
What happens in your case is that this.takeControlTaskRunning$
emits which is projected using switchMap
into timer()
. But when takeUntil()
emits complete
notification it's stopped by switchMap
that still waits for its inner Observable (timer()
in this case) to complete.
If you put takeUntil()
after switchMap()
the complete
notification will reach the observer which triggers unsubscription even when the inner Observable in switchMap()
is still pending and will be unsubscribed immediatelly.
通过集体智慧和协作来改善编程学习和解决问题的方式。致力于成为全球开发者共同参与的知识库,让每个人都能够通过互相帮助和分享经验来进步。
评论