使用异步方式收集发布者值

huangapple go评论62阅读模式
英文:

Collecting Publisher values with async

问题

测试收集传递值的函数() 异步抛出错误 {
    // 在真实测试中,这是被注入到被测试单元中的。
    let subject = PassthroughSubject<Int, Never>()

    // 我期望这个只有在`subject`完成后才会完成。我使用了`async let`所以我可以通过`subject`推送一些数据,然后稍后`await collectValues`,希望得到被`subject`发布的东西。在真实测试中,这是被测试单元的一个属性,该属性对`subject`运行各种操作。
    async let collectValues = await subject.values.reduce(into: []) { $0.append($1) }

    // 发送一些数据通过`subject`,然后`.finish`它。
    subject.send(10)
    subject.send(20)
    subject.send(completion: .finished)

    // 等待值,这样我们可以检查我们得到了期望的东西。
    let values = await collectValues

    // 这个断言失败了...
    XCTAssertEqual(values, [10, 20])
}
英文:

I've been writing some unit tests of some Combine code we have. I've run in to some issues. I think I've simplified the various pieces in to this test. NB: This isn't a test – it's me trying to understand why one of the tests doesn't work!

func test_collectingPassthroughValues() async throws {
    // In the real test this is injected in to the unit under test.
    let subject = PassthroughSubject&lt;Int, Never&gt;()

    // I&#39;m expecting this to only complete once `subject` finishes. I&#39;ve used
    // `async let` so I can poke some data through `subject` and then later on
    // `await collectValues` to hopefully get back the stuff published by 
    // `subject`. In the real test this is a property from the unit under test
    // which runs various operators on `subject`.
    async let collectValues = await subject.values.reduce(into: []) { $0.append($1) }

    // Send some data through `subject` and then `.finish` it.
    subject.send(10)
    subject.send(20)
    subject.send(completion: .finished)

    // Await the values so we can check we got what&#39;s expected.
    let values = await collectValues

    // This fails…
    XCTAssertEqual(values, [10, 20])
}

The assertion fails with:

est_collectingPassthroughValues(): XCTAssertTrue failed - Found difference for 
Different count:
 |	Received: (0) []
 |	Expected: (2) [10, 20]

So subject.values seems to get nothing at all; I don't know why?

Thanks!

答案1

得分: 3

以下是翻译好的内容:

发生的情况相当简单。如何正确编写它则不太清楚,我的建议是“不要这样做”。

首先,有一个不是问题但是小问题:

async let collectValues = await subject.values.reduce(into: []) { $0.append($1) }

你不应该在这里使用 await。如果没有其他问题,这可能会成为问题。

根本问题是 PassthroughSubject 如果没有订阅者就会丢弃消息。在你当前的代码中,这肯定会发生,但也很难修复。

// 去掉多余的 await

async let collectValues = subject.values.reduce(into: []) { $0.append($1) }

// 那一行与下面这行代码相似:

let collectValues = Task {
    var values: [Int] = []
    for await value in subject.values {
        values.append(value)
    }
    return values
}

问题在于这会启动一个任务,可能不会立即开始。所以你的下一行代码 subject.send(10) 没有订阅者(甚至还没有到达 for-await 行),它会被丢弃。

你可以通过在创建任务后插入 try await Task.sleep(for: .seconds(1)) 来解决问题,但这并没有太大帮助。PassthroughSubject 没有缓冲。当你调用 append 时,没有任何监听。该值将被丢弃,你将丢失 20。

你可以通过缓冲来改善情况,但仍然需要休眠(在我看来是不可接受的)。以下内容对我来说非常可靠:

func test_collectingPassthroughValues() async throws {
    // 在真实测试中,这是注入到被测试单元中的。
    let subject = PassthroughSubject<Int, Never>()
    let readSubject = subject.buffer(size: 10, prefetch: .keepFull, whenFull: .dropOldest)

    async let collectValues = readSubject.values.reduce(into: []) { $0.append($1) }

    try await Task.sleep(for: .seconds(1))
    subject.send(10)
    subject.send(20)
    subject.send(completion: .finished)

    // 等待值以检查我们得到了预期的内容。
    let values = await collectValues

    XCTAssertEqual(values, [10, 20])
}

但在我看来,这是一种完全错误的方法。

我不会尝试混合 PassthroughSubject 和 .values。我只是看不到使其稳健的方法。更广泛地说,我建议在混合使用 Combine 和结构化并发时要非常小心。它们通常对事物的工作方式有非常不同的看法。

英文:

What is happening is fairly straightforward. How to write it correctly is much less clear, and my recommendation is "don't do this."

First, a minor issue that isn't the problem:

async let collectValues = await subject.values.reduce(into: []) { $0.append($1) }

You shouldn't use await here. That probably would be a problem if there weren't other problems.

The fundamental problem is that PassthroughSubject drops messages if there's no subscriber. In your current code, that's absolutely going to happen, but it's also really hard to fix.

// Taking out the extra `await`
async let collectValues = subject.values.reduce(into: []) { $0.append($1) }

// That line is pretty close to:
    let collectValues = Task {
        var values: [Int] = []
        for await value in subject.values {
            values.append(value)
        }
        return values
    }

The problem is that this kicks off a task that may not start immediately. So your next line of code, subject.send(10) has no subscriber (it hasn't even gotten to the for-await line), and it's just thrown away.

You can kind of fix it by throwing in a try await Task.sleep(for: .seconds(1)) after creating the Task, but it doesn't help much. There's no buffering on PassthroughSubject. While you're calling append, there is nothing listening. The value will be thrown away and you'll drop the 20.

You can improve things by buffering, but you'll still need to sleep (which is unacceptable IMO). The following, nonetheless, is very reliable for me:

func test_collectingPassthroughValues() async throws {
    // In the real test this is injected in to the unit under test.
    let subject = PassthroughSubject&lt;Int, Never&gt;()
    let readSubject = subject.buffer(size: 10, prefetch: .keepFull, whenFull: .dropOldest)

    async let collectValues = readSubject.values.reduce(into: []) { $0.append($1) }

    try await Task.sleep(for: .seconds(1))
    subject.send(10)
    subject.send(20)
    subject.send(completion: .finished)

    // Await the values so we can check we got what&#39;s expected.
    let values = await collectValues

    XCTAssertEqual(values, [10, 20])
}

But IMO, this is a completely broken approach.

I would not try to mix PassthroughSubject with .values. I just don't see any way to make it robust. More broadly, I recommend being very careful mixing Combine and Structured Concurrency. They tend to have very different ideas about how things are supposed to work.

huangapple
  • 本文由 发表于 2023年5月17日 22:16:52
  • 转载请务必保留本文链接:https://go.coder-hub.com/76273105.html
匿名

发表评论

匿名网友

:?: :razz: :sad: :evil: :!: :smile: :oops: :grin: :eek: :shock: :???: :cool: :lol: :mad: :twisted: :roll: :wink: :idea: :arrow: :neutral: :cry: :mrgreen:

确定