How does the Kafka producer work in asynchronous mode?

Question

I have a simple method that sends a message:

    private static void send(KafkaProducer<String, String> producer) {
        System.out.println(Thread.currentThread() + " before");
        producer.send(new ProducerRecord<String, String>("test", "test"), (metadata, exception) -> {
            if (exception == null) {
                System.out.println(metadata.partition());
            } else {
                exception.printStackTrace();
            }
        });
        System.out.println(Thread.currentThread() + " after");
    }

And this Kafka configuration:

    Properties properties = new Properties();
    properties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9091");
    properties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringSerializer");
    properties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringSerializer");
    KafkaProducer<String, String> producer = new KafkaProducer<>(properties);

My Kafka broker is shut down.

I expected that if I call send twice

    send(producer);
    send(producer);

I would get two "before" lines one right after the other, since we are working in asynchronous mode:

    Thread before
    Thread before

But the second record only starts processing after the first one has failed:

    Thread[main,5,main] before
    org.apache.kafka.common.errors.TimeoutException: Topic test not present in metadata after 60000 ms.
    Thread[main,5,main] after
    Thread[main,5,main] before

I think I have misunderstood Kafka's asynchronous mode.

Answer 1

Score: 1

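The first send() call blocks on the calling thread while the producer waits for metadata about the topic (this is also the step that can trigger creation of the topic). The wait is bounded by max.block.ms, which defaults to 60000 ms and matches the timeout in your output. Since your broker is down, the metadata never arrives and send() only returns once that timeout has expired.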
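To make the blocking step concrete, here is a toy model in plain Java. It is only a sketch and not Kafka's implementation: ToyProducer, maxBlockMs and brokerReachable are hypothetical names, and the "metadata wait" is simulated with a sleep. It shows a send() that first does a synchronous wait on the caller's thread and only then hands work off asynchronously.

```java
import java.util.concurrent.CompletableFuture;

// Toy model only: NOT Kafka's implementation. All names here are hypothetical.
class ToyProducer {
    private final long maxBlockMs;          // plays the role of max.block.ms
    private final boolean brokerReachable;  // false simulates a broker that is down

    ToyProducer(long maxBlockMs, boolean brokerReachable) {
        this.maxBlockMs = maxBlockMs;
        this.brokerReachable = brokerReachable;
    }

    // Phase 1 (metadata wait) runs on the caller's thread and may block.
    // Phase 2 (the simulated network send) runs asynchronously.
    CompletableFuture<String> send(String value) {
        if (!brokerReachable) {
            sleepQuietly(maxBlockMs); // the caller is stuck here, like main in the question
            CompletableFuture<String> failed = new CompletableFuture<>();
            failed.completeExceptionally(new IllegalStateException(
                    "Topic not present in metadata after " + maxBlockMs + " ms."));
            return failed;
        }
        return CompletableFuture.supplyAsync(() -> "sent " + value);
    }

    private static void sleepQuietly(long ms) {
        try {
            Thread.sleep(ms);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt(); // preserve the interrupt flag
        }
    }

    public static void main(String[] args) {
        ToyProducer producer = new ToyProducer(1000, false);
        for (int i = 0; i < 2; i++) {
            System.out.println(Thread.currentThread() + " before");
            producer.send("test").whenComplete((result, error) -> {
                if (error == null) {
                    System.out.println(result);
                } else {
                    System.out.println(error.getMessage());
                }
            });
            System.out.println(Thread.currentThread() + " after");
        }
    }
}
```

With the simulated broker down, each "before" is followed by a pause, then the error, then "after", which is the same ordering as in the question. With the real client, lowering max.block.ms shortens that pause but does not remove it.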

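Also, your "after" print should be inside the lambda callback body if you want it to run once the send has actually completed. Where it is now, it runs as soon as send() returns.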
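The difference between "send() has returned" and "the send has completed" can be shown with a small JDK-only sketch. This is a stand-in for an asynchronous send, not Kafka code: CallbackOrderDemo, the gate future and the "partition 0" result are all made up for illustration.

```java
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CopyOnWriteArrayList;

// Stand-in for an asynchronous send, using only the JDK. Not Kafka code.
class CallbackOrderDemo {
    // Returns the events in the order they were recorded.
    static List<String> run() {
        List<String> events = new CopyOnWriteArrayList<>();
        // The gate lets us control when the simulated send is allowed to finish.
        CompletableFuture<Void> gate = new CompletableFuture<>();

        events.add("before");
        // Simulated async send: produces its result only after the gate opens.
        CompletableFuture<String> result = gate.thenApplyAsync(v -> "partition 0");
        // The callback runs when the send completes, not when the call returns.
        CompletableFuture<Void> done =
                result.thenAccept(r -> events.add("completed: " + r));
        events.add("after send() returned");

        gate.complete(null); // the async work can only finish from here on
        done.join();         // wait for the callback so the list is complete
        return events;
    }

    public static void main(String[] args) {
        // prints: before, after send() returned, completed: partition 0 (one per line)
        run().forEach(System.out::println);
    }
}
```

The line recorded after the call lands before the completion event, so anything that must happen after delivery belongs in the callback.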

huangapple
  • Published on March 7, 2023, 07:08:05
  • When reposting, please keep the link to this article: https://go.coder-hub.com/75656643.html