Kafka生产者终止,队列或传输中仍有1条消息(881字节)。

huangapple go评论201阅读模式
英文:

Kafka Producer terminating with 1 message (881 bytes) still in queue or transit

问题

我对Kafka还不太熟悉,当我向生产者推送值时,我收到了这个消息。

  1. func Produce(topic string, key string, message interface{}) {
  2. headers := map[string][]byte{
  3. MSG_HEADER_KEY_CORRELATIONID: []byte("1234"),
  4. MSG_HEADER_KEY_REQUESTID: []byte(uuid.NewString()),
  5. MSG_HEADER_KEY_TESTID: []byte("456"),
  6. MSG_HEADER_KEY_MESSAGETYPE: []byte("TestLookupRequest"),
  7. }
  8. kheaders := make([]kafka.Header, 0, len(headers))
  9. for k, v := range headers {
  10. kheaders = append(kheaders, kafka.Header{Key: k, Value: v})
  11. }
  12. var err error
  13. servers := "XXXXXX"
  14. protocol := "SASL_SSL"
  15. mechanisms := "PLAIN"
  16. username := "XXXXXXX"
  17. password := "XXXXXXX"
  18. Producer, err = kafka.NewProducer(&kafka.ConfigMap{
  19. "bootstrap.servers": servers,
  20. "security.protocol": protocol,
  21. "sasl.username": username,
  22. "sasl.password": password,
  23. "sasl.mechanism": mechanisms,
  24. })
  25. if err != nil {
  26. panic(err)
  27. }
  28. defer Producer.Close()
  29. value, _ := json.Marshal(message)
  30. err = Producer.Produce(&kafka.Message{
  31. TopicPartition: kafka.TopicPartition{Topic: &topic, Partition: kafka.PartitionAny},
  32. Key: []byte("12345"),
  33. Headers: kheaders,
  34. Value: value,
  35. Timestamp: time.Now().UTC(),
  36. TimestampType: kafka.TimestampCreateTime,
  37. }, nil)
  38. if err != nil {
  39. panic(err)
  40. }
  41. Producer.Flush(30)
  42. }

%4|1641074998.615|TERMINATE|rdkafka#producer-1| [thrd:app]: Producer terminating with 1 message (881 bytes) still in queue or transit: use flush() to wait for outstanding message delivery

请问有什么帮助可以解决这个问题吗?

英文:

I am pretty new to Kafka and I am getting this message when pushing value to the producer

  1. func Produce(topic string, key string, message interface{}) {
  2. headers := map[string][]byte{
  3. MSG_HEADER_KEY_CORRELATIONID: []byte("1234"),
  4. MSG_HEADER_KEY_REQUESTID: []byte(uuid.NewString()),
  5. MSG_HEADER_KEY_TESTID: []byte("456"),
  6. MSG_HEADER_KEY_MESSAGETYPE: []byte("TestLookupRequest"),
  7. }
  8. kheaders := make([]kafka.Header, 0, len(headers))
  9. for k, v := range headers {
  10. kheaders = append(kheaders, kafka.Header{Key: k, Value: v})
  11. }
  12. var err error
  13. servers := "XXXXXX"
  14. protocol := "SASL_SSL"
  15. mechanisms := "PLAIN"
  16. username := "XXXXXXX"
  17. password := "XXXXXXX"
  18. Producer, err = kafka.NewProducer(&kafka.ConfigMap{
  19. "bootstrap.servers": servers,
  20. "security.protocol": protocol,
  21. "sasl.username": username,
  22. "sasl.password": password,
  23. "sasl.mechanism": mechanisms,
  24. })
  25. if err != nil {
  26. panic(err)
  27. }
  28. defer Producer.Close()
  29. value, _ := json.Marshal(message)
  30. err = Producer.Produce(&kafka.Message{
  31. TopicPartition: kafka.TopicPartition{Topic: &topic, Partition: kafka.PartitionAny},
  32. Key: []byte("12345"),
  33. Headers: kheaders,
  34. Value: value,
  35. Timestamp: time.Now().UTC(),
  36. TimestampType: kafka.TimestampCreateTime,
  37. }, nil)
  38. if err != nil {
  39. panic(err)
  40. }
  41. Producer.Flush(30)
  42. }

%4|1641074998.615|TERMINATE|rdkafka#producer-1| [thrd:app]: Producer terminating with 1 message (881 bytes) still in queue or transit: use flush() to wait for outstanding message delivery

Any help on how I can fix this?

答案1

得分: 4

请在 Flush() 方法中尝试更长的超时时间,30毫秒可能不够。或者尝试使用通道,就像这个示例中的方式:
https://github.com/confluentinc/confluent-kafka-go/blob/80c58f81b6cc32d3ed046609bf660a41a061b23d/examples/producer_example/producer_example.go

英文:

Please, try a longer timeout when Flush(); 30ms might not be enough. Or try to use a channel as in this example:
https://github.com/confluentinc/confluent-kafka-go/blob/80c58f81b6cc32d3ed046609bf660a41a061b23d/examples/producer_example/producer_example.go

huangapple
  • 本文由 发表于 2022年1月2日 06:15:34
  • 转载请务必保留本文链接:https://go.coder-hub.com/70552273.html
匿名

发表评论

匿名网友

:?: :razz: :sad: :evil: :!: :smile: :oops: :grin: :eek: :shock: :???: :cool: :lol: :mad: :twisted: :roll: :wink: :idea: :arrow: :neutral: :cry: :mrgreen:

确定