Disposable object is already disposed when the inner scope is not yet finished.

huangapple go评论71阅读模式
英文:

Disposable object is already disposed when the inner scope is not yet finished

问题

I want to write an IEnumerable extension to execute a Select in parallel, but with a maximum number of parallel instances. Furthermore I want this extension to have a return value of IEnumerable<Task<TOutput>> and not Task<IEnumerable<TOutput>>.

I tried this using a semaphore. To cancel the whole execution I also provided a CancellationTokenSource.

public static IEnumerable<Task<TOutput>> SelectParallel<TInput, TOutput>(
    this IEnumerable<TInput> inputList,
    Func<TInput, Task<TOutput>> selectFunction,
    int numberOfParallelTasks = 50)
{
    // Cancellation source to cancel all tasks if one task fails.
    using var cancellationTokenSource = new CancellationTokenSource();

    // Limit the number of parallel tasks.
    using var semaphore = new SemaphoreSlim(numberOfParallelTasks);

    return inputList
        .Select(async input =>
        {
            try
            {
                // Wait until a slot is available, to only execute numberOfParallelTasks tasks in parallel.
                await semaphore.WaitAsync(cancellationTokenSource.Token);

                return await selectFunction(input);
            }
            catch (Exception)
            {
                // Activates the cancellation token for all tasks, when one task fails.
                cancellationTokenSource.Cancel();

                throw;
            }
            finally
            {
                semaphore.Release();
            }
        })
        // ToList() is required to dispose the semaphore and the cancellation token source,
        // otherwise the select can be executed in an outer scope, when the elements are already disposed.
        .ToList();
}

I then wrote a test to make sure this function behaves correctly:

[TestMethod]
public async Task SelectParallelShouldOnlyCallThreeTimesInParallel()
{
    // Arrange
    var timer = new Stopwatch();
    timer.Start();

    var enumerable = new[] { 1, 2, 3, 4, 5, 6 };
    async Task<long> TestFunction(int i)
    {
        await Task.Delay(100);

        return timer.ElapsedMilliseconds;
    }

    // Act
    var result = (await Task.WhenAll(enumerable
        .SelectParallel(TestFunction, 2)))
        .ToList();

    // Arrange
    var first = result.Take(2).Average();
    var middle = result.Skip(2).Take(2).Average();
    var last = result.Skip(4).Take(2).Average();

    var middleToFirstDiff = middle - first;
    var lastToMiddleDiff = last - middle;

    middleToFirstDiff.Should().BeGreaterThan(100);
    lastToMiddleDiff.Should().BeGreaterThan(100);
}

This test will fail - even worse it will end up in an endless loop. The reason is, that the semaphore is disposed, before the last Select block has executed its finally block. When I temporary remove the using keyword from semaphore the test will pass.

Is there something wrong with my code or is this a general issue / misunderstanding of Disposable objects and different execution scopes?

英文:

I want to write an IEnumerable extension to execute a Select in parallel, but with a maximum number of parallel instances. Furthermore I want this extension to have a return value of IEnumerable<Task<TOutput>> and not Task<IEnumerable<TOutput>>.

I tried this using a semaphore. To cancel the whole execution I also provided a CancellationTokenSource.

public static IEnumerable<Task<TOutput>> SelectParallel<TInput, TOutput>(
    this IEnumerable<TInput> inputList,
    Func<TInput, Task<TOutput>> selectFunction,
    int numberOfParallelTasks = 50)
{
    // Cancellation source to cancel all tasks if one task fails.
    using var cancellationTokenSource = new CancellationTokenSource();

    // Limit the number of parallel tasks.
    using var semaphore = new SemaphoreSlim(numberOfParallelTasks);

    return inputList
        .Select(async input =>
        {
            try
            {
                // Wait until a slot is available, to only execute numberOfParallelTasks tasks in parallel.
                await semaphore.WaitAsync(cancellationTokenSource.Token);

                return await selectFunction(input);
            }
            catch (Exception)
            {
                // Activates the cancellation token for all tasks, when one task fails.
                cancellationTokenSource.Cancel();

                throw;
            }
            finally
            {
                semaphore.Release();
            }
        })
        // ToList() is required to dispose the semaphore and the cancellation token source,
        // otherwise the select can be executed in an outer scope, when the elements are already disposed.
        .ToList();
}

