Kafka consumer offset export in Golang: sarama or confluent-kafka-go lib

Question

I am trying to find a way to perform an offset reset on a consumer group from Go. With the Kafka command-line tools it would look something like this:

    kafka-consumer-groups.sh --bootstrap-server $kfk --dry-run --reset-offsets --topic $t --group $cg1 --to-current --export | tee topic-offset.csv
    kafka-consumer-groups.sh --bootstrap-server $kfk --dry-run --reset-offsets --topic $t --group $cg2 --to-current

and then importing the new offsets from that exported file:

    kafka-consumer-groups.sh --bootstrap-server $kfk --execute --reset-offsets --topic $t --group $cg2 --from-file topic-offset.csv

Exporting to and importing from a file is not the problem. I just cannot find a way to get and then set the offsets.

Has anyone done this with the sarama or confluent-kafka-go library?

Thanks in advance for any suggestions.

Answer 1

Score: 1

OK, I think I found a way. I still need to implement the full solution, but the following should be a good start:

    package main

    import (
        "fmt"
        "log"
        "os"

        kfk "github.com/Shopify/sarama" // kfk is assumed to alias sarama (github.com/IBM/sarama in newer releases)
    )

    // check aborts on error instead of silently discarding it.
    func check(err error) {
        if err != nil {
            log.Fatal(err)
        }
    }

    func main() {
        // BK (broker), CID (client ID), CG (group) and TOPIC are defined elsewhere.
        kfk.Logger = log.New(os.Stdout, "", log.LstdFlags)
        cfg := kfk.NewConfig()
        cfg.ClientID = CID
        client, err := kfk.NewClient([]string{BK}, cfg)
        check(err)
        defer client.Close()
        offsetMg, err := kfk.NewOffsetManagerFromClient(CG, client)
        check(err)
        defer offsetMg.Close()
        partitions, err := client.Partitions(TOPIC)
        check(err)
        for _, p := range partitions {
            pom, err := offsetMg.ManagePartition(TOPIC, p)
            check(err)
            ofs, meta := pom.NextOffset()
            fmt.Printf("Partition: %v -> nextOffset: %v:%s\n", p, ofs, meta)
        }
        fmt.Println("--")
    }

This gives me the following output:

    Partition: 0 -> nextOffset: 31:
    Partition: 1 -> nextOffset: 30:
    Partition: 2 -> nextOffset: 45:
    Partition: 3 -> nextOffset: 39:
    Partition: 4 -> nextOffset: 45:
    Partition: 5 -> nextOffset: 39:
    Partition: 6 -> nextOffset: 37:
    Partition: 7 -> nextOffset: 42:
    Partition: 8 -> nextOffset: 43:
    Partition: 9 -> nextOffset: 35:
    Partition: 10 -> nextOffset: 41:
    Partition: 11 -> nextOffset: 36:

which is exactly the same as the output of the Java command:

    kafka-consumer-groups.sh --bootstrap-server $kfk --dry-run --reset-offsets --topic $t --group $cg1 --to-current | sort -k3 -n
    GROUP           TOPIC       PARTITION  NEW-OFFSET
    propertest-cg1  propertest  0          31
    propertest-cg1  propertest  1          30
    propertest-cg1  propertest  2          45
    propertest-cg1  propertest  3          39
    propertest-cg1  propertest  4          45
    propertest-cg1  propertest  5          39
    propertest-cg1  propertest  6          37
    propertest-cg1  propertest  7          42
    propertest-cg1  propertest  8          43
    propertest-cg1  propertest  9          35
    propertest-cg1  propertest  10         41
    propertest-cg1  propertest  11         36

What remains is to export this data to a file and then set the new offsets with the ResetOffset function:

    // ResetOffset resets to the provided offset, alongside a metadata string that
    // represents the state of the partition consumer at that point in time. Reset
    // acts as a counterpart to MarkOffset, the difference being that it allows to
    // reset an offset to an earlier or smaller value, where MarkOffset only
    // allows incrementing the offset. cf MarkOffset for more details.
    ResetOffset(topic string, partition int32, offset int64, metadata string)
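
For the export and import step, a minimal sketch using only the Go standard library could look like the one below. It assumes the `topic,partition,offset` row layout that `kafka-consumer-groups.sh --export` writes for a single group. The `PartitionOffset` type and the `formatOffsets` and `parseOffsets` helpers are hypothetical names, not part of sarama or confluent-kafka-go.

```go
package main

import (
	"encoding/csv"
	"fmt"
	"strconv"
	"strings"
)

// PartitionOffset is a hypothetical record for one exported row.
type PartitionOffset struct {
	Topic     string
	Partition int32
	Offset    int64
}

// formatOffsets renders one "topic,partition,offset" line per entry
// (assumed to match the single-group CSV export layout).
func formatOffsets(offsets []PartitionOffset) (string, error) {
	var sb strings.Builder
	w := csv.NewWriter(&sb)
	for _, o := range offsets {
		row := []string{
			o.Topic,
			strconv.FormatInt(int64(o.Partition), 10),
			strconv.FormatInt(o.Offset, 10),
		}
		if err := w.Write(row); err != nil {
			return "", err
		}
	}
	w.Flush()
	return sb.String(), w.Error()
}

// parseOffsets reads rows produced by formatOffsets and rejects rows
// that do not have exactly three fields or contain non-numeric values.
func parseOffsets(data string) ([]PartitionOffset, error) {
	r := csv.NewReader(strings.NewReader(data))
	r.FieldsPerRecord = 3
	rows, err := r.ReadAll()
	if err != nil {
		return nil, err
	}
	out := make([]PartitionOffset, 0, len(rows))
	for _, row := range rows {
		p, err := strconv.ParseInt(row[1], 10, 32)
		if err != nil {
			return nil, fmt.Errorf("bad partition %q: %w", row[1], err)
		}
		off, err := strconv.ParseInt(row[2], 10, 64)
		if err != nil {
			return nil, fmt.Errorf("bad offset %q: %w", row[2], err)
		}
		out = append(out, PartitionOffset{Topic: row[0], Partition: int32(p), Offset: off})
	}
	return out, nil
}

func main() {
	text, err := formatOffsets([]PartitionOffset{
		{Topic: "propertest", Partition: 0, Offset: 31},
		{Topic: "propertest", Partition: 1, Offset: 30},
	})
	if err != nil {
		panic(err)
	}
	fmt.Print(text)

	back, err := parseOffsets(text)
	if err != nil {
		panic(err)
	}
	fmt.Println(len(back), "rows parsed")
}
```

The string returned by `formatOffsets` can be written with `os.WriteFile`, and the parsed rows can then be fed, partition by partition, to whichever offset-setting call the client library provides.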

Answer 2

Score: 0

Here is a simple app that performs the offset reset based on a YAML config:

https://github.com/nXnUs25/kfk-offsets

It is a command-line tool for lag monitoring, consumer group listing, and offset reset.

Both consumer groups start at the same offsets. To simulate the process, we produce 5 messages to the topic and consume them all on one of the consumer groups, propertest-cg1a11. The consumer then reports how many messages it processed:

^CProcessed a total of 33 messages (28 + 5)

    ./kfkgo lag
    Using config file: ~/kfk-offsets/kfk-offset.yaml
    GROUP              TOPIC       PARTITION  CURRENT-OFFSET  LOG-END-OFFSET  LAG
    propertest-cg1a11  propertest  0          183             183             0
    propertest-cg1a11  propertest  1          165             165             0
    propertest-cg1a11  propertest  2          192             192             0
    propertest-cg1a11  propertest  3          177             177             0
    propertest-cg1a11  propertest  4          192             192             0
    propertest-cg1a11  propertest  5          169             169             0
    propertest-cg1a11  propertest  6          180             180             0
    propertest-cg1a11  propertest  7          164             164             0
    propertest-cg1a11  propertest  8          195             195             0
    propertest-cg1a11  propertest  9          188             188             0
    propertest-cg1a11  propertest  10         184             184             0
    propertest-cg1a11  propertest  11         184             184             0
    TOTAL LAG: 0

    ./kfkgo lag -g propertest-cg -t propertest
    Using config file: ~/kfk-offsets/kfk-offset.yaml
    GROUP          TOPIC       PARTITION  CURRENT-OFFSET  LOG-END-OFFSET  LAG
    propertest-cg  propertest  0          179             183             4
    propertest-cg  propertest  1          162             165             3
    propertest-cg  propertest  2          190             192             2
    propertest-cg  propertest  3          174             177             3
    propertest-cg  propertest  4          187             192             5
    propertest-cg  propertest  5          167             169             2
    propertest-cg  propertest  6          177             180             3
    propertest-cg  propertest  7          160             164             4
    propertest-cg  propertest  8          192             195             3
    propertest-cg  propertest  9          185             188             3
    propertest-cg  propertest  10         183             184             1
    propertest-cg  propertest  11         184             184             0
    TOTAL LAG: 33
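
In the tables above, the LAG column equals LOG-END-OFFSET minus CURRENT-OFFSET for each partition, and TOTAL LAG is the sum over all partitions. A small sketch of that arithmetic, using the propertest-cg numbers (the `partitionLag` type and `totalLag` function are illustrative names, not taken from kfkgo):

```go
package main

import "fmt"

// partitionLag holds the two offsets reported for one partition.
type partitionLag struct {
	Current int64 // CURRENT-OFFSET column
	LogEnd  int64 // LOG-END-OFFSET column
}

// Lag is the number of messages the group has not consumed yet.
func (p partitionLag) Lag() int64 { return p.LogEnd - p.Current }

// totalLag sums the lag over all partitions.
func totalLag(parts []partitionLag) int64 {
	var sum int64
	for _, p := range parts {
		sum += p.Lag()
	}
	return sum
}

func main() {
	// CURRENT-OFFSET / LOG-END-OFFSET pairs for partitions 0..11 of propertest-cg.
	parts := []partitionLag{
		{179, 183}, {162, 165}, {190, 192}, {174, 177},
		{187, 192}, {167, 169}, {177, 180}, {160, 164},
		{192, 195}, {185, 188}, {183, 184}, {184, 184},
	}
	fmt.Println("TOTAL LAG:", totalLag(parts)) // prints "TOTAL LAG: 33"
}
```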

Now we copy the offsets from propertest-cg to propertest-cg1a11, which moves propertest-cg1a11 back and lets us process the same messages again on that consumer group.

    ./kfkgo offset -m
    Using config file: ~/kfk-offsets/kfk-offset.yaml
    moving
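
How kfkgo performs the move internally is not shown here. With sarama, one plausible approach is to take each partition's offset from the source group and apply it to the target group's partition offset manager. The sketch below assumes that `MarkOffset` only moves an offset forward and `ResetOffset` only moves it backward, so a helper has to pick the right call. It uses a small local interface (`offsetSetter`, a hypothetical name) whose methods mirror sarama's `PartitionOffsetManager`, plus a fake implementation so that it runs without a broker.

```go
package main

import "fmt"

// offsetSetter lists the PartitionOffsetManager methods used here.
// The interface name is hypothetical; the method set mirrors sarama's.
type offsetSetter interface {
	NextOffset() (int64, string)
	MarkOffset(offset int64, metadata string)
	ResetOffset(offset int64, metadata string)
}

// setOffset moves a partition to target: ResetOffset for a backward move,
// MarkOffset for a forward move, nothing if it is already there.
func setOffset(pom offsetSetter, target int64) {
	current, meta := pom.NextOffset()
	switch {
	case target < current:
		pom.ResetOffset(target, meta)
	case target > current:
		pom.MarkOffset(target, meta)
	}
}

// fakePOM imitates the assumed forward-only / backward-only semantics.
type fakePOM struct{ offset int64 }

func (f *fakePOM) NextOffset() (int64, string) { return f.offset, "" }

func (f *fakePOM) MarkOffset(o int64, _ string) {
	if o > f.offset {
		f.offset = o
	}
}

func (f *fakePOM) ResetOffset(o int64, _ string) {
	if o <= f.offset {
		f.offset = o
	}
}

func main() {
	pom := &fakePOM{offset: 183} // propertest-cg1a11, partition 0
	setOffset(pom, 179)          // offset read from propertest-cg, partition 0
	fmt.Println(pom.offset)      // prints 179
}
```

With a real sarama offset manager, the value returned by `ManagePartition` for the target group would be passed as `pom`, and the new offset only reaches the broker once the offset manager commits it. The target group should have no active members while this runs, as is also required for `kafka-consumer-groups.sh --reset-offsets`.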

Verifying again, first with the Kafka command kafka-consumer-groups.sh:

    propertest-cg1a11  propertest  0   179  183  4  -  -  -
    propertest-cg1a11  propertest  1   162  165  3  -  -  -
    propertest-cg1a11  propertest  2   190  192  2  -  -  -
    propertest-cg1a11  propertest  3   174  177  3  -  -  -
    propertest-cg1a11  propertest  4   187  192  5  -  -  -
    propertest-cg1a11  propertest  5   167  169  2  -  -  -
    propertest-cg1a11  propertest  6   177  180  3  -  -  -
    propertest-cg1a11  propertest  7   160  164  4  -  -  -
    propertest-cg1a11  propertest  8   192  195  3  -  -  -
    propertest-cg1a11  propertest  9   185  188  3  -  -  -
    propertest-cg1a11  propertest  10  183  184  1  -  -  -
    propertest-cg1a11  propertest  11  184  184  0  -  -  -

    Consumer group 'propertest-cg' has no active members.
    propertest-cg  propertest  0   179  183  4  -  -  -
    propertest-cg  propertest  1   162  165  3  -  -  -
    propertest-cg  propertest  2   190  192  2  -  -  -
    propertest-cg  propertest  3   174  177  3  -  -  -
    propertest-cg  propertest  4   187  192  5  -  -  -
    propertest-cg  propertest  5   167  169  2  -  -  -
    propertest-cg  propertest  6   177  180  3  -  -  -
    propertest-cg  propertest  7   160  164  4  -  -  -
    propertest-cg  propertest  8   192  195  3  -  -  -
    propertest-cg  propertest  9   185  188  3  -  -  -
    propertest-cg  propertest  10  183  184  1  -  -  -
    propertest-cg  propertest  11  184  184  0  -  -  -

and then with kfkgo:

    ./kfkgo lag -g propertest-cg -t propertest
    Using config file: ~/kfk-offsets/kfk-offset.yaml
    GROUP          TOPIC       PARTITION  CURRENT-OFFSET  LOG-END-OFFSET  LAG
    propertest-cg  propertest  0          179             183             4
    propertest-cg  propertest  1          162             165             3
    propertest-cg  propertest  2          190             192             2
    propertest-cg  propertest  3          174             177             3
    propertest-cg  propertest  4          187             192             5
    propertest-cg  propertest  5          167             169             2
    propertest-cg  propertest  6          177             180             3
    propertest-cg  propertest  7          160             164             4
    propertest-cg  propertest  8          192             195             3
    propertest-cg  propertest  9          185             188             3
    propertest-cg  propertest  10         183             184             1
    propertest-cg  propertest  11         184             184             0
    TOTAL LAG: 33

    ./kfkgo lag
    Using config file: ~/kfk-offsets/kfk-offset.yaml
    GROUP              TOPIC       PARTITION  CURRENT-OFFSET  LOG-END-OFFSET  LAG
    propertest-cg1a11  propertest  0          179             183             4
    propertest-cg1a11  propertest  1          162             165             3
    propertest-cg1a11  propertest  2          190             192             2
    propertest-cg1a11  propertest  3          174             177             3
    propertest-cg1a11  propertest  4          187             192             5
    propertest-cg1a11  propertest  5          167             169             2
    propertest-cg1a11  propertest  6          177             180             3
    propertest-cg1a11  propertest  7          160             164             4
    propertest-cg1a11  propertest  8          192             195             3
    propertest-cg1a11  propertest  9          185             188             3
    propertest-cg1a11  propertest  10         183             184             1
    propertest-cg1a11  propertest  11         184             184             0
    TOTAL LAG: 33

More examples are in the README.

huangapple
  • Published on April 29, 2022, 21:45:32
  • When reposting, please keep the link to this article: https://go.coder-hub.com/72058964.html