segmentio/kafka-go reader client not subscribing to the topic and partition

Question

The reader client does not start consuming messages. This happens intermittently, in most cases when there are no messages in the topic.

Kafka Version

Apache Kafka 3.3.0

kafka-go version

v0.4.38

Resources to reproduce the behavior:

Code:

package main

import (
	"context"
	"encoding/json"
	"fmt"
	"log"
	"os"
	"os/signal"
	"syscall"
	"time"

	"github.com/segmentio/kafka-go"
)

// Event was not included in the question. This shape is an assumption,
// inferred from the fields printed by a successful run.
type Event struct {
	RequestID  string
	EventID    string
	Event      string
	MerchantID string
	Status     string
	StatusCode int
}

func main() {
	topicName := "dev-billing"
	signals := make(chan os.Signal, 1)
	// SIGKILL cannot be caught, so listen for SIGTERM instead.
	signal.Notify(signals, syscall.SIGINT, syscall.SIGTERM)
	ctx, cancel := context.WithCancel(context.Background())

	// Listen for interrupts in a separate goroutine and cancel the context.
	go func() {
		sig := <-signals
		fmt.Println("Got signal: ", sig)
		cancel()
	}()

	r := kafka.NewReader(kafka.ReaderConfig{
		Brokers:                []string{"0.0.0.0:9092"},
		GroupID:                "consumer-group-biller",
		GroupTopics:            []string{},
		Topic:                  topicName,
		QueueCapacity:          10,
		MinBytes:               10e3,
		MaxBytes:               10e6,
		MaxWait:                3 * time.Second,
		PartitionWatchInterval: 5 * time.Second,
		WatchPartitionChanges:  true,
		StartOffset:            kafka.LastOffset,
		ReadBackoffMax:         10 * time.Second,
		Logger:                 log.Default(),
		OffsetOutOfRangeError:  true,
	})

	// Close the reader exactly once, on every exit path out of main.
	defer func() {
		if err := r.Close(); err != nil {
			fmt.Println("Error closing consumer: ", err)
			return
		}
		fmt.Println("Consumer closed")
	}()

	i := 0

	for {
		m, err := r.FetchMessage(ctx)
		if err != nil {
			// Includes context cancellation after a signal.
			log.Println("stopping consumer:", err)
			break
		}

		var content Event
		if err := json.Unmarshal(m.Value, &content); err != nil {
			log.Println("failed to decode message:", err)
		} else {
			fmt.Printf("%+v\n", content)
			if content.StatusCode == 200 {
				i++
			}
		}

		if err := r.CommitMessages(ctx, m); err != nil {
			// Break instead of log.Fatal so the deferred Close still runs.
			log.Println("failed to commit messages:", err)
			break
		}
		fmt.Println("Total:", i)
	}
}

Expected Behavior

The consumer should start consuming messages from the topic's partitions, beginning after the last offset, as soon as it is started.

Observed Behavior

The consumer cannot subscribe to the topic when the producer is already producing. If the consumer is started before the producer, it works.

Error logs:

2022/11/17 14:25:34 entering loop for consumer group, consumer-group-biller
2022/11/17 14:25:37 joined group consumer-group-biller as member consumer@localhost.local (github.com/segmentio/kafka-go)-98bd8f69-bb6a-4130-a50b-16edf0c2cdff in generation 44
2022/11/17 14:25:37 joinGroup succeeded for response, consumer-group-biller.  generationID=44, memberID=consumer@localhost.local (github.com/segmentio/kafka-go)-98bd8f69-bb6a-4130-a50b-16edf0c2cdff
2022/11/17 14:25:37 Joined group consumer-group-biller as member consumer@localhost.local (github.com/segmentio/kafka-go)-98bd8f69-bb6a-4130-a50b-16edf0c2cdff in generation 44
2022/11/17 14:25:37 received empty assignments for group, consumer-group-biller as member consumer@localhost.local (github.com/segmentio/kafka-go)-98bd8f69-bb6a-4130-a50b-16edf0c2cdff for generation 44
2022/11/17 14:25:37 sync group finished for group, consumer-group-biller
2022/11/17 14:25:37 subscribed to topics and partitions: map[]
2022/11/17 14:25:37 started heartbeat for group, consumer-group-biller [3s]
2022/11/17 14:25:37 started partition watcher for group, consumer-group-biller, topic dev-billing [5s]
2022/11/17 14:25:37 started commit for group consumer-group-biller

When it worked:

2022/11/17 14:09:02 entering loop for consumer group, consumer-group-biller
2022/11/17 14:09:04 joined group consumer-group-biller as member consumer@localhost.local (github.com/segmentio/kafka-go)-fd0d8373-d966-4600-9cd6-d0dda31a7273 in generation 35
2022/11/17 14:09:04 joinGroup succeeded for response, consumer-group-biller.  generationID=35, memberID=consumer@localhost.local (github.com/segmentio/kafka-go)-fd0d8373-d966-4600-9cd6-d0dda31a7273
2022/11/17 14:09:04 Joined group consumer-group-biller as member consumer@localhost.local (github.com/segmentio/kafka-go)-fd0d8373-d966-4600-9cd6-d0dda31a7273 in generation 35
2022/11/17 14:09:04 sync group finished for group, consumer-group-biller
2022/11/17 14:09:04 subscribed to topics and partitions: map[{topic:dev-billing partition:0}:25]
2022/11/17 14:09:04 started heartbeat for group, consumer-group-biller [3s]
2022/11/17 14:09:04 started partition watcher for group, consumer-group-biller, topic dev-billing [5s]
2022/11/17 14:09:04 initializing kafka reader for partition 0 of dev-billing starting at offset 25
2022/11/17 14:09:04 started commit for group consumer-group-biller
2022/11/17 14:09:04 the kafka reader for partition 0 of dev-billing is seeking to offset 25
{RequestID:f9f1971f-3d5b-4ef5-b92b-880fe094887e EventID:0d52de85-8da4-435d-8d19-e267947670c3 Event:example MerchantID:id Status:OK StatusCode:200}
2022/11/17 14:09:04 committed offsets for group consumer-group-biller:

Ref: https://github.com/segmentio/kafka-go

Answer 1

Score: 1

There have been problems with the library recently. I suggest downgrading for the moment and reporting the issue on GitHub.

Specifically, you can downgrade to v0.4.35. A refactoring of the consumer group code was introduced in v0.4.36, and it causes problems with consumer groups, as you can see on the project's issues page.

huangapple
  • Posted on 2022-11-18 03:45:45
  • When reposting, please keep the link to this article: https://go.coder-hub.com/74481274.html