Swift Combine – 有条件地返回 Future

huangapple go评论68阅读模式
英文:

Swift combine - return Future conditionally

问题

我有一个返回类型为Future<Void, Error>的函数,从API返回并保存项目。

我希望检查返回的项目数组是否为空,如果是,则调用另一个服务端点来接收/保存项目。

显然,第二次调用需要在第一次完成后有条件地发生。

我找到了使用flatMap的示例,它们似乎非常接近我想要的,但这些示例使用AnyPublisher而不是Future,尝试使用Futures似乎给我奇怪的返回类型。

你可能能告诉我只知道Combine的基础知识!如果很简单,我会将这些调用转换为async await,并有条件地调用替代端点,但是Combine在整个网络和API层中都在使用,如果可能的话,我希望使用已建立的Combine模式来完成。

如果有人有一个从函数返回条件未来的骨架示例,我将不胜感激(下面是当前函数的示例)

谢谢!

英文:

I have a function with return type Future<Void, Error> that returns and saves items from an API

What I'm hoping to do is check whether the items array returned is empty and if so make a call to another service endpoint to receive / save items

Obviously the second call needs to happen conditionally and after the first has completed

I've found examples using flatMap that seem very close to what I want but these use AnyPublisher rather than Future and trying with Futures seems to yield me strange return types

You can probably tell I only know the fundamentals of combine! If it was simple I would convert these calls to async await and make a conditional call to the alternative endpoint but combine is used throughout the network and API layers and if possible hope to do it with the established combine pattern

If anyone has a skeleton example of returning conditional futures from a function I'd be very grateful (example of current function below)

Thanks!

    func sync() -&gt; Future&lt;Void, Error&gt; {
    return Future { [weak self] promise in
        guard let strongSelf = self else { return }
        strongSelf.service.getItems
            .receive(on: strongSelf.scheduler)
            .sink {
                if case .failure(let error) = $0 {
                    promise(.failure(error))
                }
            } receiveValue: { items in
                 // if items empty make another api call
                strongSelf.saveItems(items)
               
                promise(.success(()))
            }.store(in: &amp;strongSelf.subscriptions)
    }
}

答案1

得分: 1

这里的挑战之一是订阅发布者是同步操作。你的Future会按顺序执行,而订阅你使用sink创建的会立即超出范围并被取消。

一个巧妙的解决方法是在闭包中捕获订阅并手动处理:

func sync(service: Service) -> Future<[Int], Error> {
    Future<[Int], Error> { continuation in
        var subscription: AnyCancellable?
        subscription = service.getItems()
            .flatMap { items in
                if items.isEmpty {
                    return service.generateItems().eraseToAnyPublisher()
                } else {
                    return Just(items).setFailureType(to: Error.self).eraseToAnyPublisher()
                }
            }.sink {
                if case .failure(let error) = $0 {
                    continuation(.failure(error))
                }
                subscription?.cancel()
            } receiveValue: {
                continuation(.success($0))
                subscription?.cancel()
            }
    }
}

在这里,订阅以AnyCancellable?变量的形式在sink的闭包中被捕获。这可以防止在future的闭包结束时释放(和取消)订阅。订阅必须在成功和失败的情况下在sink中被取消。

我假设你的API调用,比如service.getItems(),返回一个FutureflatMap将这次调用的结果转换为一个新的发布者。如果getItems()调用返回一个空数组,那么flatMap()创建的发布者就是由generateItems()返回的(我编造的另一个API调用)。如果getItems调用不为空,则我使用Just提供一个简单返回项目的发布者(通过一些操作符的修饰使数据类型对齐)。

对于这个例子,我编造了一个类似于下面的服务:

class Service {
    var shouldReturnItems: Bool;

    init(shouldReturnItems: Bool) {
        self.shouldReturnItems = shouldReturnItems
    }

    func getItems() -> Future<[Int], Error> {
        Future<[Int], Error> { continuation in
            DispatchQueue.main.asyncAfter(deadline: .now().advanced(by: .seconds(1))) {
                if self.shouldReturnItems {
                    continuation(.success([1, 2, 3, 4]))
                } else {
                    continuation(.success([]))
                }
            }
        }
    }

    func generateItems() -> Future<[Int], Error> {
        Future<[Int], Error> { continuation in
            DispatchQueue.main.asyncAfter(deadline: .now().advanced(by: .seconds(1))) {
                continuation(.success([5, 6, 7, 8]))
            }
        }
    }
}

你也可以使用Swift结构化并发来解决这个问题。你无需像Combine那样“转换”任何调用,因为Combine已经执行了这个操作。你仍然可以保留基于Future的接口,以供不想使用async/await的调用者使用:

func sync(service: Service) -> Future<[Int], Error> {
    Future<[Int], Error> { continuation in
        Task {
            do {
                var items = try await service.getItems().value
                if items.isEmpty {
                    items = try await service.generateItems().value
                }

                continuation(.success(items))
            } catch (let error) {
                continuation(.failure(error))
            }
        }
    }
}

