IObservable<Task<T>> 转为只产生最新 T 的 IObservable<T>

huangapple go评论46阅读模式
英文:

IObservable<Task<T>> to IObservable<T> that only produces the latest T

问题

我有一个可观察的流 IObservable&lt;Task&lt;T&gt;&gt;(称之为流A)。从这个流中,我希望产生一个 IObservable&lt;T&gt;(流B)。如果这些是我的唯一要求,我认为只需要这样说就足够了

streamB = streamA.Select(async x =&gt; await x).Select(x =&gt; x.Result);

我预期第一个 Select 是真正异步的,因为流产生更多 Task&lt;T&gt; 时,每个都会被等待。然而,第二个 Select 会"阻塞",直到每个 Task&lt;T&gt; 按顺序完成。不过,如果我理解正确,实际上并没有真正的阻塞,因为第二个 Select 不会进入,直到第一个 Select 中的每个异步操作分别完成并恢复执行。这种模式对我来说的问题在于我必须等待每个任务按它们到达的顺序完成(即成功、失败或取消)。

考虑以下情景:假设我有两个 Task&lt;T&gt;t1t2t1t2 之前到达。然而,当 t2 到达时,t1 尚未完成,即在弹珠图中

stream A: -------------------t1-------------t2-------------------------------
<br>
async Select 完成 --------------------------t1-----------t2------------
<br>
stream B: -----------------------------------------------------t2.Result----

换句话说,从流A中的t2在t1完成之前到达实际上意味着t1应该被忽略,不会被流B生成。

我已经能够通过使用非常命令式的代码并添加(过多的)复杂性来解决这个问题,通过将任务传递给一个特殊的辅助类来解决这个问题,该类跟踪到达的所有任务,使用递增的long来标识每个任务,然后等待每个任务并使用回调将id传回辅助类,告诉它哪个Task 已完成。如果id“较旧”比最后到达的任务,结果将被忽略。

我强烈感到自己过于复杂化了表面上看起来是一个简单的问题。在System.Reactive或TPL中是否有解决这种问题的基础设施或模式?

英文:

I have an observable stream IObservable&lt;Task&lt;T&gt;&gt; (call it stream A). From this stream, I wish to produce an IObservable&lt;T&gt; (stream B). If these were my only requirements, I believe it would be sufficient to say

streamB = streamA.Select(async x =&gt; await x).Select(x =&gt; x.Result);

The first Select I expect is truly asynchronous, in the sense that as the stream produces more Task&lt;T&gt; each would be awaited. The second Select however, will then "block" until each of the Task&lt;T&gt;s complete in order. If I'm thinking correctly here, there's no actual blocking going on though, because the second Select won't be entered until each async operation, respectively, in the first Select completes and resumes execution. The problem for me with this pattern is the fact that I must wait for each task to complete (i.e. succeed, fail or be cancelled) in the order that they arrive.

Consider this scenario: Assume I have two Task&lt;T&gt;s, t1 and t2. t1 arrives before t2. However, when t2 arrives, t1 has yet to complete, i.e. in a marble diagram

stream A: -------------------t1-------------t2-------------------------------
<br>
async Select completion --------------------------t1-----------t2------------
<br>
stream B: -----------------------------------------------------t2.Result----

In other words, the arrival of t2 from stream A before the completion of t1 essentially means that t1 should be ignored and not produced by stream B.

I have been able to solve this problem by using very imperative code and adding (too much) complexity, by passing the task to a special helper class that keeps track of all tasks that arrive, using an incrementing long to id each, then awaiting each task and using callbacks that pass the id back to the helper class to tell it which Task has completed. If the id is "older" than the latest to arrive, the result is ignored.

I strongly feel I am over-complicating what on the surface appears as a simple problem. Is there no infrastructure or pattern to solve this kind of problem, either in System.Reactive or e.g. in TPL?

答案1

得分: 2

这是您需要的查询:

IObservable<T> streamB = streamA.Select(t => Observable.FromAsync(() => t)).Switch();

你几乎总是应该避免使用 .Select(async x => await x),因为它会破坏 Rx 协议,允许由于 async 将控制返回到调用线程而导致重叠执行。

IObservable<Task<T>> 转换为 IObservable<T> 的正常方法是使用 .SelectMany(t => Observable.FromAsync(() => t))

然而,在你的情况下,如果有新的任务进来,你想要丢弃当前正在计算的值。因此,这将查询从 .SelectMany(t => Observable.FromAsync(() => t)) 更改为 .Select(t => Observable.FromAsync(() => t)).Switch()

.Switch()IObservable<IObservable<T>> 转换为 IObservable<T>,只生成外部可观测对象生成的最新内部可观测对象的值。它有效地忽略除最新的可观测对象之外的所有内容。这正是你所需要的。

以下是这个工作的演示:

void Main()
{
    IObservable<Task<long>> streamA = new []
    {
        ReturnDelayedAsync(1),
        ReturnDelayedAsync(42),
    }.ToObservable();
    
    IObservable<long> streamB = streamA.Select(t => Observable.FromAsync(() => t)).Switch();
    
    streamB.Subscribe(Console.WriteLine);
}

public async Task<long> ReturnDelayedAsync(long x)
{
    await Task.Delay(TimeSpan.FromSeconds(2.0));
    return x;
}

这会在控制台上产生一个值 42

英文:

Here's the query you need:

IObservable&lt;T&gt; streamB = streamA.Select(t =&gt; Observable.FromAsync(() =&gt; t)).Switch();

You should almost always avoid .Select(async x =&gt; await x) as it breaks the Rx contract allowing overlapping executions due to the async sending control back to the calling thread.

The normal way to turn an IObservable&lt;Task&lt;T&gt;&gt; to an IObservable&lt;T&gt; is to use .SelectMany(t =&gt; Observable.FromAsync(() =&gt; t)).

In your case, though, you want to throw away any currently computing values if a new task comes through. So this changes the query from .SelectMany(t =&gt; Observable.FromAsync(() =&gt; t)) to .Select(t =&gt; Observable.FromAsync(() =&gt; t)).Switch().

.Switch() turns an IObservable&lt;IObservable&lt;T&gt;&gt; into IObservable&lt;T&gt; by only producing values from the latest inner observable produced by the outer observable. It effectively ignores all but the latest observable. Just what you need.

Here's a demonstration of this working:

void Main()
{
	IObservable&lt;Task&lt;long&gt;&gt; streamA = new []
	{
		ReturnDelayedAsync(1),
		ReturnDelayedAsync(42),
	}.ToObservable();
	
	IObservable&lt;long&gt; streamB = streamA.Select(t =&gt; Observable.FromAsync(() =&gt; t)).Switch();
	
	streamB.Subscribe(Console.WriteLine);
}

public async Task&lt;long&gt; ReturnDelayedAsync(long x)
{
	await Task.Delay(TimeSpan.FromSeconds(2.0));
	return x;
}

That produces a single value of 42 on the console.

huangapple
  • 本文由 发表于 2023年2月7日 04:07:40
  • 转载请务必保留本文链接:https://go.coder-hub.com/75366069.html
匿名

发表评论

匿名网友

:?: :razz: :sad: :evil: :!: :smile: :oops: :grin: :eek: :shock: :???: :cool: :lol: :mad: :twisted: :roll: :wink: :idea: :arrow: :neutral: :cry: :mrgreen:

确定