I then wrote a test to make sure this function behaves correctly:

[TestMethod]
public async Task SelectParallelShouldOnlyCallThreeTimesInParallel()
{
    // Arrange
    var timer = new Stopwatch();
    timer.Start();

    var enumerable = new[] { 1, 2, 3, 4, 5, 6 };
    async Task<long> TestFunction(int i)
    {
        await Task.Delay(100);

        return timer.ElapsedMilliseconds;
    }

    // Act
    var result = (await Task.WhenAll(enumerable
        .SelectParallel(TestFunction, 2)))
        .ToList();

    // Arrange
    var first = result.Take(2).Average();
    var middle = result.Skip(2).Take(2).Average();
    var last = result.Skip(4).Take(2).Average();

    var middleToFirstDiff = middle - first;
    var lastToMiddleDiff = last - middle;

    middleToFirstDiff.Should().BeGreaterThan(100);
    lastToMiddleDiff.Should().BeGreaterThan(100);
}

This test will fail - even worse it will end up in an endless loop. The reason is, that the semaphore is disposed, before the last Select block has executed its finally block. When I temporary remove the using keyword from semaphore the test will pass.

Is there something wrong with my code or is this a general issue / misunderstanding of Disposable objects and different execution scopes?

答案1

得分: 1

Walking through your code, it:

  • 创建了一个 SemaphoreSlim
  • 通过输入序列进行 Select,这将惰性地创建任务序列。
  • 调用了 ToList,这会强制运行所有的 Select 委托,从而创建了任务列表。
  • 在块的末尾隐式地释放了 SemaphoreSlim

问题在于,即使任务仍在运行,信号量也被释放了。然而,您希望该方法返回仍在运行的任务序列。

要解决此问题,您可以选择:

  • 只需删除 using。我认为(但不能百分之百确定),在这种情况下,SemaphoreSlim 的释放不会产生任何影响,因为此用法不涉及操作系统信号量。
  • 管理信号量的引用计数,然后仅在引用计数达到零时才释放它。请注意,这必须是线程安全的;如果您希望其他工具处理引用计数,可以使用我的 Nito.Disposables 库。

您的逻辑还存在另一个潜在问题:不能保证返回的任务将按顺序完成,甚至按顺序启动 selectFunction,因为 SemaphoreSlim 不保证FIFO顺序。

英文:

Walking through your code, it:

  • Creates a SemaphoreSlim.
  • Selects through an input sequence, which will lazily create a sequence of tasks.
  • Calls ToList, which forces all the Select delegates to run. This creates a list of tasks.
  • Disposes the SemaphoreSlim (implicitly at the end of the block).

The problem is that the semaphore is disposed even though the tasks are still running. However, you want the method to return a sequence of tasks that are still running.

To resolve, you can either:

  • Just remove the using. I think (but am not 100% sure) that the SemaphoreSlim disposal won't do anything in this case since there's no OS semaphore involved with this usage.
  • Manage a reference count for the semaphore and then only dispose it when the reference count reaches zero. Note that this has to be threadsafe; you can use my Nito.Disposables library if you want something else to handle the reference counting for you.

You still have another potential problem with your logic: there's no guarantee that the returned tasks will complete in order, or even start the selectFunction in order, because a SemaphoreSlim does not guarantee FIFO ordering.

huangapple
  • 本文由 发表于 2023年4月13日 22:25:57
  • 转载请务必保留本文链接:https://go.coder-hub.com/76006607.html
匿名

发表评论

匿名网友

:?: :razz: :sad: :evil: :!: :smile: :oops: :grin: :eek: :shock: :???: :cool: :lol: :mad: :twisted: :roll: :wink: :idea: :arrow: :neutral: :cry: :mrgreen:

确定