在这个Combine asyncMap实现中,首先进先出(FIFO)是否有保证且安全?

huangapple go评论60阅读模式
英文:

Is first-in-first-out guaranteed and safe in this Combine asyncMap implementation?

问题

In our codebase, we often use async 方法封装在 Combine 流水线中的 Task 中。

在阅读了 swiftbysundell 的一篇文章后,我们在文章中实现了一个带有 asyncMap 方法的 Publisher 扩展。

然而,我们意识到它的实现方式并没有确保先进先出。经过一些尝试,我们发现如果在 flatMap 中将 maxPublisher 设置为 .max(1),似乎可以实现先进先出。至少我们的测试现在似乎是正常运行的。

然而,我们并没有在其他地方看到类似这样的实现,也没有作为 Swift 的一个简单特性。我们在思考,自问是否有特定的问题,为什么不能或者不应该以类似的方式实现。

我们在想,也在问,如果我们有遗漏什么,也许在运行一些应该按顺序进行的异步后端调用时,可能会遇到一些竞态条件或者无序调用的问题。

这是我们扩展的(非抛出异常)版本:

import Foundation
import Combine

public extension Publisher {

    /// 一个使用异步转换闭包从上游发布者转换所有元素的发布者。
    /// 警告:只有使用 `maxPublishers = .max(1)` 时,执行顺序(FIFO)才能得到保证。
    func asyncMap<T>(maxPublishers: Subscribers.Demand = .max(1), _ transform: @escaping (Output) async -> T) -> AnyPublisher<T, Failure> {
        flatMap(maxPublishers: maxPublishers) { value -> Future<T, Failure> in
            Future { promise in
                Task {
                    let result = await transform(value)
                    promise(.success(result))
                }
            }
        }
        .eraseToAnyPublisher()
    }
}

这是我们正在运行的测试,一个是顺序无关紧要的情况,第二个测试则检查了顺序:

class PublisherAsyncTests: XCTestCase {

    var cancellableSubscriber = Set<AnyCancellable>()

    override func setUp() {
        super.setUp()
        cancellableSubscriber = []
    }

    func testAsyncMap() async throws {

        let expectation = expectation(description: "testAsyncMap")

        let sequence = [1, 2, 3, 4, 5, 6]
        var resultSequence: [Int] = []

        sequence
            .publisher
            .asyncMap(maxPublishers: .unlimited) { value in
                try? await Task.sleep(nanoseconds: UInt64.random(in: 10_000_000...20_000_000))
                return value
            }
            .collect()
            .sink { value in
                resultSequence = value
                expectation.fulfill()
            }
            .store(in: &cancellableSubscriber)

        await fulfillment(of: [expectation])

        XCTAssertEqual(sequence, resultSequence.sorted())
    }

    func testAsyncFIFOMap() async throws {

        let expectation = expectation(description: "testAsyncMap")

        let sequence = [1, 2, 3, 4, 5, 6]
        var resultSequence: [Int] = []

        sequence
            .publisher
            .asyncMap { value in
                try? await Task.sleep(nanoseconds: UInt64.random(in: 10_000_000...40_000_000))
                return value
            }
            .collect()
            .sink { value in
                resultSequence = value
                expectation.fulfill()
            }
            .store(in: &cancellableSubscriber)

        await fulfillment(of: [expectation], timeout: 5)

        XCTAssertEqual(sequence, resultSequence)
    }
}
英文:

In our codebase, we were using quite often async Methods encapsulated in a Task inside a Combine pipeline.

After reading an article from swiftbysundell we implemented a Publisher extension with the asyncMap method in the article.

However, the way it was implemented we realized that it didn't guarantee that calls that came first in, came first out. After a bit of trying we figured that if we set maxPublisher in the flatMap to .max(1), we did seem to achieve a first-in-first-out. At least our tests seem to be running fine now.

However, we didn't see any implementation like this anywhere else or as a naive Swift feature, and ask ourselves if there are particular issues why this can't or shouldn't be implemented in a similar way.

We are wondering and asking if we did overlook something and maybe might run into some race conditions or unordered calls when running some async Backend calls that should be in order.

Here is the (not throwing) version of our extension:

import Foundation
import Combine

public extension Publisher {

    /// A publisher that transforms all elements from an upstream publisher using an async transform closure.
    /// Warning: The order of execution (FIFO) is only with `maxPublishers = .max(1)` guaranteed.
    func asyncMap&lt;T&gt;(maxPublishers: Subscribers.Demand = .max(1), _ transform: @escaping (Output) async -&gt; T) -&gt; AnyPublisher&lt;T, Failure&gt; {
        flatMap(maxPublishers: maxPublishers) { value -&gt; Future&lt;T, Failure&gt; in
            Future { promise in
                Task {
                    let result = await transform(value)
                    promise(.success(result))
                }
            }
        }
        .eraseToAnyPublisher()
    }
}

