org.apache.kafka.common.KafkaException: Failed to construct kafka producer

Question

I have a docker-compose.yml file that runs a Kafka server and creates a Kafka topic. Docker Compose and the tests are run from a GitHub workflow.

docker-compose.yml
---
    version: "3.8"
    networks:
      kafka-network:
        driver: bridge

    services:
      kafka:
        container_name: kafka
        image: bitnami/kafka:latest
        networks:
          - kafka-network
        environment:
          KAFKA_BROKER_ID: 1
          KAFKA_ENABLE_KRAFT: "true"
          KAFKA_CFG_PROCESS_ROLES: broker,controller
          KAFKA_CFG_CONTROLLER_LISTENER_NAMES: CONTROLLER
          KAFKA_CFG_CONTROLLER_QUORUM_VOTERS: 1@127.0.0.1:9094
          ALLOW_PLAINTEXT_LISTENER: "yes"
          KAFKA_CFG_LISTENER_SECURITY_PROTOCOL_MAP: PLAINTEXT:PLAINTEXT,EXTERNAL:PLAINTEXT,CONTROLLER:PLAINTEXT
          KAFKA_INTER_BROKER_LISTENER_NAME: PLAINTEXT
          KAFKA_CFG_LISTENERS: PLAINTEXT://:9092,EXTERNAL://0.0.0.0:9093,CONTROLLER://:9094
          KAFKA_CFG_ADVERTISED_LISTENERS: PLAINTEXT://kafka:9092,EXTERNAL://localhost:9093
          KAFKA_CFG_BROKER_ID: 1
          KAFKA_CFG_NODE_ID: 1
          KAFKA_CFG_AUTO_CREATE_TOPICS_ENABLE: "true"
          KAFKA_CFG_MESSAGE_MAX_BYTES: 5242880
          KAFKA_CFG_MAX_REQUEST_SIZE: 5242880
          BITNAMI_DEBUG: "true"
          KAFKA_CFG_DELETE_TOPIC_ENABLE: "true"

      # create kafka raw topic
      init-kafka:
        image: bitnami/kafka:latest
        networks:
          - kafka-network
        depends_on:
          kafka:
            condition: service_started
        entrypoint: [ '/bin/sh', '-c' ]
        command: |
          "
          # blocks until kafka is reachable
          kafka-topics.sh --bootstrap-server kafka:9092 --list

          echo -e 'Creating kafka topics'
          kafka-topics.sh --bootstrap-server kafka:9092 --create --if-not-exists --topic test-topic --replication-factor 1 --partitions 1

          echo -e 'Successfully created the following topics:'
          kafka-topics.sh --bootstrap-server kafka:9092 --list
          "

test.yml

    name: Testing

    on: [push]

    jobs:
      integration-tests:
        runs-on: [self-hosted, Linux, docker, ubuntu]
        container:
          image: app-image:latest
          volumes:
            - /var/run/docker.sock:/var/run/docker.sock
        timeout-minutes: 15

        steps:
          - uses: actions/checkout@v2
            with:
              fetch-depth: "0"

          - name: Install missing system dependencies
            run: apt-get update -qq && apt-get install -y make

          - name: Log in
            uses: docker/login-action@v1

          - name: Build the stack
            run: docker-compose up -d

          - name: Run Integration Test
            run: make integration-test

test_kafka.py

    (
        df.write.format("kafka")
        .option("kafka.bootstrap.servers", "kafka:9092")
        .option("kafka.security.protocol", "PLAINTEXT")
        .option("topic", "test-topic")
        .save()
    )

In the Makefile, I run the following command to execute the test case:

    pytest tests/integration/test_kafka.py

This setup works fine locally, but the GitHub workflow fails with the error `org.apache.kafka.common.KafkaException: Failed to construct kafka producer`.

I also tried changing the option to `.option("kafka.bootstrap.servers", "localhost:9093")`, and the GitHub workflow then reports:

    WARN NetworkClient:[Producer clientId=producer-1] Connection to node -1 (localhost/127.0.0.1:9093) could not be established. Broker may not be available.
    WARN NetworkClient: [Producer clientId=producer-1] Bootstrap broker localhost:9093 (id: -1 rack: null) disconnected
    org.apache.kafka.common.errors.TimeoutException: Topic test-topic not present in metadata after 60000 ms.

I want to run the tests in the CI pipeline, inside a container. The Kafka server is running and the Kafka topic is created in the GitHub workflow as well, but connecting from Python fails. Can someone help me, or point out what I am missing?

Thanks in advance!


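# Answer 1
**Score**: 1

I got the same exception; in my case the root cause was the value I had set for `delivery.timeout.ms`.

I got the exception for any value below 30 seconds.

The following configuration solved my issue:

    configProperties.put(ProducerConfig.DELIVERY_TIMEOUT_MS_CONFIG, 30000);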
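
One plausible explanation, assuming the Java producer's usual configuration check applies here: the client rejects an explicitly set `delivery.timeout.ms` that is smaller than `linger.ms + request.timeout.ms`, and `request.timeout.ms` defaults to 30000. The Python sketch below only mirrors that arithmetic. `check_delivery_timeout` and the `DEFAULTS` table are hypothetical names for illustration, not part of any Kafka client, and the default values are assumed from the Kafka 3.x producer documentation.

```python
# Assumed producer defaults (Kafka 3.x documentation); adjust for your client version.
DEFAULTS = {
    "linger.ms": 0,
    "request.timeout.ms": 30_000,
    "delivery.timeout.ms": 120_000,
}


def check_delivery_timeout(overrides: dict) -> int:
    """Return the smallest acceptable delivery.timeout.ms, or raise ValueError.

    Hypothetical helper that mirrors the rule
    delivery.timeout.ms >= linger.ms + request.timeout.ms.
    """
    # User-supplied settings take precedence over the assumed defaults.
    config = {**DEFAULTS, **overrides}
    minimum = config["linger.ms"] + config["request.timeout.ms"]
    if config["delivery.timeout.ms"] < minimum:
        raise ValueError(
            f"delivery.timeout.ms={config['delivery.timeout.ms']} "
            f"is below linger.ms + request.timeout.ms={minimum}"
        )
    return minimum
```

With these assumed defaults, `check_delivery_timeout({"delivery.timeout.ms": 30000})` passes while any smaller value raises, which matches the 30-second threshold described above. Lowering `request.timeout.ms` as well would be another way to satisfy the rule.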


# Answer 2
**Score**: 0

Your Compose file is missing a `ports` definition to forward localhost port 9093 to the broker.
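
After adding a mapping (presumably something like `9093:9093` under the `kafka` service; the exact form is an assumption), it may help to check which bootstrap address is actually reachable from the place where the tests run. The sketch below uses only the Python standard library. `probe` is a hypothetical helper name, and it only tests name resolution and a TCP connect, not the Kafka protocol itself.

```python
import socket


def probe(host: str, port: int, timeout: float = 3.0) -> str:
    """Classify an address as 'unresolvable', 'unreachable', or 'reachable'."""
    # Step 1: can the host name be resolved at all from this environment?
    try:
        socket.getaddrinfo(host, port)
    except socket.gaierror:
        return "unresolvable"
    # Step 2: does anything accept a TCP connection on that port?
    try:
        with socket.create_connection((host, port), timeout=timeout):
            return "reachable"
    except OSError:
        return "unreachable"
```

Calling `probe("kafka", 9092)` and `probe("localhost", 9093)` from inside the test container separates a name-resolution problem from a closed or unpublished port. Keep in mind that inside a job container `localhost` refers to that container itself, so a port published on the Docker host is not necessarily reachable under that name.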


huangapple
  • Published on May 29, 2023, 06:47:04
  • When reposting, please keep the link to this article: https://go.coder-hub.com/76353867.html