Kafka consumer with confluent-kafka-go changes offset
Question
I create a new consumer with this configuration:
    c, err := kafka.NewConsumer(&kafka.ConfigMap{
        "bootstrap.servers": addresses,
        "group.id":          "my_group",
        "auto.offset.reset": "earliest",
    })
    topic := "testTopic"
    if err = c.SubscribeTopics([]string{topic}, nil); err != nil {
        panic(err)
    }
Then I produce events with the following code and consume one event:
    events := []map[string]string{
        {
            "name": "Foo",
        },
        {
            "name": "Bar",
        },
    }
    err = p.ProduceEvent(events[0]) // there is a wrapper to produce events
    err = p.ProduceEvent(events[1])
    res, err := c.ReadMessage(100 * time.Second)
    time.Sleep(20 * time.Second)
    c.Close()
When I describe the group with

    watch /home/kafka/bin/kafka-consumer-groups.sh --bootstrap-server localhost:9092 --group my_group --describe

the results after each step were captured in screenshots (not reproduced here).
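For illustration only, the final state implied by the narrative (two events produced, consumer closed, lag zero) would look roughly like:

    GROUP     TOPIC      PARTITION  CURRENT-OFFSET  LOG-END-OFFSET  LAG
    my_group  testTopic  0          2               2               0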
I can't understand why the lag is finally zero! I consumed only one event. It seems like weird behaviour to me that Close would change the offset. Any clue is appreciated.
Answer 1
Score: 2
ReadMessage wraps Poll. Poll fetches a batch of messages and buffers them locally. Since you have configured your consumer to auto-commit offsets, it will commit all of the fetched messages, even the ones that are still cached locally and that your application has not yet processed. That is why you see no lag after closing the consumer.
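Note that nothing in your ConfigMap enables committing explicitly; it comes from librdkafka defaults. A sketch of the same ConfigMap with the relevant properties (and what I believe are their default values) written out:

    cfg := &kafka.ConfigMap{
        "bootstrap.servers": addresses,
        "group.id":          "my_group",
        "auto.offset.reset": "earliest",
        // librdkafka defaults, spelled out for clarity:
        "enable.auto.commit":       true, // periodically commit stored offsets
        "auto.commit.interval.ms":  5000, // commit every 5 seconds
        "enable.auto.offset.store": true, // auto-store the offset of every fetched message
    }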
librdkafka (and thus confluent-kafka-go) has no equivalent of the Java consumer's max.poll.records, so if you want to control exactly which offsets get committed, you'll need to disable automatic offset storage and store the offsets of processed messages yourself with StoreOffsets: https://github.com/confluentinc/confluent-kafka-go/issues/380#issuecomment-539903016
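A minimal, self-contained sketch of that approach, assuming a broker on localhost:9092 and the topic and group names from the question: auto commit stays enabled, but with enable.auto.offset.store disabled only the offsets the application explicitly stores are ever committed.

    package main

    import (
        "fmt"
        "time"

        "github.com/confluentinc/confluent-kafka-go/kafka"
    )

    func main() {
        c, err := kafka.NewConsumer(&kafka.ConfigMap{
            "bootstrap.servers": "localhost:9092", // assumed broker address
            "group.id":          "my_group",
            "auto.offset.reset": "earliest",
            // Auto commit stays on, but librdkafka no longer auto-stores
            // every fetched offset; only offsets we store get committed.
            "enable.auto.offset.store": false,
        })
        if err != nil {
            panic(err)
        }
        defer c.Close()

        if err = c.SubscribeTopics([]string{"testTopic"}, nil); err != nil {
            panic(err)
        }

        msg, err := c.ReadMessage(100 * time.Second)
        if err != nil {
            panic(err)
        }

        // ... process msg here ...

        // Store offset+1 (the next offset to consume) for just this message;
        // the auto-committer will commit it on the next interval and on Close.
        next := msg.TopicPartition
        next.Offset++
        if _, err = c.StoreOffsets([]kafka.TopicPartition{next}); err != nil {
            panic(err)
        }
        fmt.Printf("stored offset %v for %s[%d]\n", next.Offset, *next.Topic, next.Partition)
    }

Alternatively, set enable.auto.commit to false and call Commit (or CommitMessage) yourself after processing each message; the StoreOffsets variant keeps commits batched and asynchronous, which is usually cheaper.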