And here are the tests we are running, one when the order doesn't matter, and the second test where the order is checked:

class PublisherAsyncTests: XCTestCase {

    var cancellableSubscriber = Set&lt;AnyCancellable&gt;()

    override func setUp() {
        super.setUp()
        cancellableSubscriber = []
    }

    func testAsyncMap() async throws {

        let expectation = expectation(description: &quot;testAsyncMap&quot;)

        let sequence = [1, 2, 3, 4, 5, 6]
        var resultSequence: [Int] = []

        sequence
            .publisher
            .asyncMap(maxPublishers: .unlimited) { value in
                try? await Task.sleep(nanoseconds: UInt64.random(in: 10_000_000...20_000_000))
                return value
            }
            .collect()
            .sink { value in
                resultSequence = value
                expectation.fulfill()
            }
            .store(in: &amp;cancellableSubscriber)

        await fulfillment(of: [expectation])

        XCTAssertEqual(sequence, resultSequence.sorted())
    }

    func testAsyncFIFOMap() async throws {

        let expectation = expectation(description: &quot;testAsyncMap&quot;)

        let sequence = [1, 2, 3, 4, 5, 6]
        var resultSequence: [Int] = []

        sequence
            .publisher
            .asyncMap { value in
                try? await Task.sleep(nanoseconds: UInt64.random(in: 10_000_000...40_000_000))
                return value
            }
            .collect()
            .sink { value in
                resultSequence = value
                expectation.fulfill()
            }
            .store(in: &amp;cancellableSubscriber)

        await fulfillment(of: [expectation], timeout: 5)

        XCTAssertEqual(sequence, resultSequence)
    }
}

If you need additional information or have questions please don't hesitate to ask and I can update my question.

答案1

得分: 1

The maxPublishers parameter in flatMap:

> 指定最大并发订阅者数量,如果未指定则为unlimited

所以如果你使用.max(1)flatMap 只能一次从上游请求一个元素,等待 Future 完成,发布该值,然后再请求另一个元素。

换句话说,你在 asyncMap 中执行的所有异步操作都将按顺序执行,而不是并行执行。这意味着从上游获取的顺序被保留,不会出现竞争条件。

如果异步操作并行执行,那么无论哪个先完成,都会首先由 flatMap 发布,这就是为什么如果使用默认值 .unlimited 时不会保留顺序。

作为简单的演示,考虑以下示例:

let x = [1,2,3,4].publisher
let cancellable = x.asyncMap(maxPublishers: .max(1)) { i in
    try! await Task.sleep(for: .seconds(1))
}.sink { _ in
    print("Foo")
}
  • 这将在每个之间以1秒的延迟打印四个 Foo

  • 如果是 maxPublishers: .max(2),将在1秒后打印两个 Foo,然后再经过1秒后打印另外两个 Foo

  • 如果是 maxPublishers: .unlimited,将在1秒后打印四个 Foo

  • 如果是 maxPublishers: .none,将什么都不会打印,且发布者永远不会完成。

如果你希望异步操作并行执行,同时仍然保留顺序,那就比这复杂得多。我不能立即想出如何实现这一点。

英文:

The maxPublishers parameter in flatMap:

> Specifies the maximum number of concurrent publisher subscriptions, or unlimited if unspecified.

So if you use .max(1), flatMap can only requests one element from its upstream at a time, wait for the Future to complete, publish that value, and then request another element.

In other words, all the async operations you do in asyncMap will be carried out sequentially, not in parallel. This means that the order from the upstream is preserved, and there can be no race conditions.

If the async operations are carried out in parallel, then whichever finishes first is first published by flatMap, and that's why the order is not preserved if you use the .unlimited default.

For a simple demonstration, consider:

let x = [1,2,3,4].publisher
let cancellable = x.asyncMap(maxPublishers: .max(1)) { i in
    try! await Task.sleep(for: .seconds(1))
}.sink { _ in
    print(&quot;Foo&quot;)
}
  • This will print four Foos with 1 second delay between each one.

  • If it were maxPublishers: .max(2), it would be print two Foos after 1 second, and another two Foos after 1 second.

  • If it were maxPublishers: .unlimited, it would print four Foos after 1 second.

  • If it were maxPublishers: .none, it will print nothing and the publisher never complete.

If you want it to run the async operations in parallel, and still have the order be preserved, that is a lot less trivial than this. I cannot think off the top of my head how you'd implement this.

huangapple
  • 本文由 发表于 2023年5月17日 19:43:48
  • 转载请务必保留本文链接:https://go.coder-hub.com/76271728.html
匿名

发表评论

匿名网友

:?: :razz: :sad: :evil: :!: :smile: :oops: :grin: :eek: :shock: :???: :cool: :lol: :mad: :twisted: :roll: :wink: :idea: :arrow: :neutral: :cry: :mrgreen:

确定