我觉得这比在闭包中使用订阅的版本更易读。对外部世界来说,sync使用与服务API中使用的相同的Futures

我在Playground中完成了所有这些,并使用以下代码进行了测试:

Task {
    do {
        let items = try await sync(service: Service(shouldReturnItems: true)).value
        print("service 1 returned \(items)")
    } catch {
        print("service 1 returned error")
    }
}

Task {
    do {
        let items = try await sync(service: Service(shouldReturnItems: false)).value
        print("service 2 returned \(items)")
    } catch {
        print("service 2 returned error")
    }
}

对于不支持async value属性的iOS 14,你可能可以根据上述主题的变体进行修改:

extension Future {
    func myValue() async throws -> Output  {
        return try await withUnsafeThrowingContinuation({ continuation in
            var subscription: AnyCancellable?

            subscription = self.sink {
                subscription?.cancel()
                subscription = nil;
                if case .failure(let error) = $0 {
                    continuation.resume(throwing: error)
                }
            } receiveValue: {
                subscription?.cancel()
                subscription = nil;
                continuation.resume(returning: $0)
            }
        })
    }
}

并在调用站点使用:

if #unavailable(iOS 15) {
    await someFuture.myValue()
} else {
    await someFuture.value
}

(也许给myValue取个更好的名字)

英文:

One of the challenges here is the fact that subscribing to a publisher is a synchronous operation. Your Future, as written, is going to execute from top to bottom and end. The subscription you create with sink is going to fall out of scope immediately, and be cancelled.

One sneaky way to get around this is to capture the subscription in the closure and dispose of it manually:

func sync(service: Service) -&gt; Future&lt;[Int], Error&gt; {
    Future&lt;[Int],Error&gt; { contination in
        var subscription: AnyCancellable?
        subscription = service.getItems()
            .flatMap { items in
                if items.isEmpty {
                    return service.generateItems().eraseToAnyPublisher()
                } else {
                    return Just(items).setFailureType(to: Error.self).eraseToAnyPublisher()
                }
            }.sink {
                if case .failure(let error) = $0 {
                    contination(.failure(error))
                }
                subscription?.cancel()
            } receiveValue: {
                contination(.success($0))
                subscription?.cancel()
            }

    }
}

Here the subscription is captured in the sink's closure as an AnyCancellable? variable. This prevents it from being released (and cancelled) when the future's closure ends. The subscription must be cancelled, in the sink, in every success and failure case.

I assumed that your API calls, like service.getItems(), return a Future. The flatMap converts the results that call into a new publisher. If the getItems() call returns an empty array, then the publisher created by flatMap() is the one returned by generateItems() (another API call I invented). If the getItems call is not empty, then I use Just to provide a publisher that simply returns the items (with some decoration of operators to make the data types line up).

For this example, I made up a service that looks like this:

class Service {
    var shouldReturnItems: Bool;

    init(shouldReturnItems: Bool) {
        self.shouldReturnItems = shouldReturnItems
    }

    func getItems() -&gt;Future&lt;[Int], Error&gt; {
        Future&lt;[Int], Error&gt; { continuation in
            DispatchQueue.main.asyncAfter(deadline: .now().advanced(by: .seconds(1))) {
                if self.shouldReturnItems {
                    continuation(.success([1, 2, 3, 4]))
                } else {
                    continuation(.success([]))
                }
            }
        }
    }

    func generateItems() -&gt; Future&lt;[Int], Error&gt; {
        Future&lt;[Int], Error&gt; { continuation in
            DispatchQueue.main.asyncAfter(deadline: .now().advanced(by: .seconds(1))) {
                continuation(.success([5, 6, 7, 8]))
            }
        }
    }
}

You can also solve this problem using Swift Structured Concurrency. You don't have to "convert" any of the calls as Combine does that already. You can still preserve the Future based interface for callers that don't want to use async/await:

func sync(service: Service) -&gt; Future&lt;[Int], Error&gt; {
    Future&lt;[Int],Error&gt; { contination in
        Task {
            do {
                var items = try await service.getItems().value
                if items.isEmpty {
                    items = try await service.generateItems().value
                }

                contination(.success(items))
            } catch (let error) {
                contination(.failure(error))
            }
        }
    }
}

I find this much more readable than the version using the subscription in the closure. To the outside world, sync uses the same Futures that are used in the service's API.

I did all this in a Playground and tested it with this code:


Task {
    do {
        let items = try await sync(service: Service(shouldReturnItems: true)).value
        print(&quot;service 1 returned \(items)&quot;)
    } catch {
        print(&quot;service 1 returned error&quot;)
    }
}

Task {
    do {
        let items = try await sync(service: Service(shouldReturnItems: false)).value
        print(&quot;service 2 returned \(items)&quot;)
    } catch {
        print(&quot;service 2 returned error&quot;)
    }
}

For iOS 14 where the async value property of a future is not available you might be able to use a variation on the themes above:


