英文:
Not able to create a Kafka topic using Confluent while the broker is running in a docker container
问题
I am getting an error while trying to create a topic in the container.
Here is my docker-compose file:
kafka:
image: confluentinc/cp-kafka:latest
networks:
- kafka_network
depends_on:
- zookeeper
ports:
- 29092:29092
- 29093:29093
environment:
KAFKA_BROKER_ID: 1
KAFKA_ZOOKEEPER_CONNECT: zookeeper:2181
KAFKA_LISTENERS: EXTERNAL_SAME_HOST://:29092,EXTERNAL_DIFFERENT_HOST://:29093,INTERNAL://:9092
KAFKA_ADVERTISED_LISTENERS: INTERNAL://kafka:9092,EXTERNAL_SAME_HOST://localhost:29092,EXTERNAL_DIFFERENT_HOST://172.23.0.3:29093
KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: INTERNAL:PLAINTEXT,EXTERNAL_SAME_HOST:PLAINTEXT,EXTERNAL_DIFFERENT_HOST:PLAINTEXT
KAFKA_INTER_BROKER_LISTENER_NAME: INTERNAL
KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: 1
After running the docker container, I am running the following Python code on my local machine which should connect to the Kafka broker in the container:
def create_kafka_topic(bootstrap_servers, topic_name, num_partitions=1, replication_factor=1):
# Configure the admin client with the bootstrap servers
admin_client = AdminClient({'bootstrap.servers': bootstrap_servers})
# Create a NewTopic object with topic configuration
new_topic = NewTopic(topic_name, num_partitions=num_partitions, replication_factor=replication_factor)
# Create the topic
admin_client.create_topics([new_topic])
if __name__ == "__main__":
# Configure your Kafka broker address here
bootstrap_servers = "localhost:29092"
# Topic details
topic_name = config.TOPIC
num_partitions = config.NUM_PARTITIONS
replication_factor = 1
# Create the Kafka topic for transactions
create_kafka_topic(bootstrap_servers, topic_name_transactions, num_partitions, replication_factor)
But running the above code is giving the following output:
%6|1691170181.360|BGQUEUE|rdkafka#producer-1| [thrd:background]: Purging 1 unserved events from the background queue
I would really appreciate it if someone could help me figure it out.
Thanks a lot!
英文:
I am getting an error while trying to create a topic in the container.
Here is my docker-compose file:
kafka:
image: confluentinc/cp-kafka:latest
networks:
- kafka_network
depends_on:
- zookeeper
ports:
- 29092:29092
- 29093:29093
environment:
KAFKA_BROKER_ID: 1
KAFKA_ZOOKEEPER_CONNECT: zookeeper:2181
KAFKA_LISTENERS: EXTERNAL_SAME_HOST://:29092,EXTERNAL_DIFFERENT_HOST://:29093,INTERNAL://:9092
KAFKA_ADVERTISED_LISTENERS: INTERNAL://kafka:9092,EXTERNAL_SAME_HOST://localhost:29092,EXTERNAL_DIFFERENT_HOST://172.23.0.3:29093
KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: INTERNAL:PLAINTEXT,EXTERNAL_SAME_HOST:PLAINTEXT,EXTERNAL_DIFFERENT_HOST:PLAINTEXT
KAFKA_INTER_BROKER_LISTENER_NAME: INTERNAL
KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: 1
After running the docker container I am running the following Python code on my local machine which should connect to the Kafka broker in the container:
def create_kafka_topic(bootstrap_servers, topic_name, num_partitions=1, replication_factor=1):
# Configure the admin client with the bootstrap servers
admin_client = AdminClient({'bootstrap.servers': bootstrap_servers})
# Create a NewTopic object with topic configuration
new_topic = NewTopic(topic_name, num_partitions=num_partitions, replication_factor=replication_factor)
# Create the topic
admin_client.create_topics([new_topic])
if __name__ == "__main__":
# Configure your Kafka broker address here
bootstrap_servers = "localhost:29092"
# Topic details
topic_name = config.TOPIC
num_partitions = config.NUM_PARTITIONS
replication_factor = 1
# Create the Kafka topic for transactions
create_kafka_topic(bootstrap_servers, topic_name_transactions, num_partitions, replication_factor)
But running the above code is giving the following output:
%6|1691170181.360|BGQUEUE|rdkafka#producer-1| [thrd:background]: Purging 1 unserved events from background queue
I would really appreciate it if someone could help me figure it out.
Thanks a lot!
答案1
得分: 0
以下是您要求的中文翻译部分:
更新:- 我已经找到了我面临的问题!
docker-compose.yml
文件没问题,问题出在 Python 文件上。我尝试在交互式 Python shell 中逐行运行它,它可以工作,所以问题是创建 AdminClient
对象所花费的时间。因此,在创建对象之后添加 time.sleep(10)
是一个解决方案。一个更好的解决方案是创建一个函数,通过发送常量请求来检查是否已创建了 AdminClient,类似于这样更加健壮 -
def wait_for_admin_client(admin_client):
# 等待 admin 客户端连接到 Kafka 集群
while True:
metadata = admin_client.list_topics(timeout=5)
if metadata is not None:
break
admin_client.poll(0.1)
然后是这个 -
def create_kafka_topic(broker_name, topic_names, num_partitions=1, replication_factor=1):
# 使用引导服务器配置 admin 客户端
admin_client = AdminClient({
'bootstrap.servers': broker_name
})
# time.sleep(15)
wait_for_admin_client(admin_client)
# ... 其余部分保持不变
希望这对某人有所帮助!
英文:
Update:- I've figured out the issue I'm facing!
The docker-compose.yml
file is alright as it is, the problem was with the Python file. I tried running each line separately in an interactive Python shell and it worked, so the problem was the time it was taking to create the AdminClient
object. So, adding a time.sleep(10) right after creating the object is one solution. A better solution would be to create a function that checks if the AdminClient has been created by sending constant requests, something like this which is more robust -
def wait_for_admin_client(admin_client):
# Wait until the admin client is connected to the Kafka cluster
while True:
metadata = admin_client.list_topics(timeout=5)
if metadata is not None:
break
admin_client.poll(0.1)
And then this -
def create_kafka_topic(broker_name, topic_names, num_partitions=1, replication_factor=1):
# Configure the admin client with the bootstrap servers
admin_client = AdminClient({
'bootstrap.servers': broker_name
})
# time.sleep(15)
wait_for_admin_client(admin_client)
# ... rest remains the same
Hope this helps someone!
通过集体智慧和协作来改善编程学习和解决问题的方式。致力于成为全球开发者共同参与的知识库,让每个人都能够通过互相帮助和分享经验来进步。
评论