Kafka offsets sometimes reset after consumer restart

Question

I have a Kafka topic with 4 partitions and 4 consumers that share the same consumer group ID.
All consumers are restarted every night: the first at 4:00 a.m., the second at 4:15 a.m., and the others at 4:30 and 4:45 (these are our admins' rules and there is nothing I can do about it).
On some days the offsets of some partitions get reset and, after the restart, the consumers read all messages again starting from offset 0. On other days this does not happen. Any ideas where the problem is?
The consumers are written in Go.

Kafka config:

conf := kafka.ReaderConfig{
    Brokers:  cfg.Kafka.BootstrapServers,
    GroupID:  cfg.Kafka.ConsumerGroup,
    Topic:    cfg.Kafka.Topic,
    MinBytes: 10e3, // 10KB
    MaxBytes: 32e6, // 32MB
}

Reading the messages:

for {
    m, err := reader.ReadMessage(context.Background())
    if err != nil {
        // "Error reading a message from Kafka"
        go log.Error(componentName, actionName, -123753, err.Error(), "Ошибка чтения сообщения с кафки", "", "", string(m.Value), nil)
        continue
    }
    // "Processing message"
    log.Warn(componentName, actionName, "Обработка сообщения", "START", "", "", map[string]interface{}{"offset": m.Offset, "partition": m.Partition}, nil)
    var msg model.KafkaMessage

    err = json.Unmarshal(m.Value, &msg)
    if err != nil {
        // "Error parsing the message"
        go log.Error(componentName, actionName, -123754, err.Error(), "Ошибка парсинга сообщения", "", "", string(m.Value), nil)
        continue
    }

    _service := service.NewService(log, cfg)
    currentNumAttempts := 0
    maxNumAttempt := 3

    // Retry the downstream update up to maxNumAttempt times.
    for currentNumAttempts < maxNumAttempt {
        currentNumAttempts++
        err = _service.UpdateNumberInColvir(msg)
        if err != nil {
            continue
        }
        break
    }
}
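
A side note on the loop above: with segmentio/kafka-go, ReadMessage returns io.EOF once the Reader has been closed, so the bare `continue` would spin forever during shutdown. Since the consumers are restarted every night, closing the reader on shutdown also lets the group leave cleanly instead of waiting for the session timeout. A minimal sketch, assuming the surrounding function can simply return; the signal handling and error checks here are additions for illustration, not part of the original code:

// Requires the errors, io, os, os/signal and syscall packages in addition to those above.
ctx, stop := signal.NotifyContext(context.Background(), os.Interrupt, syscall.SIGTERM)
defer stop()
defer reader.Close() // leave the consumer group cleanly so the next rebalance is fast

for {
    m, err := reader.ReadMessage(ctx)
    if errors.Is(err, io.EOF) || errors.Is(err, context.Canceled) {
        return // reader closed or shutdown requested: stop instead of retrying forever
    }
    if err != nil {
        go log.Error(componentName, actionName, -123753, err.Error(), "Ошибка чтения сообщения с кафки", "", "", "", nil)
        continue
    }
    _ = m // ... process m exactly as in the loop above ...
}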

Answer 1

Score: 1

Given your comment *And cleanup.policy is DELETE*, the following can happen.

The __consumer_offsets topic keeps track of the last committed offset for each topic partition of your consumer group.

cleanup.policy = delete means that the partition logs of the __consumer_offsets topic are deleted once they are older than retention.ms.

So if your consumers do not consume (and therefore do not commit) any messages for a retention.ms period, the __consumer_offsets topic is not updated and the old messages in the offsets topic are deleted. Then there is no record of your consumer group left in __consumer_offsets, so Kafka treats it as a new consumer group for the topic partitions and starts reading messages from the beginning.
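
On the consumer side, if falling back to offset 0 is the worst outcome, segmentio/kafka-go's ReaderConfig has a StartOffset field that controls where the group starts when it finds a partition without a committed offset; the default is kafka.FirstOffset, which matches the behaviour described in the question. A minimal sketch reusing the config from the question, with only StartOffset added (it does not fix the retention problem itself, it only changes the fallback):

conf := kafka.ReaderConfig{
    Brokers:  cfg.Kafka.BootstrapServers,
    GroupID:  cfg.Kafka.ConsumerGroup,
    Topic:    cfg.Kafka.Topic,
    MinBytes: 10e3, // 10KB
    MaxBytes: 32e6, // 32MB
    // Only used when GroupID is set and no committed offset exists for a
    // partition: start from the newest message instead of the beginning.
    StartOffset: kafka.LastOffset,
}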

NOTE: cleanup.policy should be compact for the __consumer_offsets topic.
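
To check and, if necessary, change the policy on the offsets topic, the standard Kafka CLI tools can be used; the broker address below is a placeholder, and --bootstrap-server assumes a reasonably recent Kafka distribution:

# show the current configuration overrides of the offsets topic
kafka-configs.sh --bootstrap-server localhost:9092 --entity-type topics \
    --entity-name __consumer_offsets --describe

# switch to log compaction so committed offsets are kept
kafka-configs.sh --bootstrap-server localhost:9092 --entity-type topics \
    --entity-name __consumer_offsets --alter --add-config cleanup.policy=compact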

huangapple
  • Posted on 2021-06-22 14:11:36
  • Please keep this link when republishing: https://go.coder-hub.com/68078401.html