英文:
Multi instance microservice process a unique Kafka message more than one time when Kafka rebalance the partitions
问题
在生产环境中,我们有一个包含两个实例的微服务,它们从同一个主题中消费,但使用相同的组 ID(不同的客户端 ID)。
因此,如果我们有5个分区,第一个实例会接管3个分区,而另一个实例会接管2个分区。
但是在生产环境中部署时,
- 我们关闭第一个实例。
- 然后第一个实例重新启动。
- 然后我们部署第二个实例并关闭它。
- 最后第二个实例重新启动。
但是这个过程会导致Kafka在实例关闭时重新平衡分区。
问题是在此之后,我们会将来自Kafka分区的唯一消息处理两次(我检查了Kafka分区中没有重复消息,消息是唯一的)。
我认为在重新平衡和处理期间,一个实例无法成功提交一些消息,另一个实例会再次处理它。请注意,我们已经设置了
AutoOffsetReset = AutoOffsetReset.Earliest,
EnableAutoCommit = false,
结果是来自Kafka的唯一消息会被处理两次。
如何解决这个问题?
此外,当我们从Kafka中读取消息后,我们会将它们存储在数据库中,然后提交这些消息到Kafka。
我想知道如何解决这个问题...
如果我使用“事务性消费”,是否可以解决这个问题?
如果两个实例同时使用相同的消息开始事务,会发生什么?
例如,
实例A在t1时间戳开始事务,带有id 1,2,3,4,5。
实例B在t2时间戳(比t1晚一些毫秒)开始事务,带有id 4,5,6,7,8,9。
对于相同的id 4,5的消息会发生什么?
此外,我认为如果我使用例如分布式Redis缓存并检查这些id,这是否是一个不错的选择。
英文:
In production, we have a microservice with two instances and consume from the same topic but with the same group id(and different clientId).
As a result, if we have 5 partitions, the first instance takes three and the other one takes two.
But when we deploy in production,
- We shut down the first instance.
- Then the first instance is up and running
- Then we deploy the second instance and shut down it,
- Finally and the second instance is up and running.
But this process makes Kafka rebalance the partitions when an instance is shut down
The problem is after that we process a unique message from the Kafka partition twice(I check there is no duplicate in Kafka partitions and the message is unique).
I think so during rebalancing and process time an instance does not achieve to commit some messages and the other instance processes it again. Note we have set
AutoOffsetReset = AutoOffsetReset.Earliest,
EnableAutoCommit = false,
The result is a unique message from Kafka we process it twice.
How can resolve this issue?
Also when we read messages from Kafka we store them in a database and then commit the messages into Kafka.
I am curious how can resolve this issue...
If I use transactional consume
could I resolve this?
And what about if two instances begin transactions with the same messages?
For example
Instance A begins the transaction at t1 timestamp with ids 1,2,3,4 ,5
Instance B begins the transaction at the t2 timestamp(some ms after t1) with ids 4,5,6,7,8,9.
What happened with the messages with the same ids 4,5??
Also, I thought if it is a good choice can handle it by me using for example distributing redis cache and checking the ids.
答案1
得分: 2
-
通过使您的处理具有幂等性来处理重复读取。例如,在数据库中检查是否已经存储了数据,如果是,则丢弃记录。
-
自己处理偏移量提交。在Java客户端的KafkaConsumer类的Storing Offsets Outside Kafka部分中有一个很好的解释。基本上,它允许您原子地处理偏移量和实际数据存储到数据库中。请注意,您需要自己处理重新平衡事件,这里有一个如何做的示例:ConsumerRebalanceListener。
看起来您正在使用.NET客户端,所以等效的方法是使用ConsumerBuilder#SetPartitionsRevokedHandler。
英文:
You have two options:
-
Deal with the duplicate reads by making your processing idempotent. Eg check in your database whether you've already stored the data and drop the record if so.
-
Handle the offset commits yourself. There's a good explanation in the Storing Offsets Outside Kafka section of the KafkaConsumer class of the Java Client. Basically, it will allow you to transact atomically both the offset and the actual data in your database. Note that you'll need to handle the rebalance events yourself, there's an example of how to do that here: ConsumerRebalanceListener.
It seems that you're using the .NET client, so the equivalent would be to use ConsumerBuilder#SetPartitionsRevokedHandler.
答案2
得分: 0
只返回翻译好的部分:
至少需要在自行处理偏移提交时期望进行一次处理,但仅在成功处理记录后才进行提交。具体来说,如果您的消费者在拉取消息后但在提交之前重新平衡,它将不得不在重新平衡时寻找到最后提交的偏移量。
是的,事务可以帮助,但仅限于一个消费者会话。如果您重新平衡到一个全新的实例而不进行提交,整个事务将再次被消耗。您需要将此逻辑与自己的数据库事务处理结合起来。
解决此问题的方法是要么更频繁地提交,要么将已处理的值集中存储在高度可用的数据存储中。但除非您希望Redis更快,否则没有添加Redis的理由(从此数据存储中查找每个事件可能导致更高的消费者延迟和网络IO,从而增加了消费者重新平衡的可能性,因此您需要增加轮询超时配置)。
英文:
At least once processing is expected if you're handling offset commits on your own, but only committing after you've successfully processed any record. Specifically, if your consumer rebalances between process post-poll, but before committing, it'll have to seek back to the last committed offset on rebalance.
Yes, transactions can help, but only within one consumer session. If you rebalance to an entirely new instance without committing, the whole transaction will be consumed again. You need to combine this logic in your own database transaction handling.
The way to workaround this is to either commit more frequently, or centrally store values that have been processed in a highly available datastore. But there's no reason to add Redis when you already have a database, unless you expect Redis to be faster. (Looking up from this datastore for every event may cause higher consumer lag and network IO, further increasing likelihood consumers will rebalance, so you'll need to increase poll timeout configuration)
通过集体智慧和协作来改善编程学习和解决问题的方式。致力于成为全球开发者共同参与的知识库,让每个人都能够通过互相帮助和分享经验来进步。
评论