segmentio/kafka-go读取器客户端没有订阅主题和分区。

huangapple go评论137阅读模式
英文:

segmentio/kafka-go reader client not subscribing to the topic and partition

问题

读取客户端没有开始消费消息。这种情况是间歇性的,在大多数情况下,当主题中没有消息时会发生。

Kafka版本

  1. Apache Kafka 3.3.0

kafka-go版本

  1. v0.4.38

重现此行为的资源:

代码:

  1. func main() {
  2. topic_name := "dev-billing"
  3. signals := make(chan os.Signal, 1)
  4. signal.Notify(signals, syscall.SIGINT, syscall.SIGKILL)
  5. ctx, cancel := context.WithCancel(context.Background())
  6. go func() {
  7. sig := <-signals
  8. fmt.Println("Got signal: ", sig)
  9. cancel()
  10. }()
  11. r := kafka.NewReader(kafka.ReaderConfig{
  12. Brokers: []string{"0.0.0.0:9092"},
  13. GroupID: "consumer-group-biller",
  14. GroupTopics: []string{},
  15. Topic: topic_name,
  16. QueueCapacity: 10,
  17. MinBytes: 10e3,
  18. MaxBytes: 10e6,
  19. MaxWait: 3 * time.Second,
  20. PartitionWatchInterval: 5 * time.Second,
  21. WatchPartitionChanges: true,
  22. StartOffset: kafka.LastOffset,
  23. ReadBackoffMax: 10 * time.Second,
  24. Logger: log.Default(),
  25. OffsetOutOfRangeError: true,
  26. })
  27. i := 0
  28. // listening for the interrupts in a different channel.
  29. defer func() {
  30. err := r.Close()
  31. if err != nil {
  32. fmt.Println("Error closing consumer: ", err)
  33. return
  34. }
  35. fmt.Println("Consumer closed")
  36. }()
  37. for {
  38. m, err := r.FetchMessage(ctx)
  39. if err != nil {
  40. break
  41. }
  42. msg := m.Value
  43. content := Event{}
  44. json.Unmarshal([]byte(msg), &content)
  45. fmt.Printf("%+v\n", content)
  46. if content.StatusCode == 200 {
  47. i++
  48. }
  49. if err := r.CommitMessages(ctx, m); err != nil {
  50. log.Fatal("failed to commit messages:", err)
  51. }
  52. fmt.Println("Total:", i)
  53. }
  54. if err := r.Close(); err != nil {
  55. log.Fatal("failed to close reader:", err)
  56. }
  57. }

期望的行为

消费者应该在启动后立即开始消费主题分区中的消息,从最后的偏移量开始。

观察到的行为

当生产者正在生产时,消费者无法订阅主题。如果在生产者之前启动消费者,则可以正常工作。

错误日志:

  1. 2022/11/17 14:25:34 entering loop for consumer group, consumer-group-biller
  2. 2022/11/17 14:25:37 joined group consumer-group-biller as member consumer@localhost.local (github.com/segmentio/kafka-go)-98bd8f69-bb6a-4130-a50b-16edf0c2cdff in generation 44
  3. 2022/11/17 14:25:37 joinGroup succeeded for response, consumer-group-biller. generationID=44, memberID=consumer@localhost.local (github.com/segmentio/kafka-go)-98bd8f69-bb6a-4130-a50b-16edf0c2cdff
  4. 2022/11/17 14:25:37 Joined group consumer-group-biller as member consumer@localhost.local (github.com/segmentio/kafka-go)-98bd8f69-bb6a-4130-a50b-16edf0c2cdff in generation 44
  5. 2022/11/17 14:25:37 received empty assignments for group, consumer-group-biller as member consumer@localhost.local (github.com/segmentio/kafka-go)-98bd8f69-bb6a-4130-a50b-16edf0c2cdff for generation 44
  6. 2022/11/17 14:25:37 sync group finished for group, consumer-group-biller
  7. 2022/11/17 14:25:37 subscribed to topics and partitions: map[]
  8. 2022/11/17 14:25:37 started heartbeat for group, consumer-group-biller [3s]
  9. 2022/11/17 14:25:37 started partition watcher for group, consumer-group-biller, topic dev-billing [5s]
  10. 2022/11/17 14:25:37 started commit for group consumer-group-biller

