在任务组中等待所有任务的Swift异步操作。

huangapple go评论63阅读模式
英文:

Swift Async awaiting all tasks in task group

问题

我正在使用一个任务组来包装对一个长时间运行的异步方法的重复调用。由于我可能需要对该方法进行多次调用,希望它们能够并行运行。最终,我有一些常规的同步代码,需要阻塞,直到所有这些异步方法都被调用完毕。

奇怪的是,与我看到的其他任务组示例不同,我实际上不需要异步方法的返回值。它们执行操作并将它们写入磁盘。

是否有办法简化这段代码?

let swiftAsyncSemaphore = DispatchSemaphore(value: 0)
let taskGroupTask = Task {
    await withThrowingTaskGroup(of: ASRJob.self) { group in
        for path in filePathsToWorkOn {
            group.addTask {
                return try await doHardWork(atPath: path)
            }
        }
        
        do {
            for try await _ in group {
                // 目前我们不存储这些信息
            }
        } catch {
            // 目前我们不存储错误
        }
    }
    swiftAsyncSemaphore.signal()
}
swiftAsyncSemaphore.wait()

我看到的大多数示例在组结束时使用 map/combine 函数,但我不需要数据,所以我应该如何等待它们都完成?

关于此代码运行的上下文:
此代码在 Synchronous Operation(NSOperation)的 main() 块中运行,该操作放入一个 OperationQueue 中。因此,调用 wait 的块不在 Swift 异步代码中。目标是阻塞操作的完成,直到此 Swift 异步工作完成,而且可能运行很长时间。

英文:

I am using a task group to wrap repeated calls to a long running async method. Since I may have many calls to this method that need to happen the hope is that they would run in parallel. At the end of the day I have some regular synchronous code that needs to block until all of these asyncs are called.

What's weird is unlike other examples of task groups I have seen I actually do not need the values from the async throws method. They do actions and write them to disk.

Is there not a way to clean up this code?

let swiftAsyncSemaphor = DispatchSemaphore(value: 0)
let taskGroupTask = Task {
    await withThrowingTaskGroup(of: ASRJob.self) { group in
        for path in filePathsToWorkOn {
            group.addTask {
                return try await doHardWork(atPath: path)
            }
        }
        
        do {
            for try await _ in group {
                // For now we do not store this information
            }
        } catch {
            // For now we do not store the error
        }
    }
    swiftAsyncSemaphor.signal()
}
swiftAsyncSemaphor.wait()

Most examples I see use a map/combine function at the end on the group but I don't need the data so how can I just await for all of them to finish?

Context on where this code will run:
This code is run within the main() block of a Synchronous Operation (NSOperation) that goes within an OperationQueue. So the block calling wait is not in swift async code. The goal is to block the Operations completion until this swift async work is done knowing that it may very well run for a long time.

答案1

得分: 3

如果您在异步上下文中使用 semaphore.wait(),它将产生以下警告:

> 实例方法 'wait' 在异步上下文中不可用;请等待任务句柄;这是 Swift 6 中的错误

我假设您必须在异步上下文之外等待此信号量,否则它会产生上述警告。


您不能在 async-await 任务之间的依赖关系中同时使用信号量。请参阅 Swift并发:幕后运作

> 诸如信号量之类的原语…与Swift并发一起使用是不安全的。这是因为它们会隐藏依赖信息,但会在代码中引入执行依赖关系。由于运行时不知道此依赖关系,因此无法做出正确的调度决策和解决它们。特别是不要使用创建无结构任务然后通过使用信号量或不安全的原语跨任务边界引入依赖关系的原语。这种代码模式意味着一个线程可以无限期地阻塞在信号量上,直到另一个线程能够解锁它。这违反了线程前进的运行时约定。


async 方法中的模式是await任务组的result

let task = Task {
    try await withThrowingTaskGroup(of: ASRJob.self) { group in
        for path in filePathsToWorkOn {
            group.addTask {
                try await doHardWork(atPath: path)
            }
        }

        try await group.waitForAll()
    }
}

_ = await task.result

对于抛出任务组,我们通常也会使用waitForAll


是的,您在 Operation 中可以在技术上使用 wait,但即使这也是一种反模式。通常我们会编写一个“并发”操作。请参阅 Operation 文档 中关于“并发操作”的讨论。或者参见 尝试理解异步操作子类

英文:

If you use semaphore.wait() from within an asynchronous context, it will produce this warning:

>Instance method 'wait' is unavailable from asynchronous contexts; Await a Task handle instead; this is an error in Swift 6

I assume that you must be waiting for this semaphore outside of a asynchronous context, otherwise it would have produced the above warning.


You cannot use semaphores for dependencies in conjunction between async-await tasks. See Swift concurrency: Behind the scenes:

> [Primitives] like semaphores ... are unsafe to use with Swift concurrency. This is because they hide dependency information from the Swift runtime, but introduce a dependency in execution in your code. Since the runtime is unaware of this dependency, it cannot make the right scheduling decisions and resolve them. In particular, do not use primitives that create unstructured tasks and then retroactively introduce a dependency across task boundaries by using a semaphore or an unsafe primitive. Such a code pattern means that a thread can block indefinitely against the semaphore until another thread is able to unblock it. This violates the runtime contract of forward progress for threads.


The pattern in async methods would be to await the result of the task group.

let task = Task {
    try await withThrowingTaskGroup(of: ASRJob.self) { group in
        for path in filePathsToWorkOn {
            group.addTask {
                try await doHardWork(atPath: path)
            }
        }

        try await group.waitForAll()
    }
}

_ = await task.result

For throwing tasks groups, we would also often waitForAll.


Yes, you can technically wait in an Operation, but even that is an anti-pattern. We would generally write a “concurrent” operation. See discussion of “concurrent operations” in the Operation documentation. Or see Trying to Understand Asynchronous Operation Subclass.

huangapple
  • 本文由 发表于 2023年7月28日 00:00:03
  • 转载请务必保留本文链接:https://go.coder-hub.com/76781545.html
匿名

发表评论

匿名网友

:?: :razz: :sad: :evil: :!: :smile: :oops: :grin: :eek: :shock: :???: :cool: :lol: :mad: :twisted: :roll: :wink: :idea: :arrow: :neutral: :cry: :mrgreen:

确定