如何在运行时更改主题的偏移量?

huangapple go评论83阅读模式
英文:

How to change offset of a topic during runtime?

问题

我有一个为 Kafka 主题提供消息的生产者,并且还有另一个服务从主题中读取这些消息。

我有一个业务用例,有时消费者需要忽略队列中已经存在的所有消息,只处理新的即将到来的消息。在不停止和重新启动 Kafka 服务器的情况下,是否可以实现这一点?

我正在使用 GO 语言进行开发。所以如果 Kafka 支持这样的需求,是否有办法通过 sarama GO 客户端更改消费者的配置,从最新的消息开始消费。

提前感谢你的帮助。

英文:

I have a Producer for kafka topic which keeps on pushing some messages to kafka topic. And also I have another service reading these messages from topic.

I have an business use-case, where sometimes consumer need to ignore all the messages which are already there in queue and start processing only new upcoming messages. Can this be archived without stopping and restarting the kafka server.

I am working on GO. So if kafka supports such requirement, is there any way I can change configuration of consumer to start consuming from latest message using sarama GO client.

Thank you in advance.

答案1

得分: 1

你可以使用随机UUID作为消费者组ID,并/或者禁用自动提交,然后可以从最新的偏移量开始。

config := sarama.NewConfig()
config.Consumer.Offsets.Initial = sarama.OffsetOldest

(从Sarama示例代码中复制)

否则,Kafka消费者API应该有一个seekToEnd函数,但在Sarama中似乎是通过获取每个分区的高水位标记,然后在ConsumerGroup实例上调用ResetOffsets来实现的。注意:在执行此操作之前,应该暂停消费者组。

英文:

You could use a random UUID for consumer group id, and/or disable auto commits, then you can start at the latest offset with

config := sarama.NewConfig()
config.Consumer.Offsets.Initial = sarama.OffsetOldest

(copied from Sarama example code)

Otherwise, Kafka consumer API should have a seekToEnd function, but it seems to be exposed in Sarama as getting high watermarks from consumer for every partition, then calling ResetOffets on a ConsumerGroup instance. Note: the group should be paused before doing that.

huangapple
  • 本文由 发表于 2022年9月15日 15:27:31
  • 转载请务必保留本文链接:https://go.coder-hub.com/73727147.html
匿名

发表评论

匿名网友

:?: :razz: :sad: :evil: :!: :smile: :oops: :grin: :eek: :shock: :???: :cool: :lol: :mad: :twisted: :roll: :wink: :idea: :arrow: :neutral: :cry: :mrgreen:

确定