当它工作时:

  1. 2022/11/17 14:09:02 entering loop for consumer group, consumer-group-biller
  2. 2022/11/17 14:09:04 joined group consumer-group-biller as member consumer@localhost.local (github.com/segmentio/kafka-go)-fd0d8373-d966-4600-9cd6-d0dda31a7273 in generation 35
  3. 2022/11/17 14:09:04 joinGroup succeeded for response, consumer-group-biller. generationID=35, memberID=consumer@localhost.local (github.com/segmentio/kafka-go)-fd0d8373-d966-4600-9cd6-d0dda31a7273
  4. 2022/11/17 14:09:04 Joined group consumer-group-biller as member consumer@localhost.local (github.com/segmentio/kafka-go)-fd0d8373-d966-4600-9cd6-d0dda31a7273 in generation 35
  5. 2022/11/17 14:09:04 sync group finished for group, consumer-group-biller
  6. 2022/11/17 14:09:04 subscribed to topics and partitions: map[{topic:dev-billing partition:0}:25]
  7. 2022/11/17 14:09:04 started heartbeat for group, consumer-group-biller [3s]
  8. 2022/11/17 14:09:04 started partition watcher for group, consumer-group-biller, topic dev-billing [5s]
  9. 2022/11/17 14:09:04 initializing kafka reader for partition 0 of dev-billing starting at offset 25
  10. 2022/11/17 14:09:04 started commit for group consumer-group-biller
  11. 2022/11/17 14:09:04 the kafka reader for partition 0 of dev-billing is seeking to offset 25
  12. {RequestID:f9f1971f-3d5b-4ef5-b92b-880fe094887e EventID:0d52de85-8da4-435d-8d19-e267947670c3 Event:example MerchantID:id Status:OK StatusCode:200}
  13. 2022/11/17 14:09:04 committed offsets for group consumer-group-biller:

参考:https://github.com/segmentio/kafka-go

英文:

The reader client is not starting to consume messages. This is happening intermittently, in most cases it happens when there are no messages in the topic.

Kafka Version

  1. Apache Kafka 3.3.0

kafka-go version

  1. v0.4.38

Resources to reproduce the behavior:

Code:

  1. func main() {
  2. topic_name := &quot;dev-billing&quot;
  3. signals := make(chan os.Signal, 1)
  4. signal.Notify(signals, syscall.SIGINT, syscall.SIGKILL)
  5. ctx, cancel := context.WithCancel(context.Background())
  6. go func() {
  7. sig := &lt;-signals
  8. fmt.Println(&quot;Got signal: &quot;, sig)
  9. cancel()
  10. }()
  11. r := kafka.NewReader(kafka.ReaderConfig{
  12. Brokers: []string{&quot;0.0.0.0:9092&quot;},
  13. GroupID: &quot;consumer-group-biller&quot;,
  14. GroupTopics: []string{},
  15. Topic: topic_name,
  16. QueueCapacity: 10,
  17. MinBytes: 10e3,
  18. MaxBytes: 10e6,
  19. MaxWait: 3 * time.Second,
  20. PartitionWatchInterval: 5 * time.Second,
  21. WatchPartitionChanges: true,
  22. StartOffset: kafka.LastOffset,
  23. ReadBackoffMax: 10 * time.Second,
  24. Logger: log.Default(),
  25. OffsetOutOfRangeError: true,
  26. })
  27. i := 0
  28. // listening for the interrupts in a different channel.
  29. defer func() {
  30. err := r.Close()
  31. if err != nil {
  32. fmt.Println(&quot;Error closing consumer: &quot;, err)
  33. return
  34. }
  35. fmt.Println(&quot;Consumer closed&quot;)
  36. }()
  37. for {
  38. m, err := r.FetchMessage(ctx)
  39. if err != nil {
  40. break
  41. }
  42. msg := m.Value
  43. content := Event{}
  44. json.Unmarshal([]byte(msg), &amp;content)
  45. fmt.Printf(&quot;%+v\n&quot;, content)
  46. if content.StatusCode == 200 {
  47. i++
  48. }
  49. if err := r.CommitMessages(ctx, m); err != nil {
  50. log.Fatal(&quot;failed to commit messages:&quot;, err)
  51. }
  52. fmt.Println(&quot;Total:&quot;, i)
  53. }
  54. if err := r.Close(); err != nil {
  55. log.Fatal(&quot;failed to close reader:&quot;, err)
  56. }
  57. }

Expected Behavior

Consumers should start consuming the messages in the topic partitions after the last offset as soon as it is started.

Observed Behavior

The consumer is not able to subscribe to the topic when the producer is already producing. If the consumer is start before the producer it works.

