How do I make sure that I process one message at a time at most?
Question
I am wondering how to process one message at a time using Google's Pub/Sub in Go. I am using the official library, https://pkg.go.dev/cloud.google.com/go/pubsub#section-readme. The event is consumed by a service that runs as multiple instances, so an in-memory locking mechanism will not work.
I realise that doing this is an anti-pattern, so let me explain my use case. Using MongoDB, I store an array of objects as an embedded document for each entity. Each published event modifies part of this array and saves it. If I receive more than one event at a time and they start processing at exactly the same time, one of the saves will overwrite the other. So one solution I was considering is to make sure that only one message is processed at a time, ideally using built-in functionality in Cloud Pub/Sub. Otherwise I was thinking of implementing a locking mechanism in the DB, but I'd like to avoid that.
Any help would be appreciated.
Answer 1
Score: 2
There are two options:
- Use an ordering key in Pub/Sub. That way, all messages related to the same object are delivered in order, one by one.
- Use a push subscription that pushes to Cloud Run or Cloud Functions. With Cloud Run, set the concurrency to 1 (this is the default with Cloud Functions gen1), and also set the max instances to 1. That way only one message is processed at a time; all other messages are rejected (HTTP 429) and requeued in Pub/Sub. The drawback is that you can't parallelize the processing as you can with ordering keys.
A similar approach that is simpler to implement is to use Cloud Tasks instead of Pub/Sub. With Cloud Tasks you can set a rate limit on a queue and set maxConcurrentDispatches to 1 (and you don't have to do the same with Cloud Functions max instances or Cloud Run max instances and concurrency).