Unable to create a Kafka topic using Confluent while the broker is running in a docker container.

huangapple go评论118阅读模式
英文:

Not able to create a Kafka topic using Confluent while the broker is running in a docker container

问题

I am getting an error while trying to create a topic in the container.

Here is my docker-compose file:

  1. kafka:
  2. image: confluentinc/cp-kafka:latest
  3. networks:
  4. - kafka_network
  5. depends_on:
  6. - zookeeper
  7. ports:
  8. - 29092:29092
  9. - 29093:29093
  10. environment:
  11. KAFKA_BROKER_ID: 1
  12. KAFKA_ZOOKEEPER_CONNECT: zookeeper:2181
  13. KAFKA_LISTENERS: EXTERNAL_SAME_HOST://:29092,EXTERNAL_DIFFERENT_HOST://:29093,INTERNAL://:9092
  14. KAFKA_ADVERTISED_LISTENERS: INTERNAL://kafka:9092,EXTERNAL_SAME_HOST://localhost:29092,EXTERNAL_DIFFERENT_HOST://172.23.0.3:29093
  15. KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: INTERNAL:PLAINTEXT,EXTERNAL_SAME_HOST:PLAINTEXT,EXTERNAL_DIFFERENT_HOST:PLAINTEXT
  16. KAFKA_INTER_BROKER_LISTENER_NAME: INTERNAL
  17. KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: 1

After running the docker container, I am running the following Python code on my local machine which should connect to the Kafka broker in the container:

  1. def create_kafka_topic(bootstrap_servers, topic_name, num_partitions=1, replication_factor=1):
  2. # Configure the admin client with the bootstrap servers
  3. admin_client = AdminClient({'bootstrap.servers': bootstrap_servers})
  4. # Create a NewTopic object with topic configuration
  5. new_topic = NewTopic(topic_name, num_partitions=num_partitions, replication_factor=replication_factor)
  6. # Create the topic
  7. admin_client.create_topics([new_topic])
  8. if __name__ == "__main__":
  9. # Configure your Kafka broker address here
  10. bootstrap_servers = "localhost:29092"
  11. # Topic details
  12. topic_name = config.TOPIC
  13. num_partitions = config.NUM_PARTITIONS
  14. replication_factor = 1
  15. # Create the Kafka topic for transactions
  16. create_kafka_topic(bootstrap_servers, topic_name_transactions, num_partitions, replication_factor)

But running the above code is giving the following output:

%6|1691170181.360|BGQUEUE|rdkafka#producer-1| [thrd:background]: Purging 1 unserved events from the background queue

I would really appreciate it if someone could help me figure it out.
Thanks a lot!

英文:

I am getting an error while trying to create a topic in the container.

Here is my docker-compose file:

  1. kafka:
  2. image: confluentinc/cp-kafka:latest
  3. networks:
  4. - kafka_network
  5. depends_on:
  6. - zookeeper
  7. ports:
  8. - 29092:29092
  9. - 29093:29093
  10. environment:
  11. KAFKA_BROKER_ID: 1
  12. KAFKA_ZOOKEEPER_CONNECT: zookeeper:2181
  13. KAFKA_LISTENERS: EXTERNAL_SAME_HOST://:29092,EXTERNAL_DIFFERENT_HOST://:29093,INTERNAL://:9092
  14. KAFKA_ADVERTISED_LISTENERS: INTERNAL://kafka:9092,EXTERNAL_SAME_HOST://localhost:29092,EXTERNAL_DIFFERENT_HOST://172.23.0.3:29093
  15. KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: INTERNAL:PLAINTEXT,EXTERNAL_SAME_HOST:PLAINTEXT,EXTERNAL_DIFFERENT_HOST:PLAINTEXT
  16. KAFKA_INTER_BROKER_LISTENER_NAME: INTERNAL
  17. KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: 1

