TPL Dataflow块在完成后修改状态并发送单个消息

huangapple go评论65阅读模式
英文:

TPL Dataflow block that modifies state and sends a single message after it completes

问题

我刚开始学习TPL Dataflow,有一个如下描述的问题:

  1. “Block” 1、2、3 持有状态的引用。它们修改状态并在每次接收到消息时向下游发送消息。此类块的数量不定。
  2. “Aggregator” 从这些块接收消息并检查所有消息是否存在错误。当所有源块都已完成并聚合器将一条消息传递给“Releaser”。
  3. “Releaser” 持有状态的引用。它将从“Aggregator”了解更新是否正确完成,并将带有成功消息或失败消息的状态发送到下游。
public static void Run()
{
    var sourceBlock1 = new TransformBlock<int, int>(x => x * 2);
    var sourceBlock2 = new TransformBlock<int, int>(x => x * 3);

    // 如何实现一个聚合器,以从未知数量的源聚合消息,然后在所有源完成时返回一条消息?
    var aggregater = new TransformBlock<int, int[]>(x => ?); 
    var releaser = new TransformBlock<int[], int>(xs => xs.Sum());

    sourceBlock1.LinkTo(aggregater);
    sourceBlock2.LinkTo(aggregater);
    aggregater.LinkTo(releaser);

    sourceBlock1.Post(10);
    sourceBlock2.Post(20);

    targetBlock.Completion.Wait();
}

在代码中,需要实现一个聚合器,以从未知数量的源块聚合消息,然后在所有源块完成时返回一条消息。

英文:

I just started to learn about TPL Dataflow and have a question as described below:
TPL Dataflow块在完成后修改状态并发送单个消息

  1. "Block"s 1, 2, 3 holds references to states. They modify the states and send messages downstream each time they receive a message. The number of such blocks varies.

  2. The "Aggregator" receives messages from the Blocks and check all the messages for errors. After all source blocks are Completed and aggregator passes a single message to the "Releaser".

  3. "Releaser" holds a reference to the state. It will know from "Aggregator" whether the updating is done correctly and will send the state with a success message or a failure message downstream.

        public static void Run()
    {
        var sourceBlock1 = new TransformBlock&lt;int, int&gt;(x =&gt; x * 2);
        var sourceBlock2 = new TransformBlock&lt;int, int&gt;(x =&gt; x * 3);
    
        //How to implement the aggregator that aggregates messages from an unknown number of sources and then return a message
        //when all sources are complete?
        var aggregater = new TransformBlock&lt;int, int[]&gt;(x =&gt; ?); 
        var releaser = new TransformBlock&lt;int[], int&gt;(xs =&gt; xs.Sum());
    
        sourceBlock1.LinkTo(aggregater);
        sourceBlock2.LinkTo(aggregater);
        aggregater.LinkTo(releaser);
    
        sourceBlock1.Post(10);
        sourceBlock2.Post(20);
    
        targetBlock.Completion.Wait();
    }
    

答案1

得分: 1

在这行代码中:

sourceBlock1.LinkTo(aggregater);

...aggregater 没有收到通知,表明它已成为 sourceBlock1 的链接目标。ISourceBlock&lt;TOutput&gt;.LinkTo 方法仅改变源的状态,而不影响目标。只有当目标通过 ITargetBlock&lt;TInput&gt;.OfferMessage 方法收到第一条消息时,它才会意识到已经被链接:

public DataflowMessageStatus OfferMessage (
    DataflowMessageHeader messageHeader,
    TInput messageValue,
    ISourceBlock&lt;TInput&gt; source,
    bool consumeToAccept);

即便如此,也不能保证 source 参数将引用 sourceBlock1,因为 sourceBlock1 可选择拦截其与链接目标之间的内部代理 ISourceBlock&lt;TInput&gt; 实现。因此,我认为你不能仅仅使用现有接口的方法来实现你想要的。也许你可以为你的自定义聚合器块提供一个允许双向链接感知的 API。例如:

aggregater.Observe(sourceBlock1);
aggregater.Observe(sourceBlock2);

至于如何将多个块的完成传播到单个目标块,请参考以下链接:

英文:

In this line:

sourceBlock1.LinkTo(aggregater);

...the aggregater receives no notification that it has become the linked target of the sourceBlock1. The ISourceBlock&lt;TOutput&gt;.LinkTo method changes only the state of the source, not the target. The target will only become aware that it has been linked when it is offered the first message, via the ITargetBlock&lt;TInput&gt;.OfferMessage method:

public DataflowMessageStatus OfferMessage (
    DataflowMessageHeader messageHeader,
    TInput messageValue,
    ISourceBlock&lt;TInput&gt; source,
    bool consumeToAccept);

Even then, it's not guaranteed that the source argument will be a reference to the sourceBlock1, since the sourceBlock1 can opt to intercept an internal proxy ISourceBlock&lt;TInput&gt; implementation between itself and its linked target. So I don't think that you can achieve what you want using solely the methods of the existing interfaces. Maybe you could equip your custom aggregator block with an API that allows for bidirectional link-awareness. For example:

aggregater.Observe(sourceBlock1);
aggregater.Observe(sourceBlock2);

As for how to propagate the completion of multiple blocks to a single target block, take a look at these links:

huangapple
  • 本文由 发表于 2023年2月10日 10:46:34
  • 转载请务必保留本文链接:https://go.coder-hub.com/75406455.html
匿名

发表评论

匿名网友

:?: :razz: :sad: :evil: :!: :smile: :oops: :grin: :eek: :shock: :???: :cool: :lol: :mad: :twisted: :roll: :wink: :idea: :arrow: :neutral: :cry: :mrgreen:

确定