我们如何确保使用confluent-kafka-go客户端进行可靠且低延迟的Kafka写入?

huangapple go评论74阅读模式
英文:

How can we ensure reliable and low-latency kafka writing using confluent-kafka-go client?

问题

在这段代码中,调用Producer.Produce(...)方法将消息写入发送缓冲队列并立即返回。然而,将消息写入内部缓冲队列并不能保证成功发送消息。那么我们如何确保消息的成功发送呢?

要确保消息的成功发送,可以采取以下几个步骤:

  1. 设置DeliveryChan通道:在创建生产者时,可以设置一个DeliveryChan通道,用于接收消息的传递状态。通过监听该通道,可以获取消息的传递结果。

  2. 检查消息的传递状态:在调用Producer.Produce(...)方法后,可以通过读取DeliveryChan通道来获取消息的传递状态。如果消息成功传递到代理服务器并被确认,将会收到一个kafka.Message类型的消息。

  3. 处理传递失败的情况:如果消息传递失败,可以根据具体情况进行处理。例如,可以进行重试操作,或者将失败的消息记录下来以便后续处理。

通过以上步骤,可以增加对消息传递状态的监控和处理,从而提高消息传递的可靠性。

英文:

The sample producer code at https://github.com/confluentinc/confluent-kafka-go calls Producer.Produce(...) It writes the message to the sending buffer queue and return immediately. However, writing to the internal buffer queue does not guarantee successful message delivery. How can we ensure sucessful message delivery here?

for idx, word := range []string{"Welcome", "to", "the", 
                                "Confluent", "Kafka", "Golang", "client"} {
	start_time := time.Now().UnixNano()
	p.Produce(&kafka.Message{
		TopicPartition: kafka.TopicPartition{Topic: &topic, 
                                             Partition: kafka.PartitionAny},
		Value:          []byte(word),
	}, nil)
}

答案1

得分: 1

要立即阻塞并发送数据,您需要调用生产者的Flush方法。

为了确保数据可靠写入,您需要设置acks=all生产者配置,并确保主题配置为具有至少2个或更多的最小同步副本,并且主题的复制因子为3个或更多。

英文:

To block and send data immediately, you'd call Flush method of the producer

https://github.com/confluentinc/confluent-kafka-go/blob/master/kafka/producer.go#L335

To ensure it's written reliably, set acks=all producer config and ensure the topic is configured to have min in sync replicas as 2 or more, and topic replication factor as 3 or more

huangapple
  • 本文由 发表于 2022年7月20日 16:24:15
  • 转载请务必保留本文链接:https://go.coder-hub.com/73048231.html
匿名

发表评论

匿名网友

:?: :razz: :sad: :evil: :!: :smile: :oops: :grin: :eek: :shock: :???: :cool: :lol: :mad: :twisted: :roll: :wink: :idea: :arrow: :neutral: :cry: :mrgreen:

确定