英文:
How can we ensure reliable and low-latency kafka writing using confluent-kafka-go client?
问题
在这段代码中,调用Producer.Produce(...)
方法将消息写入发送缓冲队列并立即返回。然而,将消息写入内部缓冲队列并不能保证成功发送消息。那么我们如何确保消息的成功发送呢?
要确保消息的成功发送,可以采取以下几个步骤:
-
设置
DeliveryChan
通道:在创建生产者时,可以设置一个DeliveryChan
通道,用于接收消息的传递状态。通过监听该通道,可以获取消息的传递结果。 -
检查消息的传递状态:在调用
Producer.Produce(...)
方法后,可以通过读取DeliveryChan
通道来获取消息的传递状态。如果消息成功传递到代理服务器并被确认,将会收到一个kafka.Message
类型的消息。 -
处理传递失败的情况:如果消息传递失败,可以根据具体情况进行处理。例如,可以进行重试操作,或者将失败的消息记录下来以便后续处理。
通过以上步骤,可以增加对消息传递状态的监控和处理,从而提高消息传递的可靠性。
英文:
The sample producer code at https://github.com/confluentinc/confluent-kafka-go calls Producer.Produce(...)
It writes the message to the sending buffer queue and return immediately. However, writing to the internal buffer queue does not guarantee successful message delivery. How can we ensure sucessful message delivery here?
for idx, word := range []string{"Welcome", "to", "the",
"Confluent", "Kafka", "Golang", "client"} {
start_time := time.Now().UnixNano()
p.Produce(&kafka.Message{
TopicPartition: kafka.TopicPartition{Topic: &topic,
Partition: kafka.PartitionAny},
Value: []byte(word),
}, nil)
}
答案1
得分: 1
要立即阻塞并发送数据,您需要调用生产者的Flush方法。
为了确保数据可靠写入,您需要设置acks=all生产者配置,并确保主题配置为具有至少2个或更多的最小同步副本,并且主题的复制因子为3个或更多。
英文:
To block and send data immediately, you'd call Flush method of the producer
https://github.com/confluentinc/confluent-kafka-go/blob/master/kafka/producer.go#L335
To ensure it's written reliably, set acks=all producer config and ensure the topic is configured to have min in sync replicas as 2 or more, and topic replication factor as 3 or more
通过集体智慧和协作来改善编程学习和解决问题的方式。致力于成为全球开发者共同参与的知识库,让每个人都能够通过互相帮助和分享经验来进步。
评论