Kafka consumer offset export in Golang: sarama or confluent-kafka-go lib

Question


I am trying to find a way to perform an offset reset on a consumer group. With the Kafka command-line tools, it would look something like this:

kafka-consumer-groups.sh --bootstrap-server $kfk --dry-run --reset-offsets --topic $t --group $cg1 --to-current --export | tee topic-offset.csv
kafka-consumer-groups.sh --bootstrap-server $kfk --dry-run --reset-offsets --topic $t --group $cg2 --to-current

and then import the new offsets based on that export file:

kafka-consumer-groups.sh --bootstrap-server $kfk --execute --reset-offsets --topic $t --group $cg2 --from-file topic-offset.csv

Exporting to and importing from a file is not the problem; I just cannot find a way to get and then set the offsets.

Has anyone done this using the sarama or confluent-kafka-go lib?

Thanks in advance for any suggestions.

Answer 1

Score: 1


OK, I think I found the way. I still need to implement the full solution, but I should be good with the following:

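package main

import (
	"fmt"
	"log"
	"os"

	kfk "github.com/IBM/sarama" // assumed import; older releases: github.com/Shopify/sarama
)

// Placeholder values, not from the original post; adjust to your cluster.
const BK, CID, CG, TOPIC = "localhost:9092", "offset-export", "propertest-cg1", "propertest"

func main() {
	brokers := []string{BK}
	kfk.Logger = log.New(os.Stdout, "", log.LstdFlags)

	cfg := kfk.NewConfig()
	cfg.ClientID = CID
	client, err := kfk.NewClient(brokers, cfg)
	if err != nil {
		log.Fatalf("creating client: %v", err)
	}
	defer client.Close()

	offsetMg, err := kfk.NewOffsetManagerFromClient(CG, client)
	if err != nil {
		log.Fatalf("creating offset manager: %v", err)
	}
	defer offsetMg.Close()

	// The client can list partitions itself, so no separate consumer is needed.
	partitions, err := client.Partitions(TOPIC)
	if err != nil {
		log.Fatalf("listing partitions: %v", err)
	}

	for _, p := range partitions {
		pom, err := offsetMg.ManagePartition(TOPIC, p)
		if err != nil {
			log.Fatalf("managing partition %d: %v", p, err)
		}
		// NextOffset returns the next offset to consume plus the committed metadata.
		ofs, pomStr := pom.NextOffset()
		fmt.Printf("Partition: %v -> nextOffset: %v:%s\n", p, ofs, pomStr)
		pom.Close()
	}
	fmt.Println("--")
}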

That gives me this output:

Partition: 0 -> nextOffset: 31:
Partition: 1 -> nextOffset: 30:
Partition: 2 -> nextOffset: 45:
Partition: 3 -> nextOffset: 39:
Partition: 4 -> nextOffset: 45:
Partition: 5 -> nextOffset: 39:
Partition: 6 -> nextOffset: 37:
Partition: 7 -> nextOffset: 42:
Partition: 8 -> nextOffset: 43:
Partition: 9 -> nextOffset: 35:
Partition: 10 -> nextOffset: 41:
Partition: 11 -> nextOffset: 36:

which is exactly the same as the output of the Java-based command:

❯ kafka-consumer-groups.sh --bootstrap-server $kfk --dry-run --reset-offsets --topic $t --group $cg1 --to-current | sort -k3 -n

GROUP                          TOPIC                          PARTITION  NEW-OFFSET
propertest-cg1                 propertest                     0          31
propertest-cg1                 propertest                     1          30
propertest-cg1                 propertest                     2          45
propertest-cg1                 propertest                     3          39
propertest-cg1                 propertest                     4          45
propertest-cg1                 propertest                     5          39
propertest-cg1                 propertest                     6          37
propertest-cg1                 propertest                     7          42
propertest-cg1                 propertest                     8          43
propertest-cg1                 propertest                     9          35
propertest-cg1                 propertest                     10         41
propertest-cg1                 propertest                     11         36

So now all that is left is to export this data to a file and use the function

// ResetOffset resets to the provided offset, alongside a metadata string that
// represents the state of the partition consumer at that point in time. Reset
// acts as a counterpart to MarkOffset, the difference being that it allows to
// reset an offset to an earlier or smaller value, where MarkOffset only
// allows incrementing the offset. cf MarkOffset for more details.
ResetOffset(topic string, partition int32, offset int64, metadata string)

to set the new offsets.

Answer 2

Score: 0


Here is a simple app that does the offset reset based on a YAML config:

https://github.com/nXnUs25/kfk-offsets

A command-line tool for lag monitoring, consumer group listing, and offset reset.

We have the same offsets. Now, to simulate the process, we produce messages to the topic and keep consuming on only one of the consumer groups, propertest-cg1a11. We produce 5 messages and consume them all on that consumer group, which tells us how many messages we consumed:

^CProcessed a total of 33 messages 28 + 5

❯ ./kfkgo lag
Using config file: ~/kfk-offsets/kfk-offset.yaml
GROUP                                    TOPIC                                    PARTITION       CURRENT-OFFSET  LOG-END-OFFSET  LAG
propertest-cg1a11                        propertest                               0               183             183             0
propertest-cg1a11                        propertest                               1               165             165             0
propertest-cg1a11                        propertest                               2               192             192             0
propertest-cg1a11                        propertest                               3               177             177             0
propertest-cg1a11                        propertest                               4               192             192             0
propertest-cg1a11                        propertest                               5               169             169             0
propertest-cg1a11                        propertest                               6               180             180             0
propertest-cg1a11                        propertest                               7               164             164             0
propertest-cg1a11                        propertest                               8               195             195             0
propertest-cg1a11                        propertest                               9               188             188             0
propertest-cg1a11                        propertest                               10              184             184             0
propertest-cg1a11                        propertest                               11              184             184             0
TOTAL LAG:                                                                                                                        0

❯ ./kfkgo lag -g propertest-cg -t propertest
Using config file: ~/kfk-offsets/kfk-offset.yaml
GROUP                                    TOPIC                                    PARTITION       CURRENT-OFFSET  LOG-END-OFFSET  LAG
propertest-cg                            propertest                               0               179             183             4
propertest-cg                            propertest                               1               162             165             3
propertest-cg                            propertest                               2               190             192             2
propertest-cg                            propertest                               3               174             177             3
propertest-cg                            propertest                               4               187             192             5
propertest-cg                            propertest                               5               167             169             2
propertest-cg                            propertest                               6               177             180             3
propertest-cg                            propertest                               7               160             164             4
propertest-cg                            propertest                               8               192             195             3
propertest-cg                            propertest                               9               185             188             3
propertest-cg                            propertest                               10              183             184             1
propertest-cg                            propertest                               11              184             184             0
TOTAL LAG:                                                                                                                        33
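The LAG column in these tables is the log end offset minus the group's current offset, per partition, and TOTAL LAG is the sum. A minimal sketch of that arithmetic, using the propertest-cg values from the table above (`PartitionLag` and `TotalLag` are hypothetical names for illustration, not taken from the kfk-offsets tool):

```go
package main

import "fmt"

// PartitionLag is a hypothetical row type mirroring the table columns.
type PartitionLag struct {
	Partition     int32
	CurrentOffset int64
	LogEndOffset  int64
}

// Lag returns how many messages the group still has to consume.
func (p PartitionLag) Lag() int64 {
	lag := p.LogEndOffset - p.CurrentOffset
	if lag < 0 {
		return 0 // guard against a committed offset ahead of the log end
	}
	return lag
}

// TotalLag sums the per-partition lag.
func TotalLag(rows []PartitionLag) int64 {
	var total int64
	for _, r := range rows {
		total += r.Lag()
	}
	return total
}

func main() {
	// Values copied from the propertest-cg table above.
	rows := []PartitionLag{
		{0, 179, 183}, {1, 162, 165}, {2, 190, 192}, {3, 174, 177},
		{4, 187, 192}, {5, 167, 169}, {6, 177, 180}, {7, 160, 164},
		{8, 192, 195}, {9, 185, 188}, {10, 183, 184}, {11, 184, 184},
	}
	fmt.Println("TOTAL LAG:", TotalLag(rows)) // prints "TOTAL LAG: 33"
}
```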

Now we move the offsets from propertest-cg back to propertest-cg1a11, which allows us to process the same messages again on that consumer group.

❯ ./kfkgo offset -m
Using config file: ~/kfk-offsets/kfk-offset.yaml
moving

And verify again:

Kafka commands: kafka-consumer-groups.sh

propertest-cg1a11 propertest      0          179             183             4               -               -               -
propertest-cg1a11 propertest      1          162             165             3               -               -               -
propertest-cg1a11 propertest      2          190             192             2               -               -               -
propertest-cg1a11 propertest      3          174             177             3               -               -               -
propertest-cg1a11 propertest      4          187             192             5               -               -               -
propertest-cg1a11 propertest      5          167             169             2               -               -               -
propertest-cg1a11 propertest      6          177             180             3               -               -               -
propertest-cg1a11 propertest      7          160             164             4               -               -               -
propertest-cg1a11 propertest      8          192             195             3               -               -               -
propertest-cg1a11 propertest      9          185             188             3               -               -               -
propertest-cg1a11 propertest      10         183             184             1               -               -               -
propertest-cg1a11 propertest      11         184             184             0               -               -               -


Consumer group 'propertest-cg' has no active members.
propertest-cg   propertest      0          179             183             4               -               -               -
propertest-cg   propertest      1          162             165             3               -               -               -
propertest-cg   propertest      2          190             192             2               -               -               -
propertest-cg   propertest      3          174             177             3               -               -               -
propertest-cg   propertest      4          187             192             5               -               -               -
propertest-cg   propertest      5          167             169             2               -               -               -
propertest-cg   propertest      6          177             180             3               -               -               -
propertest-cg   propertest      7          160             164             4               -               -               -
propertest-cg   propertest      8          192             195             3               -               -               -
propertest-cg   propertest      9          185             188             3               -               -               -
propertest-cg   propertest      10         183             184             1               -               -               -
propertest-cg   propertest      11         184             184             0               -               -               -

❯ ./kfkgo lag -g propertest-cg -t propertest
Using config file: ~/kfk-offsets/kfk-offset.yaml
GROUP                                    TOPIC                                    PARTITION       CURRENT-OFFSET  LOG-END-OFFSET  LAG
propertest-cg                            propertest                               0               179             183             4
propertest-cg                            propertest                               1               162             165             3
propertest-cg                            propertest                               2               190             192             2
propertest-cg                            propertest                               3               174             177             3
propertest-cg                            propertest                               4               187             192             5
propertest-cg                            propertest                               5               167             169             2
propertest-cg                            propertest                               6               177             180             3
propertest-cg                            propertest                               7               160             164             4
propertest-cg                            propertest                               8               192             195             3
propertest-cg                            propertest                               9               185             188             3
propertest-cg                            propertest                               10              183             184             1
propertest-cg                            propertest                               11              184             184             0
TOTAL LAG:                                                                                                                        33

❯ ./kfkgo lag
Using config file: ~/kfk-offsets/kfk-offset.yaml
GROUP                                    TOPIC                                    PARTITION       CURRENT-OFFSET  LOG-END-OFFSET  LAG
propertest-cg1a11                        propertest                               0               179             183             4
propertest-cg1a11                        propertest                               1               162             165             3
propertest-cg1a11                        propertest                               2               190             192             2
propertest-cg1a11                        propertest                               3               174             177             3
propertest-cg1a11                        propertest                               4               187             192             5
propertest-cg1a11                        propertest                               5               167             169             2
propertest-cg1a11                        propertest                               6               177             180             3
propertest-cg1a11                        propertest                               7               160             164             4
propertest-cg1a11                        propertest                               8               192             195             3
propertest-cg1a11                        propertest                               9               185             188             3
propertest-cg1a11                        propertest                               10              183             184             1
propertest-cg1a11                        propertest                               11              184             184             0
TOTAL LAG:                                                                                                                        33

More examples are in the README.

huangapple
  • Published on April 29, 2022, 21:45:32
  • When reposting, please keep the link to this article: https://go.coder-hub.com/72058964.html