英文:
How to change offset of a topic during runtime?
问题
我有一个为 Kafka 主题提供消息的生产者,并且还有另一个服务从主题中读取这些消息。
我有一个业务用例,有时消费者需要忽略队列中已经存在的所有消息,只处理新的即将到来的消息。在不停止和重新启动 Kafka 服务器的情况下,是否可以实现这一点?
我正在使用 GO 语言进行开发。所以如果 Kafka 支持这样的需求,是否有办法通过 sarama GO 客户端更改消费者的配置,从最新的消息开始消费。
提前感谢你的帮助。
英文:
I have a Producer for kafka topic which keeps on pushing some messages to kafka topic. And also I have another service reading these messages from topic.
I have an business use-case, where sometimes consumer need to ignore all the messages which are already there in queue and start processing only new upcoming messages. Can this be archived without stopping and restarting the kafka server.
I am working on GO. So if kafka supports such requirement, is there any way I can change configuration of consumer to start consuming from latest message using sarama GO client.
Thank you in advance.
答案1
得分: 1
你可以使用随机UUID作为消费者组ID,并/或者禁用自动提交,然后可以从最新的偏移量开始。
config := sarama.NewConfig()
config.Consumer.Offsets.Initial = sarama.OffsetOldest
(从Sarama示例代码中复制)
否则,Kafka消费者API应该有一个seekToEnd
函数,但在Sarama中似乎是通过获取每个分区的高水位标记,然后在ConsumerGroup实例上调用ResetOffsets
来实现的。注意:在执行此操作之前,应该暂停消费者组。
英文:
You could use a random UUID for consumer group id, and/or disable auto commits, then you can start at the latest offset with
config := sarama.NewConfig()
config.Consumer.Offsets.Initial = sarama.OffsetOldest
(copied from Sarama example code)
Otherwise, Kafka consumer API should have a seekToEnd function, but it seems to be exposed in Sarama as getting high watermarks from consumer for every partition, then calling ResetOffets on a ConsumerGroup instance. Note: the group should be paused before doing that.
通过集体智慧和协作来改善编程学习和解决问题的方式。致力于成为全球开发者共同参与的知识库,让每个人都能够通过互相帮助和分享经验来进步。
评论