Parallel.ForEach loop ThreadAbortException The Maximum Wait/Sleep/Join time has been exceeded

Question


I am using a Parallel.ForEach loop to download multiple files from a Web API in parallel, in a .NET Framework 4.8 application.
The problem is that sometimes I get a ThreadAbortException: "The Maximum Wait/Sleep/Join time has been exceeded (00:05:00)".
I can't seem to find a way to extend the "Maximum Wait Timeout" for the threads inside the Parallel.ForEach loop.

Sample code causing the problem.

// create the HttpClient
using (HttpClient httpClient = new HttpClient())
{
    // now try to get documents in a parallel threaded task.
    Parallel.ForEach(documentIds,
        new ParallelOptions { MaxDegreeOfParallelism =
            Convert.ToInt32(Math.Ceiling((Environment.ProcessorCount * 0.50) * 2.0)) },
        (documentId) =>
    {
        try
        {
            // post the data to the service
            using (HttpResponseMessage response =
                httpClient.PostAsync(serviceURL, new StringContent(
                    requestBody.ToString())).GetAwaiter().GetResult())
            {
                // show error if the service failed
                if (response.IsSuccessStatusCode == false)
                {
                    throw new Exception("Error getting content. " +
                        response?.ReasonPhrase);
                }

                // get back the content
                using (Stream responseStream = response.Content.ReadAsStreamAsync()
                    .GetAwaiter().GetResult())
                {
                    // write the file to the file system
                }
            }
        }
        catch (Exception ex)
        {
            // eat the error

            // NOTE: This does catch the ThreadAbortException, but it breaks out
            // of the Parallel.ForEach, and I want it to continue with the other documents
        }
    });
}

Update 1
I got it to work, but the solution is not a good one.
Basically the HttpClient is the problem.
If I create a NEW HttpClient inside the Parallel.ForEach loop, then it lets me handle the ThreadAbortException without breaking out of the loop.
Microsoft's best practices say not to create multiple HttpClient instances, because doing so can keep too many sockets open before they are cleaned up.
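For reference, the guidance in question is to reuse a single HttpClient instance for all requests rather than creating one per iteration. A minimal sketch of that shared-client pattern (the DocumentDownloader class and GetDocumentStream method are hypothetical names, not part of the original code):

public static class DocumentDownloader
{
    // one HttpClient reused for every request, instead of one per loop iteration
    private static readonly HttpClient SharedClient = new HttpClient();

    public static Stream GetDocumentStream(string serviceURL, string requestBody)
    {
        // reusing the client avoids exhausting sockets the way repeated new HttpClient() calls can
        HttpResponseMessage response = SharedClient
            .PostAsync(serviceURL, new StringContent(requestBody))
            .GetAwaiter().GetResult();

        response.EnsureSuccessStatusCode();
        return response.Content.ReadAsStreamAsync().GetAwaiter().GetResult();
    }
}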

Answer 1

Score: 3


That's a misuse of Parallel.ForEach. That method is simply not meant for concurrent operations; it's only meant for in-memory data parallelism. This code blocks all CPU cores, spin-waiting while it waits for responses. That means it pegs the CPU at 100% for a while before the threads start getting evicted.

Use Parallel.ForEachAsync instead, or a Dataflow ActionBlock, e.g.:

await Parallel.ForEachAsync(documentIds, async (documentId, ct) =>
{
    ...
});

This will use roughly as many worker tasks as there are cores to process the data. You can specify a different number (higher or lower) with the ParallelOptions parameter:

await Parallel.ForEachAsync(documentIds,
    new ParallelOptions { MaxDegreeOfParallelism = 4 },
    async (documentId, ct) =>
    {
        ...
    });

You may want to specify a lower number if the files are large and flood the network, or a higher one if there are a lot of smaller files or the response takes a long time to start.
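Putting the pieces together, a fuller sketch of the Parallel.ForEachAsync version might look like this. Note that Parallel.ForEachAsync requires .NET 6 or later; serviceURL, requestBody and documentIds come from the question, while outputFolder and the file naming are made up for illustration:

// minimal sketch, assuming the serviceURL/requestBody variables from the question (requires .NET 6+)
using (HttpClient httpClient = new HttpClient())
{
    await Parallel.ForEachAsync(documentIds,
        new ParallelOptions { MaxDegreeOfParallelism = 4 },
        async (documentId, ct) =>
        {
            // post the request without blocking a thread pool thread
            using HttpResponseMessage response = await httpClient.PostAsync(
                serviceURL, new StringContent(requestBody.ToString()), ct);

            response.EnsureSuccessStatusCode();

            // stream the document straight to disk (outputFolder and the ".dat" name are illustrative)
            using Stream responseStream = await response.Content.ReadAsStreamAsync(ct);
            using FileStream fileStream = File.Create(Path.Combine(outputFolder, documentId + ".dat"));
            await responseStream.CopyToAsync(fileStream, ct);
        });
}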

ActionBlock<T> - .NET Framework 4.8

Another option, one that can scale to an entire processing pipeline, is to use a Dataflow ActionBlock to process data concurrently.

var dop = new ExecutionDataflowBlockOptions
{
    MaxDegreeOfParallelism = 10,
    BoundedCapacity = 10
};
var httpClient = new HttpClient();
var downloader = new ActionBlock<string>(docId => DownloadAsync(httpClient, docId), dop);

Once you have the downloader block you can start posting document IDs to it. When done, tell the block to complete and await it so that all pending messages finish processing:

foreach(var docId in documentIds)
{
    await downloader.SendAsync(docId);
}
downloader.Complete();
await downloader.Completion;

The downloading method itself can be fully asynchronous:

async Task DownloadAsync(HttpClient client, string documentId)
{
    var url = ...;
    var req = new StringContent(requestBody.ToString());
    var response = await client.PostAsync(serviceURL, req);
    ...
    using var dlStream = await response.Content.ReadAsStreamAsync();
    using var fileStream = File.Create(somePath);
    await dlStream.CopyToAsync(fileStream);
}

All Dataflow blocks have an input buffer that holds incoming messages. By default it has no limit. Adding BoundedCapacity = 10 will cause the posting code to pause whenever more than 10 messages are pending. This can be used to throttle publishing code that's faster than the processing code and to avoid filling up memory.
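One detail worth noting: the pause described above applies to SendAsync, which asynchronously waits for room in the bounded buffer; the synchronous Post method instead returns false immediately when there is no room. A tiny illustrative sketch:

// with BoundedCapacity = 10, Post() declines extra items while SendAsync() waits for space
var options = new ExecutionDataflowBlockOptions { BoundedCapacity = 10 };
var block = new ActionBlock<int>(async i => await Task.Delay(1000), options);

bool accepted = block.Post(42);   // false when the bounded buffer has no room
await block.SendAsync(42);        // completes only once the item has been accepted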
