org.apache.kafka.common.KafkaException: Failed to construct kafka producer
Question
I have a docker-compose.yml file that runs a Kafka server and creates a Kafka topic. Docker Compose and the tests are run from a GitHub workflow.
docker-compose.yml
---
version: "3.8"
networks:
  kafka-network:
    driver: bridge
services:
  kafka:
    container_name: kafka
    image: bitnami/kafka:latest
    networks:
      - kafka-network
    environment:
      KAFKA_BROKER_ID: 1
      KAFKA_ENABLE_KRAFT: "true"
      KAFKA_CFG_PROCESS_ROLES: broker,controller
      KAFKA_CFG_CONTROLLER_LISTENER_NAMES: CONTROLLER
      KAFKA_CFG_CONTROLLER_QUORUM_VOTERS: 1@127.0.0.1:9094
      ALLOW_PLAINTEXT_LISTENER: "yes"
      KAFKA_CFG_LISTENER_SECURITY_PROTOCOL_MAP: PLAINTEXT:PLAINTEXT,EXTERNAL:PLAINTEXT,CONTROLLER:PLAINTEXT
      KAFKA_INTER_BROKER_LISTENER_NAME: PLAINTEXT
      KAFKA_CFG_LISTENERS: PLAINTEXT://:9092,EXTERNAL://0.0.0.0:9093,CONTROLLER://:9094
      KAFKA_CFG_ADVERTISED_LISTENERS: PLAINTEXT://kafka:9092,EXTERNAL://localhost:9093
      KAFKA_CFG_BROKER_ID: 1
      KAFKA_CFG_NODE_ID: 1
      KAFKA_CFG_AUTO_CREATE_TOPICS_ENABLE: "true"
      KAFKA_CFG_MESSAGE_MAX_BYTES: 5242880
      KAFKA_CFG_MAX_REQUEST_SIZE: 5242880
      BITNAMI_DEBUG: "true"
      KAFKA_CFG_DELETE_TOPIC_ENABLE: "true"
  # create kafka raw topic
  init-kafka:
    image: bitnami/kafka:latest
    networks:
      - kafka-network
    depends_on:
      kafka:
        condition: service_started
    entrypoint: [ '/bin/sh', '-c' ]
    command: |
      "
      # blocks until kafka is reachable
      kafka-topics.sh --bootstrap-server kafka:9092 --list
      echo -e 'Creating kafka topics'
      kafka-topics.sh --bootstrap-server kafka:9092 --create --if-not-exists --topic test-topic --replication-factor 1 --partitions 1
      echo -e 'Successfully created the following topics:'
      kafka-topics.sh --bootstrap-server kafka:9092 --list
      "
test.yml
name: Testing
on: [push]
jobs:
  integration-tests:
    runs-on: [self-hosted, Linux, docker, ubuntu]
    container:
      image: app-image:latest
      volumes:
        - /var/run/docker.sock:/var/run/docker.sock
    timeout-minutes: 15
    steps:
      - uses: actions/checkout@v2
        with:
          fetch-depth: "0"
      - name: Install missing system dependencies
        run: apt-get update -qq && apt-get install -y make
      - name: Log in
        uses: docker/login-action@v1
      - name: Build the stack
        run: docker-compose up -d
      - name: Run Integration Test
        run: make integration-test
test_kafka.py
# df is an existing Spark DataFrame; the parentheses allow the chained calls to span lines
(
    df.write.format("kafka")
    .option("kafka.bootstrap.servers", "kafka:9092")
    .option("kafka.security.protocol", "PLAINTEXT")
    .option("topic", "test-topic")
    .save()
)
In the Makefile, I run this command to execute the test case:
pytest tests/integration/test_kafka.py
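This setup works fine locally, but the GitHub workflow fails with the error `org.apache.kafka.common.KafkaException: Failed to construct kafka producer`.
I also tried changing the option to `.option("kafka.bootstrap.servers", "localhost:9093")`, and the GitHub workflow then reports: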
WARN NetworkClient:[Producer clientId=producer-1] Connection to node -1 (localhost/127.0.0.1:9093) could not be established. Broker may not be available.
WARN NetworkClient: [Producer clientId=producer-1] Bootstrap broker localhost:9093 (id: -1 rack: null) disconnected
org.apache.kafka.common.errors.TimeoutException: Topic test-topic not present in metadata after 60000 ms.
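I want to run the tests inside a container in the CI pipeline. The Kafka server is running and the Kafka topic is created in the GitHub workflow as well, but connecting from Python fails. Can someone please help, or point out what I am missing?
Thanks in advance!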
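# Answer 1
**Score**: 1
I got the same exception. In my case, the root cause was the value I had set for delivery.timeout.ms.
I got the exception for any value lower than 30 seconds.
The configuration below solved my issue.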
configProperties.put(ProducerConfig.DELIVERY_TIMEOUT_MS_CONFIG, 30000);
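
The 30-second threshold is consistent with the producer's documented constraint that `delivery.timeout.ms` should be equal to or larger than `linger.ms + request.timeout.ms`. The sketch below restates that check in plain Python as an illustration only; it is not Kafka's code. The function names are hypothetical, and the defaults used (`request.timeout.ms` = 30000, `linger.ms` = 0) are assumptions that may differ between client versions.

```python
# Illustrative restatement of the producer timeout rule; not Kafka's own code.
# Assumed defaults: request.timeout.ms = 30000 and linger.ms = 0.


def check_delivery_timeout(
    delivery_timeout_ms: int,
    linger_ms: int = 0,
    request_timeout_ms: int = 30_000,
) -> None:
    """Raise ValueError when delivery.timeout.ms is below the required minimum."""
    minimum = linger_ms + request_timeout_ms
    if delivery_timeout_ms < minimum:
        raise ValueError(
            f"delivery.timeout.ms ({delivery_timeout_ms}) should be equal to or "
            f"larger than linger.ms + request.timeout.ms ({minimum})"
        )


def is_valid(delivery_timeout_ms: int, **overrides: int) -> bool:
    """Return True when the settings satisfy the rule, False otherwise."""
    try:
        check_delivery_timeout(delivery_timeout_ms, **overrides)
        return True
    except ValueError:
        return False
```

Under these assumed defaults, `is_valid(30000)` passes while `is_valid(29999)` does not. A shorter delivery timeout would only be accepted if `request.timeout.ms` were lowered as well.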
# Answer 2
**Score**: 0
Your Compose file is missing a `ports` definition to forward localhost port 9093 to the broker.
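
To see which address is actually reachable from the place where the test runs, a plain TCP probe can help. This is a minimal sketch: `can_connect` is a hypothetical helper, and it only confirms that the port accepts connections, not that the Kafka handshake or the advertised listeners are correct.

```python
import socket


def can_connect(host: str, port: int, timeout_s: float = 3.0) -> bool:
    """Return True if a TCP connection to host:port can be opened."""
    try:
        # create_connection resolves the host name and connects in one step;
        # DNS failures, refusals, and timeouts all surface as OSError.
        with socket.create_connection((host, port), timeout=timeout_s):
            return True
    except OSError:
        return False


def probe(endpoints: list[tuple[str, int]]) -> dict[str, bool]:
    """Map each 'host:port' string to whether it accepted a connection."""
    return {f"{host}:{port}": can_connect(host, port) for host, port in endpoints}
```

Calling `probe([("kafka", 9092), ("localhost", 9093)])` from inside the CI job container, for example as the first step of the test, shows whether either bootstrap address from the question is reachable there.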