Error logs:

  1. 2022/11/17 14:25:34 entering loop for consumer group, consumer-group-biller
  2. 2022/11/17 14:25:37 joined group consumer-group-biller as member consumer@localhost.local (github.com/segmentio/kafka-go)-98bd8f69-bb6a-4130-a50b-16edf0c2cdff in generation 44
  3. 2022/11/17 14:25:37 joinGroup succeeded for response, consumer-group-biller. generationID=44, memberID=consumer@localhost.local (github.com/segmentio/kafka-go)-98bd8f69-bb6a-4130-a50b-16edf0c2cdff
  4. 2022/11/17 14:25:37 Joined group consumer-group-biller as member consumer@localhost.local (github.com/segmentio/kafka-go)-98bd8f69-bb6a-4130-a50b-16edf0c2cdff in generation 44
  5. 2022/11/17 14:25:37 received empty assignments for group, consumer-group-biller as member consumer@localhost.local (github.com/segmentio/kafka-go)-98bd8f69-bb6a-4130-a50b-16edf0c2cdff for generation 44
  6. 2022/11/17 14:25:37 sync group finished for group, consumer-group-biller
  7. 2022/11/17 14:25:37 subscribed to topics and partitions: map[]
  8. 2022/11/17 14:25:37 started heartbeat for group, consumer-group-biller [3s]
  9. 2022/11/17 14:25:37 started partition watcher for group, consumer-group-biller, topic dev-billing [5s]
  10. 2022/11/17 14:25:37 started commit for group consumer-group-biller

When it worked:

  1. 2022/11/17 14:09:02 entering loop for consumer group, consumer-group-biller
  2. 2022/11/17 14:09:04 joined group consumer-group-biller as member consumer@localhost.local (github.com/segmentio/kafka-go)-fd0d8373-d966-4600-9cd6-d0dda31a7273 in generation 35
  3. 2022/11/17 14:09:04 joinGroup succeeded for response, consumer-group-biller. generationID=35, memberID=consumer@localhost.local (github.com/segmentio/kafka-go)-fd0d8373-d966-4600-9cd6-d0dda31a7273
  4. 2022/11/17 14:09:04 Joined group consumer-group-biller as member consumer@localhost.local (github.com/segmentio/kafka-go)-fd0d8373-d966-4600-9cd6-d0dda31a7273 in generation 35
  5. 2022/11/17 14:09:04 sync group finished for group, consumer-group-biller
  6. 2022/11/17 14:09:04 subscribed to topics and partitions: map[{topic:dev-billing partition:0}:25]
  7. 2022/11/17 14:09:04 started heartbeat for group, consumer-group-biller [3s]
  8. 2022/11/17 14:09:04 started partition watcher for group, consumer-group-biller, topic dev-billing [5s]
  9. 2022/11/17 14:09:04 initializing kafka reader for partition 0 of dev-billing starting at offset 25
  10. 2022/11/17 14:09:04 started commit for group consumer-group-biller
  11. 2022/11/17 14:09:04 the kafka reader for partition 0 of dev-billing is seeking to offset 25
  12. {RequestID:f9f1971f-3d5b-4ef5-b92b-880fe094887e EventID:0d52de85-8da4-435d-8d19-e267947670c3 Event:example MerchantID:id Status:OK StatusCode:200}
  13. 2022/11/17 14:09:04 committed offsets for group consumer-group-biller:

Ref: https://github.com/segmentio/kafka-go

答案1

得分: 1

最近图书馆出了一些问题,我建议暂时降级并在GitHub上报告这个问题。

具体来说,你可以降级到v0.4.35,在v0.4.36中引入了一些有关消费者组的重构,这会导致消费者组出现问题,如果你查看问题页面的话。

英文:

there has been issue recently with the library, I suggest to downgrade for the moment and report the issue in github.

specifically you can downgrade to v0.4.35, there has been some refactoring into the consumer group introduced in v0.4.36 which causes issues to the consumer group if you check the issues page.

huangapple
  • 本文由 发表于 2022年11月18日 03:45:45
  • 转载请务必保留本文链接:https://go.coder-hub.com/74481274.html
匿名

发表评论

匿名网友

:?: :razz: :sad: :evil: :!: :smile: :oops: :grin: :eek: :shock: :???: :cool: :lol: :mad: :twisted: :roll: :wink: :idea: :arrow: :neutral: :cry: :mrgreen:

确定