英文:
How to set consumer to start from a specific offset in Golang Kafka 10
问题
我的需求是在生产者在崩溃之前处理的最后一条消息处重新启动。幸运的是,我只有一个主题,一个分区和一个消费者。
为了实现这一点,我尝试了https://github.com/Shopify/sarama,但似乎还不可用。
我现在正在使用https://godoc.org/github.com/bsm/sarama-cluster,它允许我提交每条消息的偏移量。
我无法检索到最后提交的偏移量
我无法弄清楚如何使sarama的消费者从该偏移量开始。到目前为止,我找到的唯一参数是Config.Producer.Offsets.Initial
。
- 如何检索最后提交的偏移量?
- 如何使消费者从已提交偏移量的最后一条消息开始?
OffsetNewest
将使其从最后一条已生产的消息开始,而不是消费者最后处理的消息。 - 是否可以仅使用Shopify/sarama而不是bsm/sarama-cluster来实现?
提前感谢
P.S. 我正在使用Kafka 10.0,因此偏移量存储在Kafka中而不是Zookeeper中。
编辑1:
部分解决方案:从sarama.OffsetOldest开始获取所有消息,然后跳过所有已处理的消息,直到找到未处理的消息为止。
英文:
My need is to make the producer to start from the last message it processed before it crashed. Fortunately I am in the case of having only one topic, with one partition and one consumer.
To do so I tried https://github.com/Shopify/sarama but it doesn't seems to be available yet.
I am now using https://godoc.org/github.com/bsm/sarama-cluster, which allow me to commit every message offset.
I cannot retrieve the last committed offset
I cannot figure out how to make a sarama consumer to start from said offset. The only parameter I've found so far is Config.Producer.Offsets.Initial
.
- How to retrieve the last committed offset?
- How to make the consumer start from the last message whose offset has been committed?
OffsetNewest
will make it start from the last message produced, not the last processed b the consumer. - Is it possible to do so using only Shopify/sarama and not bsm/sarama-cluster ?
Thank in advance
P.S. I am using Kafka 10.0, so the offsets are stores in a kafka and not in zookeeper.
EDIT1:
Partial solution: fetch all the messages since sarama.OffsetOldest and skip all of them until we found a non processed one.
答案1
得分: 1
如果已经为分区保存了偏移量,sarama-cluster将从该偏移量恢复消费。只有在没有保存的偏移量时(消费者组的第一次运行),才会使用Config.Producer.Offsets.Initial
选项。
您可以在main()
函数的开头添加以下行来验证这一点:
sarama.Logger = log.New(os.Stdout, "sarama: ", log.LstdFlags)
然后您将在输出中看到类似以下内容:
cluster/consumer CID-17db1be4-a162-411c-a106-4d198191176a consume sample/0 from 12
其中的12是Sarama将从该分区(sample/0)开始的偏移量。
英文:
If offset was already saved for a partition, sarama-cluster will resume consumption from that offset. The Config.Producer.Offsets.Initial
option is used only if no saved offset is present (first run for a consumer group).
You can verify this by adding the following line at the beginning of your main()
function:
sarama.Logger = log.New(os.Stdout, "sarama: ", log.LstdFlags)
Then you'll see something like the following in the output:
cluster/consumer CID-17db1be4-a162-411c-a106-4d198191176a consume sample/0 from 12
The 12 in that is the offset Sarama is going to start from for that partition (sample/0).
通过集体智慧和协作来改善编程学习和解决问题的方式。致力于成为全球开发者共同参与的知识库,让每个人都能够通过互相帮助和分享经验来进步。
评论