

Custom Combine Publisher for a single database listener



I have an iOS app with a custom database. To retrieve data from it, I set up a listener like this:

var listener: DatabaseListener?

self.listener = db.items.addListener { items in
    // use the items
}

// later, when I don't need the listener anymore:
self.listener = nil
This listener delivers the items as soon as I set it up and notifies me whenever my data is updated. It also stays alive until I manually cancel it. I also store a cache of the retrieved items in UserDefaults to speed things up (see it in action in the example below).

Now I'm trying to use Combine to retrieve my items. I want to set up the database listener as soon as a new subscription is created (for example, when sink or assign is called) and cancel it when there are no subscriptions left.

So here's what I came up with:

import Combine

class ItemsSubscription: Subscription {
    private var subscriber: (any Subscriber<[Item], Never>)?
    private var listener: DatabaseListener?
    private var items: [Item] = UserDefaults.standard.cacheItems

    init(subscriber: any Subscriber<[Item], Never>) {
        self.subscriber = subscriber
    }

    func request(_ demand: Subscribers.Demand) {
        // Deliver the cached items immediately.
        let _ = subscriber?.receive(items)

        // Start the database listener and forward every update.
        self.listener = db.items.addListener {
            UserDefaults.standard.cacheItems = $0
            self.items = $0
            let _ = self.subscriber?.receive($0)
        }
    }

    func cancel() {
        // Dropping the listener stops the database updates.
        self.listener = nil
        self.subscriber = nil
    }
}

struct ItemsPublisher: Publisher {
    typealias Output = [Item]
    typealias Failure = Never

    func receive<S>(subscriber: S) where S: Subscriber, S.Input == [Item], S.Failure == Never {
        let subscription = ItemsSubscription(subscriber: subscriber)
        subscriber.receive(subscription: subscription)
    }
}

Then I'm using ItemsPublisher like this:

private var cancellables: Set<AnyCancellable> = []

ItemsPublisher()
    .sink { items in
        // use the items
    }
    .store(in: &cancellables)

Currently this approach works, but it creates a new database listener (which is an expensive resource) for every ItemsPublisher I create. Instead, I want to maintain a single database listener while I have at least one subscriber, and I want any subsequent subscriber to receive the latest items from the same subscription.

I considered creating a single ItemsPublisher instance and using it throughout the app, but later subscribers didn't receive any data at all.

I also considered using CurrentValueSubject (or a @Published property) to store the items, but I couldn't figure out when to set up the database listener, or when to cancel it.

Any help or advice would be appreciated.


Score: 1

> Instead I want to maintain a single database listener while I have at least one subscriber and I want any following subscriber to receive the latest items from the same subscription.

That's exactly what share() is for. View the documentation for more information.
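
As a rough sketch of what that looks like, here is a self-contained example. The `database` subject is only a hypothetical stand-in for `db.items` (it is not part of the question's code), and counting upstream subscriptions with `handleEvents` approximates counting database listeners. In the real app you would presumably call `.share()` on one long-lived `ItemsPublisher()` and hand that shared instance to every consumer.

```swift
import Combine

/// Returns (listeners started, values seen by the first subscriber, values seen by the second).
func shareDemo() -> (Int, [[String]], [[String]]) {
    // Hypothetical stand-in for `db.items`; each send simulates a database update.
    let database = PassthroughSubject<[String], Never>()
    var listenersStarted = 0
    var first: [[String]] = []
    var second: [[String]] = []
    var cancellables = Set<AnyCancellable>()

    // Every upstream subscription would start one database listener,
    // so counting subscriptions approximates counting listeners.
    let shared = database
        .handleEvents(receiveSubscription: { _ in listenersStarted += 1 })
        .share()

    shared.sink { first.append($0) }.store(in: &cancellables)
    shared.sink { second.append($0) }.store(in: &cancellables)

    database.send(["apple"])   // both subscribers receive this update
    cancellables.removeAll()   // cancelling every subscriber releases the upstream subscription
    return (listenersStarted, first, second)
}
```

Both subscribers receive the update while only one upstream subscription is created. Note that share() is built on a PassthroughSubject, so a subscriber that joins late only sees updates sent after it subscribed; it does not get the latest items replayed.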

You might also want to consider using multicast with a CurrentValueSubject depending on the situation.
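
A minimal sketch of that variant, under the same kind of assumptions: `database` is a hypothetical stand-in for `db.items`, and the empty array passed to the CurrentValueSubject is a placeholder for whatever initial value you want (the cached items from UserDefaults, for instance). `autoconnect()` connects to the upstream when the first subscriber arrives and disconnects when the last one cancels.

```swift
import Combine

/// Returns (listeners started, values seen by an early subscriber, values seen by a late one).
func multicastDemo() -> (Int, [[String]], [[String]]) {
    // Hypothetical stand-in for `db.items`; each send simulates a database update.
    let database = PassthroughSubject<[String], Never>()
    var listenersStarted = 0
    var early: [[String]] = []
    var late: [[String]] = []
    var cancellables = Set<AnyCancellable>()

    let shared = database
        .handleEvents(receiveSubscription: { _ in listenersStarted += 1 })
        // The subject stores the most recent value and hands it to new subscribers.
        .multicast(subject: CurrentValueSubject<[String], Never>([]))
        .autoconnect()

    shared.sink { early.append($0) }.store(in: &cancellables)
    database.send(["apple"])

    // A subscriber that joins later starts with the latest stored value.
    shared.sink { late.append($0) }.store(in: &cancellables)
    database.send(["apple", "pear"])

    cancellables.removeAll()   // cancelling the last subscriber disconnects from the upstream
    return (listenersStarted, early, late)
}
```

Here the late subscriber starts with `["apple"]` and then receives `["apple", "pear"]`, while the upstream is still subscribed to only once. One thing to keep in mind is that the subject keeps its last value after everyone has cancelled, so a subscriber arriving later may first see stale items.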

  • Posted on February 17, 2023, 23:31:51
  • When reposting, please keep the link to this article: https://go.coder-hub.com/75486267.html



