Best way to handle Async/Await and cancellation to cancel all running tasks if any fail


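# Question

I have the following architecture:

> Web Site -calls-> Proxy API -calls-> External API

From the web site, I have to make up to 25 calls to the proxy API. My goal is to call all the APIs and either:

1. Get all the results (currently using `Task.WhenAll`), or
2. As soon as any of them fails, catch that exception and cancel all the other tasks.
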
**Website Code**
```csharp
async Task ProcessApiCallsAsync( ApiInfo[] apiInfos, CancellationToken token )
{
    var apiTasks = apiInfos
        .Select( apiInfo => {
            // Simplified code...but returns a call to a Task<T> function *without* await
            return GetWebApiResultAsync( apiInfo, token );
        } );

    var xmlResults = await Task.WhenAll( apiTasks );

    foreach( var result in xmlResults )
    {
        // Process results
    }
}

async Task<XElement> GetWebApiResultAsync( ApiInfo info, CancellationToken ct = default )
{
    // Simplified, but essentially calls SendAsync, with some processing of info before
    // and of the results after, to return an XElement. (Building `request` is elided.)
    using var response = await httpClient.SendAsync( request, cancellationToken: ct );

    return new XElement( "ResultsGeneratedFromRESTCall" );
}
```

**Proxy API**

```csharp
app.MapGet("/call-external", async (CancellationToken token) =>
{
    // Some code to prep the call to the external API; it looks the same as the call above
    // but could throw an exception
    using var externalResponse = await httpClient.SendAsync( httpRequest, token );

    externalResponse.EnsureSuccessStatusCode();

    var externalResult = ( await externalResponse.Content.ReadFromJsonAsync<JsonObject>( cancellationToken: token ) )!;

    if ( externalResult[ "CustomCheck" ] != null )
    {
        throw new ApplicationException( "Custom failure from external source" );
    }

    // Otherwise generate results (could require making additional calls to the external API)
});
```

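I was going to change my pattern to [this answer][1], which implements a custom `ForEachAsync` method. Would simply changing my function to look like this accomplish my two goals above (specifically, throwing the first exception it receives and cancelling all other requests)?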

**New Website Code with ForEachAsync**

```csharp
async Task ProcessApiCallsAsync( ApiInfo[] apiInfos, CancellationToken token )
{
    var xmlResults =
        await apiInfos.ForEachAsync( 
            new ParallelOptions { 
                MaxDegreeOfParallelism = apiInfos.Length, 
                CancellationToken = token
            },
            async ( apiInfo, ct ) => {
                // Now returning via await!
                return await GetWebApiResultAsync( apiInfo, ct );
            } );

    // remaining code unchanged
}
```

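**Update: Using CancellationTokenSource to Cancel Other Tasks**

I updated my call to the following, but now the only exception I get 'at the end' is `System.Threading.Tasks.TaskCanceledException` instead of the actual exception that was originally thrown. Is that expected? Is there a way to find the original exception?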

```csharp
async Task ProcessApiCallsAsync( ApiInfo[] apiInfos, CancellationToken token )
{
    using var cancelTokenSource = CancellationTokenSource.CreateLinkedTokenSource( token );

    var xmlResults =
        await apiInfos.ForEachAsync( 
            new ParallelOptions { 
                MaxDegreeOfParallelism = apiInfos.Length, 
                CancellationToken = cancelTokenSource.Token
            },
            async ( apiInfo, ct ) => {
                // Now returning via await!
                try
                {
                    if ( simulatedException )
                    {
                        await Task.Delay( 1500 );
                        throw new ApplicationException( "Emails endpoint is not supported" );
                    }

                    return await GetWebApiResultAsync( apiInfo, ct );
                }
                catch
                {
                    cancelTokenSource.Cancel();
                    throw;
                }
            } );

    // remaining code unchanged
}
```

  [1]: https://stackoverflow.com/a/71129678/166231


# Answer 1
**Score**: 1

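Yes, what you are doing with the `ForEachAsync` of the [other answer][1] is correct, with the exception of the configuration `MaxDegreeOfParallelism = apiInfos.Length`, which is questionable. You generally do want to impose a limit on the concurrency.

If you wanted to do the same with the simpler `Task.WhenAll`, you would do something like this: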

```csharp
async Task ProcessApiCallsAsync(ApiInfo[] apiInfos, CancellationToken cancellationToken)
{
    using CancellationTokenSource linkedCts = CancellationTokenSource
        .CreateLinkedTokenSource(cancellationToken);

    // The lambda has to be async and await the call. Without the await, a failure is
    // stored in the returned task and the catch block (and so Cancel) never runs.
    Task<XElement>[] apiTasks = apiInfos.Select(async apiInfo =>
    {
        try { return await GetWebApiResultAsync(apiInfo, linkedCts.Token); }
        catch { linkedCts.Cancel(); throw; }
    }).ToArray();

    XElement[] xmlResults = await Task.WhenAll(apiTasks);

    //...
}
```
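
As a rough, self-contained illustration of this fail-fast `Task.WhenAll` pattern, the sketch below replaces the real HTTP call with a hypothetical `FakeCallAsync` helper (not part of the question's code) in which item 3 fails after 100 ms and every other item waits on `Task.Delay`. Under those assumptions, awaiting `Task.WhenAll` should surface the original exception, because a faulted task takes precedence over canceled ones in the combined task.

```csharp
using System;
using System.Linq;
using System.Threading;
using System.Threading.Tasks;

// Hypothetical stand-in for GetWebApiResultAsync: item 3 fails quickly,
// every other item "works" for five seconds unless it is canceled first.
async Task<string> FakeCallAsync(int id, CancellationToken ct)
{
    if (id == 3)
    {
        await Task.Delay(100, ct);
        throw new InvalidOperationException($"call {id} failed");
    }
    await Task.Delay(5000, ct);
    return $"result {id}";
}

using var linkedCts = CancellationTokenSource.CreateLinkedTokenSource(CancellationToken.None);

// Each wrapper awaits its call, so a failure reaches the catch block and cancels the rest.
Task<string>[] tasks = Enumerable.Range(0, 5).Select(async id =>
{
    try { return await FakeCallAsync(id, linkedCts.Token); }
    catch { linkedCts.Cancel(); throw; }
}).ToArray();

Exception? caught = null;
try { await Task.WhenAll(tasks); }
catch (Exception ex) { caught = ex; }

int canceledCount = tasks.Count(t => t.IsCanceled);
int faultedCount = tasks.Count(t => t.IsFaulted);
Console.WriteLine($"{caught?.GetType().Name}: canceled={canceledCount}, faulted={faultedCount}");
```

If every failure is needed rather than just the first, the individual entries of `tasks` can be inspected after the `await` (for example through each task's `Exception` property).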

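But using `Parallel.ForEachAsync` is better because it lets you limit the concurrency, so that you don't bombard the remote server with dozens of requests simultaneously.

By the way, in your case there is a simpler way to collect the results of `Parallel.ForEachAsync`, because the source sequence is an array of known size: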

```csharp
async Task ProcessApiCallsAsync(ApiInfo[] apiInfos, CancellationToken cancellationToken)
{
    ParallelOptions options = new()
    { 
        MaxDegreeOfParallelism = 5, // A reasonable number
        CancellationToken = cancellationToken
    };

    XElement[] xmlResults = new XElement[apiInfos.Length];

    await Parallel.ForEachAsync(Enumerable.Range(0, apiInfos.Length), options, async (i, ct) =>
    {
        xmlResults[i] = await GetWebApiResultAsync(apiInfos[i], ct);
    });

    // Here all the slots of the xmlResults array are filled with data.
}
```
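
To see the concurrency limit at work, here is a minimal sketch that swaps the HTTP call for a hypothetical `FakeCallAsync` helper which only records how many calls overlap. The helper, the 25-item count, and the 50 ms delay are assumptions made for illustration; the point is that the peak overlap should never exceed `MaxDegreeOfParallelism`, while each result still lands in the slot matching its index.

```csharp
using System;
using System.Linq;
using System.Threading;
using System.Threading.Tasks;

object gate = new();
int inFlight = 0, peak = 0;

// Hypothetical stand-in for GetWebApiResultAsync; it only records how many calls overlap.
async Task<string> FakeCallAsync(int id, CancellationToken ct)
{
    lock (gate) { inFlight++; peak = Math.Max(peak, inFlight); }
    try
    {
        await Task.Delay(50, ct);
        return $"result {id}";
    }
    finally
    {
        lock (gate) { inFlight--; }
    }
}

string[] results = new string[25];
ParallelOptions options = new() { MaxDegreeOfParallelism = 5 };

await Parallel.ForEachAsync(Enumerable.Range(0, results.Length), options, async (i, ct) =>
{
    // Each iteration writes only to its own slot, so no extra synchronization is needed.
    results[i] = await FakeCallAsync(i, ct);
});

Console.WriteLine($"peak overlap: {peak}; slot 7 holds \"{results[7]}\"");
```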

# Answer 2
**Score**: 1

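You don't need a linked token. `Parallel.ForEachAsync` will automatically signal cancellation to the remaining tasks on the first exception (through the `ct` it passes to the lambda), and will also cancel them if the token you supplied is canceled.

You need to make sure to pass the token to any async functions you call within the `ForEachAsync` lambda, such as `Task.Delay`: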

```csharp
async Task ProcessApiCallsAsync( ApiInfo[] apiInfos, CancellationToken token )
{
    var xmlResults = new ConcurrentBag<XElement>();
    await Parallel.ForEachAsync(apiInfos, token,
        async (apiInfo, ct) => {
            if ( simulatedException )
            {
                await Task.Delay(1500, ct);  // pass token in
                throw new ApplicationException("Emails endpoint is not supported");
            }

            var result = await GetWebApiResultAsync(apiInfo, ct);  // pass token in
            xmlResults.Add(result);
        });
    // remaining code unchanged
}
```

You can see this in action in this fiddle, where the remaining tasks get canceled immediately upon an exception.
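
A small experiment can make the difference between the two approaches concrete. The sketch below is not the fiddle mentioned above; `FakeCallAsync` and `RunAsync` are hypothetical helpers written for this illustration. It runs the same loop twice: once relying only on the built-in cancellation, and once also cancelling the source whose token was handed to `ParallelOptions`, as in the question's update. On current .NET versions I would expect the first run to surface the original exception quickly, and the second to surface an `OperationCanceledException` (such as `TaskCanceledException`), since the loop then appears to treat the failure as an external cancellation.

```csharp
using System;
using System.Diagnostics;
using System.Linq;
using System.Threading;
using System.Threading.Tasks;

// Hypothetical stand-in for the real call: item 3 fails after 100 ms,
// all others wait five seconds unless their token is canceled first.
async ValueTask FakeCallAsync(int id, CancellationToken ct)
{
    if (id == 3)
    {
        await Task.Delay(100, ct);
        throw new InvalidOperationException($"call {id} failed");
    }
    await Task.Delay(5000, ct);
}

// Runs the loop and returns whichever exception the caller of ForEachAsync observes.
// When cancelOwnSource is true, the body also cancels the source whose token was
// given to ParallelOptions, mirroring the question's update.
async Task<Exception?> RunAsync(bool cancelOwnSource)
{
    using var cts = new CancellationTokenSource();
    var options = new ParallelOptions { MaxDegreeOfParallelism = 5, CancellationToken = cts.Token };
    try
    {
        await Parallel.ForEachAsync(Enumerable.Range(0, 5), options, async (id, ct) =>
        {
            try { await FakeCallAsync(id, ct); }
            catch when (cancelOwnSource) { cts.Cancel(); throw; }
        });
        return null;
    }
    catch (Exception ex) { return ex; }
}

var stopwatch = Stopwatch.StartNew();
Exception? plain = await RunAsync(cancelOwnSource: false);
stopwatch.Stop();
Exception? withManualCancel = await RunAsync(cancelOwnSource: true);

Console.WriteLine($"built-in cancellation: {plain?.GetType().Name} after {stopwatch.ElapsedMilliseconds} ms");
Console.WriteLine($"manual cancellation:   {withManualCancel?.GetType().Name}");
```

If that expectation holds in your environment, dropping the manual `Cancel()` call is enough to get the original exception back at the `await`.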


huangapple
  • Posted on June 19, 2023 at 02:05:41
  • When reposting, please keep the link to this article: https://go.coder-hub.com/76501926.html