Kafka offset resets sometimes after consumer restart
Question
I have a Kafka topic with 4 partitions and 4 consumers with the same consumer group ID.
All consumers restart every night: the first at 4:00 a.m., the second at 4:15 a.m., and the others at 4:30 and 4:45 (those are our admins' rules, I can do nothing about it).
On some days the offsets of some partitions reset, and after the restart the consumers read all the messages again starting from offset 0. On other days it does not happen. Any ideas where the problem is?
The consumers are written in Go.
Kafka config:
conf := kafka.ReaderConfig{
	Brokers:  cfg.Kafka.BootstrapServers,
	GroupID:  cfg.Kafka.ConsumerGroup,
	Topic:    cfg.Kafka.Topic,
	MinBytes: 10e3, // 10KB
	MaxBytes: 32e6, // 32MB
}
Read the message:
for {
	m, err := reader.ReadMessage(context.Background())
	if err != nil {
		go log.Error(componentName, actionName, -123753, err.Error(), "Error reading message from Kafka", "", "", string(m.Value), nil)
		continue
	}

	log.Warn(componentName, actionName, "Processing message", "START", "", "", map[string]interface{}{"offset": m.Offset, "partition": m.Partition}, nil)

	var msg model.KafkaMessage
	err = json.Unmarshal(m.Value, &msg)
	if err != nil {
		go log.Error(componentName, actionName, -123754, err.Error(), "Error parsing message", "", "", string(m.Value), nil)
		continue
	}

	_service := service.NewService(log, cfg)
	currentNumAttempts := 0
	maxNumAttempt := 3
	for currentNumAttempts < maxNumAttempt {
		currentNumAttempts++
		err = _service.UpdateNumberInColvir(msg)
		if err != nil {
			continue
		}
		break
	}
}
Answer 1
Score: 1
According to your comment "And cleanup.policy is DELETE", the following can happen.
The __consumer_offsets topic keeps track of the last committed offset per topic partition for your consumer group.
cleanup.policy = delete means that the __consumer_offsets topic partition logs are deleted after retention.ms.
So, if your consumers have not consumed any messages for the retention.ms period, your __consumer_offsets topic is not updated and all the messages in the offsets topic are deleted. Then there is no trace of your consumer group left in the __consumer_offsets topic, so Kafka treats it as a new consumer group for the topic partitions and starts reading messages from the beginning.
NOTE - cleanup.policy should be compact for the __consumer_offsets topic.
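For completeness: offsets only end up in __consumer_offsets when the consumer commits them. With kafka-go, reader.ReadMessage (as used in the question) commits offsets automatically; if you want the commit to be explicit and observable, FetchMessage together with CommitMessages can be used instead. This is only a sketch that reuses the reader from the question, not the poster's actual code:

for {
	// FetchMessage reads a message but does not commit its offset.
	m, err := reader.FetchMessage(context.Background())
	if err != nil {
		break
	}

	// ... process m here ...

	// CommitMessages writes the offset for m to the __consumer_offsets topic.
	if err := reader.CommitMessages(context.Background(), m); err != nil {
		// handle or log the commit error
	}
}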