英文:
IObservable<Task<T>> to IObservable<T> that only produces the latest T
问题
我有一个可观察的流 IObservable<Task<T>>
(称之为流A)。从这个流中,我希望产生一个 IObservable<T>
(流B)。如果这些是我的唯一要求,我认为只需要这样说就足够了
streamB = streamA.Select(async x => await x).Select(x => x.Result);
我预期第一个 Select
是真正异步的,因为流产生更多 Task<T>
时,每个都会被等待。然而,第二个 Select
会"阻塞",直到每个 Task<T>
按顺序完成。不过,如果我理解正确,实际上并没有真正的阻塞,因为第二个 Select
不会进入,直到第一个 Select
中的每个异步操作分别完成并恢复执行。这种模式对我来说的问题在于我必须等待每个任务按它们到达的顺序完成(即成功、失败或取消)。
考虑以下情景:假设我有两个 Task<T>
,t1
和 t2
。t1
在 t2
之前到达。然而,当 t2
到达时,t1
尚未完成,即在弹珠图中
stream A: -------------------t1-------------t2-------------------------------
<br>
async Select 完成 --------------------------t1-----------t2------------
<br>
stream B: -----------------------------------------------------t2.Result----
换句话说,从流A中的t2在t1完成之前到达实际上意味着t1应该被忽略,不会被流B生成。
我已经能够通过使用非常命令式的代码并添加(过多的)复杂性来解决这个问题,通过将任务传递给一个特殊的辅助类来解决这个问题,该类跟踪到达的所有任务,使用递增的long
来标识每个任务,然后等待每个任务并使用回调将id传回辅助类,告诉它哪个Task
已完成。如果id“较旧”比最后到达的任务,结果将被忽略。
我强烈感到自己过于复杂化了表面上看起来是一个简单的问题。在System.Reactive
或TPL中是否有解决这种问题的基础设施或模式?
英文:
I have an observable stream IObservable<Task<T>>
(call it stream A
). From this stream, I wish to produce an IObservable<T>
(stream B
). If these were my only requirements, I believe it would be sufficient to say
streamB = streamA.Select(async x => await x).Select(x => x.Result);
The first Select
I expect is truly asynchronous, in the sense that as the stream produces more Task<T>
each would be awaited. The second Select
however, will then "block" until each of the Task<T>
s complete in order. If I'm thinking correctly here, there's no actual blocking going on though, because the second Select
won't be entered until each async operation, respectively, in the first Select
completes and resumes execution. The problem for me with this pattern is the fact that I must wait for each task to complete (i.e. succeed, fail or be cancelled) in the order that they arrive.
Consider this scenario: Assume I have two Task<T>
s, t1
and t2
. t1
arrives before t2
. However, when t2
arrives, t1
has yet to complete, i.e. in a marble diagram
stream A: -------------------t1-------------t2-------------------------------
<br>
async Select completion --------------------------t1-----------t2------------
<br>
stream B: -----------------------------------------------------t2.Result----
In other words, the arrival of t2
from stream A before the completion of t1
essentially means that t1 should be ignored and not produced by stream B.
I have been able to solve this problem by using very imperative code and adding (too much) complexity, by passing the task to a special helper class that keeps track of all tasks that arrive, using an incrementing long
to id each, then await
ing each task and using callbacks that pass the id back to the helper class to tell it which Task
has completed. If the id is "older" than the latest to arrive, the result is ignored.
I strongly feel I am over-complicating what on the surface appears as a simple problem. Is there no infrastructure or pattern to solve this kind of problem, either in System.Reactive
or e.g. in TPL?
答案1
得分: 2
这是您需要的查询:
IObservable<T> streamB = streamA.Select(t => Observable.FromAsync(() => t)).Switch();
你几乎总是应该避免使用 .Select(async x => await x)
,因为它会破坏 Rx 协议,允许由于 async
将控制返回到调用线程而导致重叠执行。
将 IObservable<Task<T>>
转换为 IObservable<T>
的正常方法是使用 .SelectMany(t => Observable.FromAsync(() => t))
。
然而,在你的情况下,如果有新的任务进来,你想要丢弃当前正在计算的值。因此,这将查询从 .SelectMany(t => Observable.FromAsync(() => t))
更改为 .Select(t => Observable.FromAsync(() => t)).Switch()
。
.Switch()
将 IObservable<IObservable<T>>
转换为 IObservable<T>
,只生成外部可观测对象生成的最新内部可观测对象的值。它有效地忽略除最新的可观测对象之外的所有内容。这正是你所需要的。
以下是这个工作的演示:
void Main()
{
IObservable<Task<long>> streamA = new []
{
ReturnDelayedAsync(1),
ReturnDelayedAsync(42),
}.ToObservable();
IObservable<long> streamB = streamA.Select(t => Observable.FromAsync(() => t)).Switch();
streamB.Subscribe(Console.WriteLine);
}
public async Task<long> ReturnDelayedAsync(long x)
{
await Task.Delay(TimeSpan.FromSeconds(2.0));
return x;
}
这会在控制台上产生一个值 42
。
英文:
Here's the query you need:
IObservable<T> streamB = streamA.Select(t => Observable.FromAsync(() => t)).Switch();
You should almost always avoid .Select(async x => await x)
as it breaks the Rx contract allowing overlapping executions due to the async
sending control back to the calling thread.
The normal way to turn an IObservable<Task<T>>
to an IObservable<T>
is to use .SelectMany(t => Observable.FromAsync(() => t))
.
In your case, though, you want to throw away any currently computing values if a new task comes through. So this changes the query from .SelectMany(t => Observable.FromAsync(() => t))
to .Select(t => Observable.FromAsync(() => t)).Switch()
.
.Switch()
turns an IObservable<IObservable<T>>
into IObservable<T>
by only producing values from the latest inner observable produced by the outer observable. It effectively ignores all but the latest observable. Just what you need.
Here's a demonstration of this working:
void Main()
{
IObservable<Task<long>> streamA = new []
{
ReturnDelayedAsync(1),
ReturnDelayedAsync(42),
}.ToObservable();
IObservable<long> streamB = streamA.Select(t => Observable.FromAsync(() => t)).Switch();
streamB.Subscribe(Console.WriteLine);
}
public async Task<long> ReturnDelayedAsync(long x)
{
await Task.Delay(TimeSpan.FromSeconds(2.0));
return x;
}
That produces a single value of 42
on the console.
通过集体智慧和协作来改善编程学习和解决问题的方式。致力于成为全球开发者共同参与的知识库,让每个人都能够通过互相帮助和分享经验来进步。
评论