extension Future {
    func myValue() async throws -&gt; Output  {
        return try await withUnsafeThrowingContinuation({ continuation in
            var subscription: AnyCancellable?

            subscription = self.sink {
                subscription?.cancel()
                subscription = nil;
                if case .failure(let error) = $0 {
                    continuation.resume(throwing: error)
                }
            } receiveValue: {
                subscription?.cancel()
                subscription = nil;
                continuation.resume(returning: $0)
            }
        })
    }
}

and at the call site:

if #unavailable(iOS 15) {
    await someFuture.myValue()
} else {
    await someFuture.value
}

(perhaps giving myValue a nicer name)

答案2

得分: 1

有几种解决方法。如果你实际上不需要知道作业何时完成,那么可以使你的 sync 返回 Void 并执行以下操作:

func syncʹ() {
    service.getItems
        .flatMap { items in
            items.isEmpty
            ? anotherAPICall().map { items }.eraseToAnyPublisher()
            : Just(items).setFailureType(to: Error.self).eraseToAnyPublisher()
        }
        .sink {
            if case .failure(let error) = $0 {
                // 处理失败
            }
        } receiveValue: { [weak self] items in
            guard let self else { return }
            self.saveItems(items)
        }
        .store(in: &subscriptions)
}

如果你希望 sync 仍然告诉你何时完成,以供某些下游流程使用,可以使用类似以下的方法:

func sync() -> AnyPublisher<Void, Error> {
    return service.getItems
        .flatMap { items in
            items.isEmpty
            ? anotherAPICall().map { items }.eraseToAnyPublisher()
            : Just(items).setFailureType(to: Error.self).eraseToAnyPublisher()
        }
        .flatMap { [unowned self] items in
            self.saveItemsʹ(items)
        }
        .eraseToAnyPublisher()
}

func saveItemsʹ(_ items: [Item]) -> Future<Void, Never> {
    Future { [unowned self] promise in
        self.saveItems(items)
        promise(.success(()))
    }
}

第一个 flatMap 将进行 API 调用并在数组为空时发出其值(或错误),否则它将只发出数组。

另一种选择是,如果你想将作业分解为逻辑单元:

func async() -> AnyPublisher<Void, Error> {
    let items = service.getItems
        .share()

    let otherCall = items
        .filter { $0.isEmpty }
        .flatMap { _ in anotherAPICall() }

    return Publishers.Merge(
        otherCall.map { [] },
        items.filter { !$0.isEmpty }
    )
    .flatMap { [unowned self] items in
        self.saveItemsʹ(items)
    }
    .eraseToAnyPublisher()
}

上述代码为每个副作用提供了自己的响应对象。

英文:

There's a couple of ways to solve this. If you don't actually need to know when the job is finished, then you can make your sync return Void and do this:

func syncʹ() {
	service.getItems
		.flatMap { items in
			items.isEmpty
			? anotherAPICall().map { items }.eraseToAnyPublisher()
			: Just(items).setFailureType(to: Error.self).eraseToAnyPublisher()
		}
		.sink {
			if case .failure(let error) = $0 {
				// handle failure
			}
		} receiveValue: { [weak  self] items in
			guard let self else { return }
			self.saveItems(items)
		}
		.store(in: &amp;subscriptions)
}

If you want sync to still tell you when it's done for some downstream process then something like this:

func sync() -&gt; AnyPublisher&lt;Void, Error&gt; {
	return service.getItems
		.flatMap { items in
			items.isEmpty
			? anotherAPICall().map { items }.eraseToAnyPublisher()
			: Just(items).setFailureType(to: Error.self).eraseToAnyPublisher()
		}
		.flatMap { [unowned self] items in
			self.saveItemsʹ(items)
		}
		.eraseToAnyPublisher()
}

func saveItemsʹ(_ items: [Item]) -&gt; Future&lt;Void, Never&gt; {
	Future { [unowned self] promise in
		self.saveItems(items)
		promise(.success(()))
	}
}

The flatMap (first one) will make the api call and emit it's value (or error) if the array is empty, otherwise it will just emit th array.

Another option if you want to break up the job into logical units:

func async() -&gt; AnyPublisher&lt;Void, Error&gt; {
	let items = service.getItems
		.share()

	let otherCall = items
		.filter { $0.isEmpty }
		.flatMap { _ in anotherAPICall() }

	return Publishers.Merge(
		otherCall.map { [] },
		items.filter { !$0.isEmpty }
	)
	.flatMap { [unowned self] items in
		self.saveItemsʹ(items)
	}
	.eraseToAnyPublisher()
}

The above gives each side effect it's own response object.

huangapple
  • 本文由 发表于 2023年4月20日 03:55:11
  • 转载请务必保留本文链接:https://go.coder-hub.com/76058350.html
匿名

发表评论

匿名网友

:?: :razz: :sad: :evil: :!: :smile: :oops: :grin: :eek: :shock: :???: :cool: :lol: :mad: :twisted: :roll: :wink: :idea: :arrow: :neutral: :cry: :mrgreen:

确定