How the Kafka producer works in asynchronous mode
Question
I have some simple code that sends a message:
private static void send(KafkaProducer<String, String> producer) {
    System.out.println(Thread.currentThread() + " before");
    producer.send(new ProducerRecord<String, String>("test", "test"), (metadata, exception) -> {
        if (exception == null) {
            System.out.println(metadata.partition());
        } else {
            exception.printStackTrace();
        }
    });
    System.out.println(Thread.currentThread() + " after");
}
And the Kafka configuration:
Properties properties = new Properties();
properties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9091");
properties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringSerializer");
properties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringSerializer");
KafkaProducer<String, String> producer = new KafkaProducer<>(properties);
My Kafka broker is shut down.
I expected that if I call send twice:
send(producer);
send(producer);
I would get two "Thread before" lines one after the other, since we are working in asynchronous mode:
Thread before
Thread before
But processing of the second record only starts after the first one has failed:
Thread[main,5,main] before
org.apache.kafka.common.errors.TimeoutException: Topic test not present in metadata after 60000 ms.
Thread[main,5,main] after
Thread[main,5,main] before
I think I have misunderstood Kafka's asynchronous mode.
Answer 1
Score: 1
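The first send call blocks while the producer fetches metadata for the topic (the same request that triggers topic auto-creation when the broker allows it). With the broker down, that wait lasts until max.block.ms expires, 60000 ms by default, which is the TimeoutException you see.
Also, your "after" print should be inside the lambda callback body if it is meant to run once the send has completed.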
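If the goal is simply to keep send from holding up the calling thread for a full minute when the broker is unreachable, one option is to lower the producer's max.block.ms setting. The following is only a minimal sketch under that assumption: the class name ProducerConfigSketch and the helper buildConfig are made up for illustration, and the config keys are written as plain strings so the snippet compiles without the Kafka client on the classpath.

```java
import java.util.Properties;

public class ProducerConfigSketch {

    // Hypothetical helper: builds the same producer settings as in the question,
    // plus an upper bound on how long send() may block waiting for metadata.
    static Properties buildConfig(String bootstrapServers, long maxBlockMs) {
        Properties properties = new Properties();
        properties.setProperty("bootstrap.servers", bootstrapServers);
        properties.setProperty("key.serializer",
                "org.apache.kafka.common.serialization.StringSerializer");
        properties.setProperty("value.serializer",
                "org.apache.kafka.common.serialization.StringSerializer");
        // "max.block.ms" is the key behind ProducerConfig.MAX_BLOCK_MS_CONFIG.
        properties.setProperty("max.block.ms", Long.toString(maxBlockMs));
        return properties;
    }

    public static void main(String[] args) {
        // 5000 ms is an arbitrary illustrative value, not a recommendation.
        Properties properties = buildConfig("localhost:9091", 5000L);
        System.out.println(properties.getProperty("max.block.ms"));
        // With kafka-clients available, these properties would be passed on as:
        // KafkaProducer<String, String> producer = new KafkaProducer<>(properties);
    }
}
```

With a setting like this, each send against an unreachable broker should still block, but only for the configured bound rather than 60 seconds. The callback runs asynchronously only once metadata is available and the record has been handed to the producer's buffer.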