Kafka消费者使用confluent-kafka-go更改偏移量。

huangapple go评论84阅读模式
英文:

kafka consumer with confluent-kafka-go change offset

问题

我使用以下配置创建了一个新的消费者:

c, err := kafka.NewConsumer(&kafka.ConfigMap{
    "bootstrap.servers": addresses,
    "group.id":          "my_group",
    "auto.offset.reset": "earliest",
})
topic := "testTopic"
if err = c.SubscribeTopics([]string{topic}, nil); err != nil {
    panic(err)
}

然后,我根据以下代码生成事件并消费一个事件:

events := []map[string]string{
    {
        "name": "Foo",
    },
    {
        "name": "Bar",
    },
}

err = p.ProduceEvent(events[0]) // 有一个包装器来生成事件
err = p.ProduceEvent(events[1])

res, err := c.ReadMessage(100 * time.Second)
time.Sleep(20 * time.Second)

c.Close()

当我使用以下命令描述该消费者组时:
watch /home/kafka/bin/kafka-consumer-groups.sh --bootstrap-server localhost:9092 --group my_group --describe
每个步骤的结果如下:

  1. 在生成事件后:
    Kafka消费者使用confluent-kafka-go更改偏移量。

  2. 当我消费一个事件时:
    Kafka消费者使用confluent-kafka-go更改偏移量。

  3. 在关闭消费者后:
    Kafka消费者使用confluent-kafka-go更改偏移量。

我不明白为什么最后偏移量为零!我只消费了一个事件。对我来说,Close改变了偏移量,这是奇怪的行为。如果有任何线索,我将不胜感激。

英文:

I create a new consumer with this configuration:

c, err := kafka.NewConsumer(&kafka.ConfigMap{
                 "bootstrap.servers": addresses,
                 "group.id":          "my_group",
                 "auto.offset.reset": "earliest",
         })
topic := "testTopic"
if err = c.SubscribeTopics([]string{topic}, nil); err != nil {
    panic(err)
}

Then I produce events based on the following code and consume one event:

events := []map[string]string{
{                                     
        "name":       "Foo",
},                                                 
{                                                  
        "name":       "Bar",                                                       
},                                                 
}                                                          
                                                                                                    
err = p.ProduceEvent(events[0])//there is a wrapper to produce events       
err = p.ProduceEvent(events[1])                                    
                                                                                   
res, err := c.ReadMessage(100 * time.Second)                                              
time.Sleep(20 * time.Second)                                                       
                                                                                                              
c.Close()                                  

when I describe the group with
watch /home/kafka/bin/kafka-consumer-groups.sh --bootstrap-server localhost:9092 --group my_group --describe. The results in each step is:

  1. after producing the events:
    Kafka消费者使用confluent-kafka-go更改偏移量。

  2. when I consume one event:
    Kafka消费者使用confluent-kafka-go更改偏移量。

  3. after closing the consumer:
    Kafka消费者使用confluent-kafka-go更改偏移量。

I can't understand why finally the lag is zero! I just consumed one events. It's weird behaviour to me, that Close would be change the offset. Any clue is appreciated.

答案1

得分: 2

ReadMessage包装了PollPoll获取一批消息并在本地进行缓存。由于您已将消费者配置为自动提交偏移量,它将提交所有获取到的消息,即使这些消息在本地缓存,并且您的应用程序尚未处理。这就是为什么在关闭消费者后您看不到延迟。

librdkafka(因此confluent-kafka-go)没有一种方法来配置max.pool.records,因此如果您想精确控制哪些偏移量被提交,您需要禁用自动提交偏移量,并使用StoreOffsets手动提交它们:https://github.com/confluentinc/confluent-kafka-go/issues/380#issuecomment-539903016

英文:

ReadMessage wraps Poll. Poll gets a batch of messages and buffers them locally. Since you have configured your consumer to auto commit offsets, it will commit all the fetched messages, even the ones that are locally cached and that your application has still not processed. That is why you see that there is no lag after closing the consumer.

librdkafka (and thus confluent-kafka-go) does not have a way to configure max.pool.records so if you want to control exactly which offsets get commmited, you'll need to disable auto commit offsets and commit them manually using StoreOffsets: https://github.com/confluentinc/confluent-kafka-go/issues/380#issuecomment-539903016

huangapple
  • 本文由 发表于 2022年7月27日 05:16:19
  • 转载请务必保留本文链接:https://go.coder-hub.com/73129832.html
匿名

发表评论

匿名网友

:?: :razz: :sad: :evil: :!: :smile: :oops: :grin: :eek: :shock: :???: :cool: :lol: :mad: :twisted: :roll: :wink: :idea: :arrow: :neutral: :cry: :mrgreen:

确定