Kafka Producer terminating with 1 message (881 bytes) still in queue or transit

Question

I am pretty new to Kafka, and I get the message below when I push a value to the producer.

func Produce(topic string, key string, message interface{}) {

    headers := map[string][]byte{
        MSG_HEADER_KEY_CORRELATIONID: []byte("1234"),
        MSG_HEADER_KEY_REQUESTID:     []byte(uuid.NewString()),
        MSG_HEADER_KEY_TESTID:        []byte("456"),
        MSG_HEADER_KEY_MESSAGETYPE:   []byte("TestLookupRequest"),
    }

    kheaders := make([]kafka.Header, 0, len(headers))
    for k, v := range headers {
        kheaders = append(kheaders, kafka.Header{Key: k, Value: v})
    }


    var err error

    servers := "XXXXXX"
    protocol := "SASL_SSL"
    mechanisms := "PLAIN"
    username := "XXXXXXX"
    password := "XXXXXXX"


    Producer, err = kafka.NewProducer(&kafka.ConfigMap{
        "bootstrap.servers": servers,
        "security.protocol": protocol,
        "sasl.username":     username,
        "sasl.password":     password,
        "sasl.mechanism":    mechanisms,
    })

    if err != nil {
        panic(err)
    }
    defer Producer.Close()

    value, _ := json.Marshal(message)

    err = Producer.Produce(&kafka.Message{
        TopicPartition: kafka.TopicPartition{Topic: &topic, Partition: kafka.PartitionAny},
        Key:           []byte("12345"),
        Headers:       kheaders,
        Value:         value,
        Timestamp:     time.Now().UTC(),
        TimestampType: kafka.TimestampCreateTime,
    }, nil)

    if err != nil {
        panic(err)
    }
    Producer.Flush(30)
}

%4|1641074998.615|TERMINATE|rdkafka#producer-1| [thrd:app]: Producer terminating with 1 message (881 bytes) still in queue or transit: use flush() to wait for outstanding message delivery

Any help on how I can fix this?

Answer 1

Score: 4

Try a longer timeout when calling Flush(). Its argument is in milliseconds, so Flush(30) waits at most 30 ms, which might not be enough for the message to be delivered before the deferred Close() runs. Alternatively, use a delivery channel, as in this example:
https://github.com/confluentinc/confluent-kafka-go/blob/80c58f81b6cc32d3ed046609bf660a41a061b23d/examples/producer_example/producer_example.go

Posted by huangapple on 2022-01-02 06:15:34
Source link: https://go.coder-hub.com/70552273.html