How to set a consumer to start from a specific offset in Golang with Kafka 0.10

Question

I need the consumer to resume from the last message it processed before it crashed. Fortunately, I have only one topic, with one partition and one consumer.

To do so I tried https://github.com/Shopify/sarama, but this does not seem to be available there yet.
I am now using https://godoc.org/github.com/bsm/sarama-cluster, which allows me to commit the offset of every message.

However, I cannot retrieve the last committed offset, and I cannot figure out how to make a sarama consumer start from that offset. The only parameter I've found so far is Config.Consumer.Offsets.Initial.

  1. How can I retrieve the last committed offset?
  2. How can I make the consumer start from the last message whose offset has been committed? OffsetNewest makes it start from the last message produced, not the last one processed by the consumer.
  3. Is it possible to do this using only Shopify/sarama, without bsm/sarama-cluster?

Thanks in advance

P.S. I am using Kafka 0.10.0, so the offsets are stored in Kafka and not in ZooKeeper.

EDIT1:
Partial solution: fetch all the messages starting from sarama.OffsetOldest and skip them until we find one that has not been processed yet.
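
A minimal sketch of that skip-until-unprocessed idea, using only the standard library. The Message type and the skipProcessed helper are hypothetical names introduced for illustration. They are not part of sarama, and the sketch assumes the offset of the last processed message has been stored somewhere by the application.

```go
package main

import "fmt"

// Message is a hypothetical stand-in for a consumed Kafka message;
// only the offset and the payload matter for this sketch.
type Message struct {
	Offset int64
	Value  string
}

// skipProcessed drops every message whose offset is at or below
// lastProcessed and returns the remaining ones in their original order.
// Pass -1 as lastProcessed when nothing has been processed yet.
func skipProcessed(msgs []Message, lastProcessed int64) []Message {
	var pending []Message
	for _, m := range msgs {
		if m.Offset <= lastProcessed {
			continue // already handled before the crash
		}
		pending = append(pending, m)
	}
	return pending
}

func main() {
	// Pretend these were replayed starting from the oldest offset.
	replayed := []Message{{0, "a"}, {1, "b"}, {2, "c"}, {3, "d"}}
	for _, m := range skipProcessed(replayed, 1) {
		fmt.Printf("processing offset %d: %s\n", m.Offset, m.Value)
	}
}
```

This works, but it re-reads the whole partition on every restart, so it only scales while the partition stays small.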

Answer 1

Score: 1

If an offset has already been saved for a partition, sarama-cluster will resume consumption from that offset. The Config.Consumer.Offsets.Initial option is used only if no saved offset is present (the first run for a consumer group).

You can verify this by adding the following line at the beginning of your main() function:

sarama.Logger = log.New(os.Stdout, "sarama: ", log.LstdFlags)

Then you'll see something like the following in the output:

cluster/consumer CID-17db1be4-a162-411c-a106-4d198191176a consume sample/0 from 12

The 12 at the end is the offset Sarama is going to start from for that partition (sample/0).
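
The resume rule described above (use the saved offset if there is one, otherwise fall back to the Offsets.Initial setting) can be sketched as a small standalone function. This is an illustration of the rule, not sarama-cluster's actual code. The startOffset helper and the two local constants are names made up for this example, and the constants are assumed to mirror the values of sarama.OffsetNewest and sarama.OffsetOldest.

```go
package main

import "fmt"

// Local copies of the two special offsets, assumed to match
// sarama.OffsetNewest (-1) and sarama.OffsetOldest (-2).
const (
	offsetNewest int64 = -1
	offsetOldest int64 = -2
)

// startOffset returns the offset a consumer should start from.
// If the group already has a saved offset for the partition, that
// offset wins; the initial setting is only used on the first run.
func startOffset(saved int64, hasSaved bool, initial int64) int64 {
	if hasSaved {
		return saved
	}
	return initial
}

func main() {
	// A group that saved offset 12 earlier resumes from 12,
	// whatever the initial setting says.
	fmt.Println(startOffset(12, true, offsetNewest))

	// A brand-new group has nothing saved, so the initial
	// setting (here: oldest) decides.
	fmt.Println(startOffset(0, false, offsetOldest))
}
```

So changing the initial setting has no visible effect once the consumer group has saved an offset for the partition.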

huangapple
  • Posted on February 1, 2017 at 23:46:49
  • When reposting, please keep the link to this article: https://go.coder-hub.com/41983936.html