Disposable object is already disposed when the inner scope is not yet finished
Question
I want to write an `IEnumerable` extension that executes a `Select` in parallel, but with a maximum number of parallel instances. Furthermore, I want this extension to return `IEnumerable<Task<TOutput>>` and not `Task<IEnumerable<TOutput>>`.

I tried this using a semaphore. To cancel the whole execution, I also provided a `CancellationTokenSource`.
```csharp
public static IEnumerable<Task<TOutput>> SelectParallel<TInput, TOutput>(
    this IEnumerable<TInput> inputList,
    Func<TInput, Task<TOutput>> selectFunction,
    int numberOfParallelTasks = 50)
{
    // Cancellation source to cancel all tasks if one task fails.
    using var cancellationTokenSource = new CancellationTokenSource();
    // Limit the number of parallel tasks.
    using var semaphore = new SemaphoreSlim(numberOfParallelTasks);
    return inputList
        .Select(async input =>
        {
            try
            {
                // Wait until a slot is available, to only execute numberOfParallelTasks tasks in parallel.
                await semaphore.WaitAsync(cancellationTokenSource.Token);
                return await selectFunction(input);
            }
            catch (Exception)
            {
                // Activates the cancellation token for all tasks, when one task fails.
                cancellationTokenSource.Cancel();
                throw;
            }
            finally
            {
                semaphore.Release();
            }
        })
        // ToList() is required to dispose the semaphore and the cancellation token source,
        // otherwise the select can be executed in an outer scope, when the elements are already disposed.
        .ToList();
}
```
I then wrote a test to make sure this function behaves correctly:
```csharp
[TestMethod]
public async Task SelectParallelShouldOnlyCallThreeTimesInParallel()
{
    // Arrange
    var timer = new Stopwatch();
    timer.Start();
    var enumerable = new[] { 1, 2, 3, 4, 5, 6 };

    async Task<long> TestFunction(int i)
    {
        await Task.Delay(100);
        return timer.ElapsedMilliseconds;
    }

    // Act
    var result = (await Task.WhenAll(enumerable
            .SelectParallel(TestFunction, 2)))
        .ToList();

    // Assert
    var first = result.Take(2).Average();
    var middle = result.Skip(2).Take(2).Average();
    var last = result.Skip(4).Take(2).Average();
    var middleToFirstDiff = middle - first;
    var lastToMiddleDiff = last - middle;
    middleToFirstDiff.Should().BeGreaterThan(100);
    lastToMiddleDiff.Should().BeGreaterThan(100);
}
```
This test fails; even worse, it never finishes, as if stuck in an endless loop. The reason is that the semaphore is disposed before the last `Select` block has executed its `finally` block. When I temporarily remove the `using` keyword from the semaphore, the test passes.

Is there something wrong with my code, or is this a general issue / misunderstanding of disposable objects and different execution scopes?
Answer 1
Score: 1
Walking through your code, it:

- Creates a `SemaphoreSlim`.
- `Select`s through an input sequence, which lazily creates a sequence of tasks.
- Calls `ToList`, which forces all the `Select` delegates to run. This creates a list of tasks.
- Disposes the `SemaphoreSlim` (implicitly, at the end of the block).
The problem is that the semaphore is disposed even though the tasks are still running. However, you want the method to return a sequence of tasks that are still running.
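A minimal sketch of that sequence of events, relying only on documented `SemaphoreSlim` behaviour (`Release` on a disposed instance throws `ObjectDisposedException`). `DisposedSemaphoreDemo` and `ReleaseAfterDisposeThrows` are hypothetical names used for illustration. A plausible explanation for the hang in the question (an inference, not something the answer states) is that the first tasks throw from their `finally` blocks, so their slots are never returned and the tasks still queued in `WaitAsync` are never woken.

```csharp
using System;
using System.Threading;

public static class DisposedSemaphoreDemo
{
    // Mimics the order of events in SelectParallel: a task holds a slot,
    // `using var` disposes the semaphore when the method returns, and the
    // task's finally block then tries to give the slot back.
    // Returns true if Release() fails with ObjectDisposedException.
    public static bool ReleaseAfterDisposeThrows()
    {
        var semaphore = new SemaphoreSlim(1);
        semaphore.Wait();      // a running task has taken the only slot
        semaphore.Dispose();   // end of SelectParallel's scope
        try
        {
            semaphore.Release(); // the task's finally block runs later
            return false;
        }
        catch (ObjectDisposedException)
        {
            // The slot is never handed back to the semaphore.
            return true;
        }
    }
}
```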
To resolve this, you can either:

- Just remove the `using`. I think (but am not 100% sure) that the `SemaphoreSlim` disposal won't do anything in this case, since there's no OS semaphore involved with this usage.
- Manage a reference count for the semaphore and only dispose it when the reference count reaches zero. Note that this has to be thread-safe; you can use my Nito.Disposables library if you want something else to handle the reference counting for you.
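The reference-counting option could look roughly like the sketch below. It is a hand-rolled count using `Interlocked.Decrement`, not the Nito.Disposables API, and it assumes the input sequence can be materialized up front so the number of tasks is known. It also changes one detail of the question's code: a task only calls `Release` if its `WaitAsync` actually succeeded, since a task cancelled while waiting never took a slot.

```csharp
using System;
using System.Collections.Generic;
using System.Linq;
using System.Threading;
using System.Threading.Tasks;

public static class SelectParallelExtensions
{
    // Sketch: the semaphore and cancellation source are shared by all tasks,
    // so whichever task finishes last disposes them (a hand-rolled reference
    // count), instead of disposing them when this method returns.
    public static IEnumerable<Task<TOutput>> SelectParallel<TInput, TOutput>(
        this IEnumerable<TInput> inputList,
        Func<TInput, Task<TOutput>> selectFunction,
        int numberOfParallelTasks = 50)
    {
        var inputs = inputList.ToList();
        var cancellationTokenSource = new CancellationTokenSource();
        var semaphore = new SemaphoreSlim(numberOfParallelTasks);
        int remaining = inputs.Count; // one reference per task

        void DisposeShared()
        {
            semaphore.Dispose();
            cancellationTokenSource.Dispose();
        }

        if (remaining == 0)
        {
            DisposeShared();
            return new List<Task<TOutput>>();
        }

        return inputs
            .Select(async input =>
            {
                bool acquired = false;
                try
                {
                    await semaphore.WaitAsync(cancellationTokenSource.Token);
                    acquired = true;
                    return await selectFunction(input);
                }
                catch (Exception)
                {
                    // Cancel the tasks that are still waiting for a slot.
                    cancellationTokenSource.Cancel();
                    throw;
                }
                finally
                {
                    // Only give back a slot this task actually took.
                    if (acquired)
                    {
                        semaphore.Release();
                    }
                    // The last task to finish drops the final reference.
                    if (Interlocked.Decrement(ref remaining) == 0)
                    {
                        DisposeShared();
                    }
                }
            })
            // Materialize so every task is started exactly once.
            .ToList();
    }
}
```

Because each task decrements the count only after its own `Release`, every `Release` happens before the final `Dispose`, and callers can still await the returned tasks individually or through `Task.WhenAll`.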
You still have another potential problem with your logic: there's no guarantee that the returned tasks will complete in order, or even start the `selectFunction` in order, because a `SemaphoreSlim` does not guarantee FIFO ordering.
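For the test in the question, one way to stop depending on FIFO wake-ups is to sort the completion timestamps before grouping them into batches. `BatchTiming` and `BatchAverages` are hypothetical names for this sketch; in the test it could be called as `BatchTiming.BatchAverages(result, 2)`, with the differences between consecutive averages checked as before.

```csharp
using System.Collections.Generic;
using System.Linq;

public static class BatchTiming
{
    // Sorts the completion timestamps instead of assuming that input order
    // equals execution order, then returns the average of each consecutive
    // batch of batchSize timestamps.
    public static List<double> BatchAverages(IEnumerable<long> timestamps, int batchSize)
    {
        return timestamps
            .OrderBy(t => t)
            .Select((t, index) => (Value: t, Index: index))
            .GroupBy(x => x.Index / batchSize)
            .Select(g => g.Average(x => (double)x.Value))
            .ToList();
    }
}
```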