How the Kafka producer works in asynchronous mode

Question

I have some simple code that sends a message:

    private static void send(KafkaProducer<String, String> producer) {
        System.out.println(Thread.currentThread() + " before");
        producer.send(new ProducerRecord<String, String>("test", "test"), (metadata, exception) -> {
            if (exception == null) {
                System.out.println(metadata.partition());
            } else {
                exception.printStackTrace();
            }
        });
        System.out.println(Thread.currentThread() + " after");
    }

And the Kafka configuration:

        Properties properties = new Properties();
        properties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9091");
        properties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringSerializer");
        properties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringSerializer");

        KafkaProducer<String, String> producer = new KafkaProducer<>(properties);

My Kafka broker is shut down.

I expected that if I call send twice

        send(producer);
        send(producer);

I would get two "before" lines one after the other, since we are working in asynchronous mode:

Thread before
Thread before

But the second send only starts after the first one has failed:

Thread[main,5,main] before
org.apache.kafka.common.errors.TimeoutException: Topic test not present in metadata after 60000 ms.
Thread[main,5,main] after
Thread[main,5,main] before

I think I have misunderstood Kafka's asynchronous mode.

Answer 1

Score: 1

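The first send() call blocks while the producer waits for the topic's metadata (a request that can also trigger topic creation on the broker). That wait is bounded by max.block.ms, which defaults to 60000 ms and matches the timeout in your output. Since your broker is down, the metadata never arrives, so the call only returns after the timeout.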
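To shorten that wait, the producer's max.block.ms setting can be lowered. A minimal sketch, assuming the same broker address and serializers as in the question; the class name and the 5000 ms value are only illustrative, and plain string keys are used so the snippet compiles without the Kafka client on the classpath:

```java
import java.util.Properties;

class ProducerConfigSketch {
    // Builds producer properties with a shorter upper bound on how long
    // send() may block waiting for metadata or buffer space.
    static Properties buildProps() {
        Properties props = new Properties();
        props.setProperty("bootstrap.servers", "localhost:9091");
        props.setProperty("key.serializer",
                "org.apache.kafka.common.serialization.StringSerializer");
        props.setProperty("value.serializer",
                "org.apache.kafka.common.serialization.StringSerializer");
        // Illustrative value (an assumption): 5 seconds instead of the 60000 ms default.
        props.setProperty("max.block.ms", "5000");
        return props;
    }

    public static void main(String[] args) {
        System.out.println(buildProps().getProperty("max.block.ms"));
    }
}
```

These properties can be passed to new KafkaProducer<>(...) exactly like the ones in the question. This only shortens the wait; send() will still block while metadata is unavailable.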

Also, your "after" print should be inside the lambda callback body.
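If the calling thread must never wait, even while metadata is unavailable, one option is to hand the send() call to a dedicated thread. The sketch below is only a model of that idea: blockingSend is a hypothetical stand-in that sleeps instead of calling the real producer, and the class name and timings are assumptions made for illustration.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

class OffloadSendSketch {
    // Hypothetical stand-in for producer.send(...) blocking on missing metadata.
    static void blockingSend(List<String> log, String record) {
        try {
            Thread.sleep(200); // simulated blocking time, not a real Kafka call
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        log.add("sent " + record);
    }

    static List<String> run() throws InterruptedException {
        List<String> log = Collections.synchronizedList(new ArrayList<>());
        // A single worker thread keeps the records in submission order.
        ExecutorService sender = Executors.newSingleThreadExecutor();
        for (String record : new String[] {"first", "second"}) {
            log.add("before " + record);
            // The caller returns immediately; the worker does the blocking.
            sender.submit(() -> blockingSend(log, record));
        }
        sender.shutdown();
        sender.awaitTermination(5, TimeUnit.SECONDS);
        return log;
    }

    public static void main(String[] args) throws InterruptedException {
        run().forEach(System.out::println);
    }
}
```

In this model both "before" lines are logged before either simulated send completes, which is the behaviour the question expected. With a real producer, the lambda would call producer.send(record, callback) instead of the stand-in.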

Posted by huangapple on 2023-03-07 07:08:05
Source: https://go.coder-hub.com/75656643.html