After running the docker container I am running the following Python code on my local machine which should connect to the Kafka broker in the container:

  1. def create_kafka_topic(bootstrap_servers, topic_name, num_partitions=1, replication_factor=1):
  2. # Configure the admin client with the bootstrap servers
  3. admin_client = AdminClient({'bootstrap.servers': bootstrap_servers})
  4. # Create a NewTopic object with topic configuration
  5. new_topic = NewTopic(topic_name, num_partitions=num_partitions, replication_factor=replication_factor)
  6. # Create the topic
  7. admin_client.create_topics([new_topic])
  8. if __name__ == "__main__":
  9. # Configure your Kafka broker address here
  10. bootstrap_servers = "localhost:29092"
  11. # Topic details
  12. topic_name = config.TOPIC
  13. num_partitions = config.NUM_PARTITIONS
  14. replication_factor = 1
  15. # Create the Kafka topic for transactions
  16. create_kafka_topic(bootstrap_servers, topic_name_transactions, num_partitions, replication_factor)

But running the above code is giving the following output:

%6|1691170181.360|BGQUEUE|rdkafka#producer-1| [thrd:background]: Purging 1 unserved events from background queue

I would really appreciate it if someone could help me figure it out.
Thanks a lot!

答案1

得分: 0

以下是您要求的中文翻译部分:

更新:- 我已经找到了我面临的问题!
docker-compose.yml 文件没问题,问题出在 Python 文件上。我尝试在交互式 Python shell 中逐行运行它,它可以工作,所以问题是创建 AdminClient 对象所花费的时间。因此,在创建对象之后添加 time.sleep(10) 是一个解决方案。一个更好的解决方案是创建一个函数,通过发送常量请求来检查是否已创建了 AdminClient,类似于这样更加健壮 -

  1. def wait_for_admin_client(admin_client):
  2. # 等待 admin 客户端连接到 Kafka 集群
  3. while True:
  4. metadata = admin_client.list_topics(timeout=5)
  5. if metadata is not None:
  6. break
  7. admin_client.poll(0.1)

然后是这个 -

  1. def create_kafka_topic(broker_name, topic_names, num_partitions=1, replication_factor=1):
  2. # 使用引导服务器配置 admin 客户端
  3. admin_client = AdminClient({
  4. 'bootstrap.servers': broker_name
  5. })
  6. # time.sleep(15)
  7. wait_for_admin_client(admin_client)
  8. # ... 其余部分保持不变

希望这对某人有所帮助!

英文:

Update:- I've figured out the issue I'm facing!
The docker-compose.yml file is alright as it is, the problem was with the Python file. I tried running each line separately in an interactive Python shell and it worked, so the problem was the time it was taking to create the AdminClient object. So, adding a time.sleep(10) right after creating the object is one solution. A better solution would be to create a function that checks if the AdminClient has been created by sending constant requests, something like this which is more robust -

  1. def wait_for_admin_client(admin_client):
  2. # Wait until the admin client is connected to the Kafka cluster
  3. while True:
  4. metadata = admin_client.list_topics(timeout=5)
  5. if metadata is not None:
  6. break
  7. admin_client.poll(0.1)

And then this -

  1. def create_kafka_topic(broker_name, topic_names, num_partitions=1, replication_factor=1):
  2. # Configure the admin client with the bootstrap servers
  3. admin_client = AdminClient({
  4. 'bootstrap.servers': broker_name
  5. })
  6. # time.sleep(15)
  7. wait_for_admin_client(admin_client)
  8. # ... rest remains the same

Hope this helps someone!

huangapple
  • 本文由 发表于 2023年8月5日 01:43:31
  • 转载请务必保留本文链接:https://go.coder-hub.com/76838125.html
匿名

发表评论

匿名网友

:?: :razz: :sad: :evil: :!: :smile: :oops: :grin: :eek: :shock: :???: :cool: :lol: :mad: :twisted: :roll: :wink: :idea: :arrow: :neutral: :cry: :mrgreen:

确定