Kafka retry property not working as expected
Question
I have set up Kafka 2.8.1 with a basic configuration: 1 broker, 1 topic, and 1 partition.
I use the following code to send a message:
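import java.util.Properties;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;

import org.apache.kafka.clients.producer.Callback;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.clients.producer.RecordMetadata;

// The class name is illustrative; the original post showed only the main method.
public class ProducerDemo {
    public static void main(String[] args) throws InterruptedException {
        // 1. Create producer configuration information
        Properties properties = new Properties();
        properties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:19092");
        properties.put("max.block.ms", "5000");
        properties.put("retries", "3");
        properties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringSerializer");
        properties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringSerializer");
        // 2. Create producer object
        KafkaProducer<String, String> producer = new KafkaProducer<String, String>(properties);
        final CountDownLatch countDownLatch = new CountDownLatch(1);
        // 3. Send data; the callback runs once the send succeeds or fails
        producer.send(new ProducerRecord<String, String>("test", "CallBack--1"), new Callback() {
            public void onCompletion(RecordMetadata metadata, Exception exception) {
                if (exception == null) {
                    System.out.println(metadata.partition() + "==" + metadata.offset());
                    countDownLatch.countDown();
                } else {
                    System.out.println("Error!!!");
                    exception.printStackTrace();
                }
            }
        });
        // Wait up to 60 seconds for a successful send
        countDownLatch.await(60, TimeUnit.SECONDS);
        // 4. Close resources
        producer.close();
    }
}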
Both my broker and ZooKeeper are down, and I expected Error!!! to appear in the log three times. Instead, nothing happens after the error org.apache.kafka.common.errors.TimeoutException: Topic test not present in metadata after 5000 ms. How can I set up retries when the broker is unavailable?
Answer 1
Score: 1
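You would need to write your own loop to do this. If the broker is down, the TCP connection cannot be established. The producer's retries config handles transient errors that occur while sending data, not failures to establish a broker connection. In the case above, the TimeoutException is raised while send() is still waiting for topic metadata (a wait bounded by max.block.ms), so the retry logic is never reached.
Also note that 2147483647 is the default value of retries, so there is no reason to explicitly set it to a lower value in this case.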
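As a rough illustration of the "write your own loop" suggestion, here is a minimal sketch of a retry wrapper. It is not part of the Kafka client API: the class RetryingSend, the method retry, and the parameters maxAttempts and backoffMs are hypothetical names chosen for this example, and the main method simulates an unavailable broker rather than contacting a real one.

```java
import java.util.concurrent.Callable;
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical helper for illustration; not part of the Kafka client library.
final class RetryingSend {

    // Runs the action up to maxAttempts times, sleeping backoffMs between failed attempts.
    // Returns the first successful result, or throws IllegalStateException when every attempt fails.
    static <T> T retry(Callable<T> action, int maxAttempts, long backoffMs) {
        if (maxAttempts < 1) {
            throw new IllegalArgumentException("maxAttempts must be at least 1");
        }
        Exception last = null;
        for (int attempt = 1; attempt <= maxAttempts; attempt++) {
            try {
                return action.call();
            } catch (InterruptedException e) {
                // Do not retry after an interrupt; restore the flag and give up.
                Thread.currentThread().interrupt();
                throw new IllegalStateException("Interrupted during attempt " + attempt, e);
            } catch (Exception e) {
                last = e;
                System.out.println("Error!!! attempt " + attempt + " of " + maxAttempts + ": " + e);
            }
            if (attempt < maxAttempts) {
                try {
                    Thread.sleep(backoffMs);
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                    throw new IllegalStateException("Interrupted while backing off", e);
                }
            }
        }
        throw new IllegalStateException("All " + maxAttempts + " attempts failed", last);
    }

    public static void main(String[] args) {
        // Stand-in for a broker that is unreachable during the first two attempts.
        AtomicInteger calls = new AtomicInteger();
        String result = retry(() -> {
            if (calls.incrementAndGet() < 3) {
                throw new IllegalStateException("broker unavailable (simulated)");
            }
            return "sent";
        }, 3, 100L);
        System.out.println(result + " after " + calls.get() + " attempts");
    }
}
```

With the producer from the question, the action could plausibly be () -> producer.send(record).get(), where record is the ProducerRecord being sent. Calling get() on the returned Future blocks until the send succeeds or fails, so a metadata timeout surfaces as an exception that the loop can catch. Each attempt may still block for up to max.block.ms before failing.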