Kafka Java producer is not able to send messages to a Kafka instance

Question

I am running a Kafka instance in a Docker container, using the following docker-compose.yml file.

    version: "3"
    services:
      zookeeper:
        image: 'bitnami/zookeeper:latest'
        ports:
          - '2181:2181'
        environment:
          - ALLOW_ANONYMOUS_LOGIN=yes
      kafka:
        image: 'bitnami/kafka:latest'
        ports:
          - '9092:9092'
          - '9093:9093'
        environment:
          - KAFKA_CFG_ZOOKEEPER_CONNECT=zookeeper:2181
          - ALLOW_PLAINTEXT_LISTENER=yes
          - KAFKA_BROKER_ID=1
          - KAFKA_CREATE_TOPICS="topic_name:1:3"
          - KAFKA_CFG_LISTENER_SECURITY_PROTOCOL_MAP=CLIENT:PLAINTEXT,EXTERNAL:PLAINTEXT
          - KAFKA_CFG_LISTENERS=CLIENT://:9092,EXTERNAL://:9093
          - KAFKA_CFG_ADVERTISED_LISTENERS=CLIENT://127.0.0.1:9092,EXTERNAL://127.0.0.1:9093
          - KAFKA_INTER_BROKER_LISTENER_NAME=EXTERNAL
        depends_on:
          - zookeeper

It runs fine. I tested Kafka by sending data with kafkacat, and I can receive the data through a Kafka consumer without any problem. These are the kafkacat commands I used:

    kafkacat -P -b 127.0.0.1:9092 -t topic_name
    kafkacat -C -b 127.0.0.1:9092 -t topic_name

However, when I send data with the Java producer code below, the kafkacat consumer does not receive it. Any suggestions would be appreciated. Thanks in advance.

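    import java.util.HashMap;
    import java.util.Map;
    import org.apache.kafka.clients.producer.ProducerConfig;
    import org.apache.kafka.common.serialization.StringSerializer;
    import org.springframework.kafka.core.DefaultKafkaProducerFactory;
    import org.springframework.kafka.core.KafkaTemplate;

    public class DataProducer {
        public static void main(String[] args) {
            KafkaTemplate<String, String> kafkaTemplate =
                    new KafkaTemplate<>(new DefaultKafkaProducerFactory<>(kafkaConfig()));
            kafkaTemplate.send("topic_name", "test");
        }

        // Producer settings: broker address plus String key/value serializers.
        public static Map<String, Object> kafkaConfig() {
            Map<String, Object> props = new HashMap<>();
            props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "127.0.0.1:9092");
            props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
            props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
            return props;
        }
    }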

Here is also the metadata output from kafkacat:

    kafkacat -b 127.0.0.1:9092 -L
    Metadata for all topics (from broker 1: 127.0.0.1:9092/1):
     1 brokers:
      broker 1 at 127.0.0.1:9092 (controller)
     2 topics:
      topic "topic_name" with 1 partitions:
        partition 0, leader 1, replicas: 1, isrs: 1
      topic "__consumer_offsets" with 50 partitions:
        partition 0, leader 1, replicas: 1, isrs: 1
        partition 1, leader 1, replicas: 1, isrs: 1
        ..
        partition 48, leader 1, replicas: 1, isrs: 1
        partition 49, leader 1, replicas: 1, isrs: 1

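Answer 1

Score: 3

The broker configuration seems to be fine, since you get back the correct metadata.

I think the problem is in your code. kafkaTemplate.send() is an asynchronous operation, and most likely your process exits before the producer has actually sent the message. Try adding .get() to the send call to make it synchronous.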

    kafkaTemplate.send("topic_name", "test").get();
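
The race described in this answer can be reproduced without a broker. The following is a minimal sketch using only the JDK: FakeAsyncProducer and AsyncSendDemo are hypothetical names invented for illustration, not part of Kafka or Spring. The fake producer delivers on a daemon thread, on the assumption that this mirrors how the real client hands records to a background sender thread, so send() returns before anything is delivered and only blocking on the returned future guarantees delivery before main exits.

```java
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

// Hypothetical stand-in for a producer (not a Kafka or Spring class):
// "sends" complete later on a background daemon thread.
final class FakeAsyncProducer {
    // Messages that were actually delivered (stands in for the topic).
    final List<String> delivered = new CopyOnWriteArrayList<>();

    private final ExecutorService io = Executors.newSingleThreadExecutor(r -> {
        Thread t = new Thread(r, "fake-producer-io");
        t.setDaemon(true); // daemon threads do not keep the JVM alive
        return t;
    });

    // Returns immediately; delivery happens later on the background thread.
    CompletableFuture<String> send(String topic, String value) {
        return CompletableFuture.supplyAsync(() -> {
            sleepQuietly(200); // simulated network latency
            String record = topic + ":" + value;
            delivered.add(record);
            return record;
        }, io);
    }

    private static void sleepQuietly(long millis) {
        try {
            Thread.sleep(millis);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
    }
}

final class AsyncSendDemo {
    public static void main(String[] args) throws Exception {
        FakeAsyncProducer producer = new FakeAsyncProducer();

        // Fire-and-forget: send() has returned, but delivery is still pending.
        CompletableFuture<String> pending = producer.send("topic_name", "test");
        System.out.println("delivered right after send(): " + producer.delivered.size());

        // Blocking on the future (with a timeout) forces delivery before exit.
        String ack = pending.get(5, TimeUnit.SECONDS);
        System.out.println("acknowledged: " + ack + ", delivered: " + producer.delivered.size());
    }
}
```

If the pending.get(...) line is removed, main returns while the daemon thread is still sleeping and the JVM exits without delivering anything, which matches the symptom in the question. With the real KafkaTemplate, the equivalent would be calling .get() with a timeout on the future returned by send(), or calling kafkaTemplate.flush() before the program ends; a bounded wait is usually preferable to an unbounded .get() so that an unreachable broker does not hang the process indefinitely.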

Answer 2

Score: 0

I had a similar issue. Strangely, changing the kafka-clients Maven dependency from the latest version (2.6.0) to 2.0.0 (or anything up to 2.5.0) helped me.


huangapple
  • Posted on August 10, 2020, 04:59:49
  • When reposting, please keep the link to this article: https://go.coder-hub.com/63331265.html