英文:
segmentio/kafka-go reader client not subscribing to the topic and partition
问题
读取客户端没有开始消费消息。这种情况是间歇性的,在大多数情况下,当主题中没有消息时会发生。
Kafka版本
Apache Kafka 3.3.0
kafka-go版本
v0.4.38
重现此行为的资源:
代码:
func main() {
topic_name := "dev-billing"
signals := make(chan os.Signal, 1)
signal.Notify(signals, syscall.SIGINT, syscall.SIGKILL)
ctx, cancel := context.WithCancel(context.Background())
go func() {
sig := <-signals
fmt.Println("Got signal: ", sig)
cancel()
}()
r := kafka.NewReader(kafka.ReaderConfig{
Brokers: []string{"0.0.0.0:9092"},
GroupID: "consumer-group-biller",
GroupTopics: []string{},
Topic: topic_name,
QueueCapacity: 10,
MinBytes: 10e3,
MaxBytes: 10e6,
MaxWait: 3 * time.Second,
PartitionWatchInterval: 5 * time.Second,
WatchPartitionChanges: true,
StartOffset: kafka.LastOffset,
ReadBackoffMax: 10 * time.Second,
Logger: log.Default(),
OffsetOutOfRangeError: true,
})
i := 0
// listening for the interrupts in a different channel.
defer func() {
err := r.Close()
if err != nil {
fmt.Println("Error closing consumer: ", err)
return
}
fmt.Println("Consumer closed")
}()
for {
m, err := r.FetchMessage(ctx)
if err != nil {
break
}
msg := m.Value
content := Event{}
json.Unmarshal([]byte(msg), &content)
fmt.Printf("%+v\n", content)
if content.StatusCode == 200 {
i++
}
if err := r.CommitMessages(ctx, m); err != nil {
log.Fatal("failed to commit messages:", err)
}
fmt.Println("Total:", i)
}
if err := r.Close(); err != nil {
log.Fatal("failed to close reader:", err)
}
}
期望的行为
消费者应该在启动后立即开始消费主题分区中的消息,从最后的偏移量开始。
观察到的行为
当生产者正在生产时,消费者无法订阅主题。如果在生产者之前启动消费者,则可以正常工作。
错误日志:
2022/11/17 14:25:34 entering loop for consumer group, consumer-group-biller
2022/11/17 14:25:37 joined group consumer-group-biller as member consumer@localhost.local (github.com/segmentio/kafka-go)-98bd8f69-bb6a-4130-a50b-16edf0c2cdff in generation 44
2022/11/17 14:25:37 joinGroup succeeded for response, consumer-group-biller. generationID=44, memberID=consumer@localhost.local (github.com/segmentio/kafka-go)-98bd8f69-bb6a-4130-a50b-16edf0c2cdff
2022/11/17 14:25:37 Joined group consumer-group-biller as member consumer@localhost.local (github.com/segmentio/kafka-go)-98bd8f69-bb6a-4130-a50b-16edf0c2cdff in generation 44
2022/11/17 14:25:37 received empty assignments for group, consumer-group-biller as member consumer@localhost.local (github.com/segmentio/kafka-go)-98bd8f69-bb6a-4130-a50b-16edf0c2cdff for generation 44
2022/11/17 14:25:37 sync group finished for group, consumer-group-biller
2022/11/17 14:25:37 subscribed to topics and partitions: map[]
2022/11/17 14:25:37 started heartbeat for group, consumer-group-biller [3s]
2022/11/17 14:25:37 started partition watcher for group, consumer-group-biller, topic dev-billing [5s]
2022/11/17 14:25:37 started commit for group consumer-group-biller
当它工作时:
2022/11/17 14:09:02 entering loop for consumer group, consumer-group-biller
2022/11/17 14:09:04 joined group consumer-group-biller as member consumer@localhost.local (github.com/segmentio/kafka-go)-fd0d8373-d966-4600-9cd6-d0dda31a7273 in generation 35
2022/11/17 14:09:04 joinGroup succeeded for response, consumer-group-biller. generationID=35, memberID=consumer@localhost.local (github.com/segmentio/kafka-go)-fd0d8373-d966-4600-9cd6-d0dda31a7273
2022/11/17 14:09:04 Joined group consumer-group-biller as member consumer@localhost.local (github.com/segmentio/kafka-go)-fd0d8373-d966-4600-9cd6-d0dda31a7273 in generation 35
2022/11/17 14:09:04 sync group finished for group, consumer-group-biller
2022/11/17 14:09:04 subscribed to topics and partitions: map[{topic:dev-billing partition:0}:25]
2022/11/17 14:09:04 started heartbeat for group, consumer-group-biller [3s]
2022/11/17 14:09:04 started partition watcher for group, consumer-group-biller, topic dev-billing [5s]
2022/11/17 14:09:04 initializing kafka reader for partition 0 of dev-billing starting at offset 25
2022/11/17 14:09:04 started commit for group consumer-group-biller
2022/11/17 14:09:04 the kafka reader for partition 0 of dev-billing is seeking to offset 25
{RequestID:f9f1971f-3d5b-4ef5-b92b-880fe094887e EventID:0d52de85-8da4-435d-8d19-e267947670c3 Event:example MerchantID:id Status:OK StatusCode:200}
2022/11/17 14:09:04 committed offsets for group consumer-group-biller:
参考:https://github.com/segmentio/kafka-go
英文:
The reader client is not starting to consume messages. This is happening intermittently, in most cases it happens when there are no messages in the topic.
Kafka Version
Apache Kafka 3.3.0
kafka-go version
v0.4.38
Resources to reproduce the behavior:
Code:
func main() {
topic_name := "dev-billing"
signals := make(chan os.Signal, 1)
signal.Notify(signals, syscall.SIGINT, syscall.SIGKILL)
ctx, cancel := context.WithCancel(context.Background())
go func() {
sig := <-signals
fmt.Println("Got signal: ", sig)
cancel()
}()
r := kafka.NewReader(kafka.ReaderConfig{
Brokers: []string{"0.0.0.0:9092"},
GroupID: "consumer-group-biller",
GroupTopics: []string{},
Topic: topic_name,
QueueCapacity: 10,
MinBytes: 10e3,
MaxBytes: 10e6,
MaxWait: 3 * time.Second,
PartitionWatchInterval: 5 * time.Second,
WatchPartitionChanges: true,
StartOffset: kafka.LastOffset,
ReadBackoffMax: 10 * time.Second,
Logger: log.Default(),
OffsetOutOfRangeError: true,
})
i := 0
// listening for the interrupts in a different channel.
defer func() {
err := r.Close()
if err != nil {
fmt.Println("Error closing consumer: ", err)
return
}
fmt.Println("Consumer closed")
}()
for {
m, err := r.FetchMessage(ctx)
if err != nil {
break
}
msg := m.Value
content := Event{}
json.Unmarshal([]byte(msg), &content)
fmt.Printf("%+v\n", content)
if content.StatusCode == 200 {
i++
}
if err := r.CommitMessages(ctx, m); err != nil {
log.Fatal("failed to commit messages:", err)
}
fmt.Println("Total:", i)
}
if err := r.Close(); err != nil {
log.Fatal("failed to close reader:", err)
}
}
Expected Behavior
Consumers should start consuming the messages in the topic partitions after the last offset as soon as it is started.
Observed Behavior
The consumer is not able to subscribe to the topic when the producer is already producing. If the consumer is start before the producer it works.
Error logs:
2022/11/17 14:25:34 entering loop for consumer group, consumer-group-biller
2022/11/17 14:25:37 joined group consumer-group-biller as member consumer@localhost.local (github.com/segmentio/kafka-go)-98bd8f69-bb6a-4130-a50b-16edf0c2cdff in generation 44
2022/11/17 14:25:37 joinGroup succeeded for response, consumer-group-biller. generationID=44, memberID=consumer@localhost.local (github.com/segmentio/kafka-go)-98bd8f69-bb6a-4130-a50b-16edf0c2cdff
2022/11/17 14:25:37 Joined group consumer-group-biller as member consumer@localhost.local (github.com/segmentio/kafka-go)-98bd8f69-bb6a-4130-a50b-16edf0c2cdff in generation 44
2022/11/17 14:25:37 received empty assignments for group, consumer-group-biller as member consumer@localhost.local (github.com/segmentio/kafka-go)-98bd8f69-bb6a-4130-a50b-16edf0c2cdff for generation 44
2022/11/17 14:25:37 sync group finished for group, consumer-group-biller
2022/11/17 14:25:37 subscribed to topics and partitions: map[]
2022/11/17 14:25:37 started heartbeat for group, consumer-group-biller [3s]
2022/11/17 14:25:37 started partition watcher for group, consumer-group-biller, topic dev-billing [5s]
2022/11/17 14:25:37 started commit for group consumer-group-biller
When it worked:
2022/11/17 14:09:02 entering loop for consumer group, consumer-group-biller
2022/11/17 14:09:04 joined group consumer-group-biller as member consumer@localhost.local (github.com/segmentio/kafka-go)-fd0d8373-d966-4600-9cd6-d0dda31a7273 in generation 35
2022/11/17 14:09:04 joinGroup succeeded for response, consumer-group-biller. generationID=35, memberID=consumer@localhost.local (github.com/segmentio/kafka-go)-fd0d8373-d966-4600-9cd6-d0dda31a7273
2022/11/17 14:09:04 Joined group consumer-group-biller as member consumer@localhost.local (github.com/segmentio/kafka-go)-fd0d8373-d966-4600-9cd6-d0dda31a7273 in generation 35
2022/11/17 14:09:04 sync group finished for group, consumer-group-biller
2022/11/17 14:09:04 subscribed to topics and partitions: map[{topic:dev-billing partition:0}:25]
2022/11/17 14:09:04 started heartbeat for group, consumer-group-biller [3s]
2022/11/17 14:09:04 started partition watcher for group, consumer-group-biller, topic dev-billing [5s]
2022/11/17 14:09:04 initializing kafka reader for partition 0 of dev-billing starting at offset 25
2022/11/17 14:09:04 started commit for group consumer-group-biller
2022/11/17 14:09:04 the kafka reader for partition 0 of dev-billing is seeking to offset 25
{RequestID:f9f1971f-3d5b-4ef5-b92b-880fe094887e EventID:0d52de85-8da4-435d-8d19-e267947670c3 Event:example MerchantID:id Status:OK StatusCode:200}
2022/11/17 14:09:04 committed offsets for group consumer-group-biller:
答案1
得分: 1
最近图书馆出了一些问题,我建议暂时降级并在GitHub上报告这个问题。
具体来说,你可以降级到v0.4.35,在v0.4.36中引入了一些有关消费者组的重构,这会导致消费者组出现问题,如果你查看问题页面的话。
英文:
there has been issue recently with the library, I suggest to downgrade for the moment and report the issue in github.
specifically you can downgrade to v0.4.35, there has been some refactoring into the consumer group introduced in v0.4.36 which causes issues to the consumer group if you check the issues page.
通过集体智慧和协作来改善编程学习和解决问题的方式。致力于成为全球开发者共同参与的知识库,让每个人都能够通过互相帮助和分享经验来进步。
评论