Unable to create a Kafka topic using Confluent while the broker is running in a docker container.

Question

I am getting an error while trying to create a topic in the container.

Here is my docker-compose file:

kafka:
  image: confluentinc/cp-kafka:latest
  networks:
    - kafka_network
  depends_on:
    - zookeeper
  ports:
    - 29092:29092
    - 29093:29093
  environment:
    KAFKA_BROKER_ID: 1
    KAFKA_ZOOKEEPER_CONNECT: zookeeper:2181
    KAFKA_LISTENERS: EXTERNAL_SAME_HOST://:29092,EXTERNAL_DIFFERENT_HOST://:29093,INTERNAL://:9092
    KAFKA_ADVERTISED_LISTENERS: INTERNAL://kafka:9092,EXTERNAL_SAME_HOST://localhost:29092,EXTERNAL_DIFFERENT_HOST://172.23.0.3:29093
    KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: INTERNAL:PLAINTEXT,EXTERNAL_SAME_HOST:PLAINTEXT,EXTERNAL_DIFFERENT_HOST:PLAINTEXT
    KAFKA_INTER_BROKER_LISTENER_NAME: INTERNAL
    KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: 1
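
For reference, each entry in KAFKA_ADVERTISED_LISTENERS pairs a listener name with the address the broker hands back to clients that connect on that listener. The sketch below only splits the value from the compose file above to show which address a client on the host machine would be given. The helper name parse_listeners is hypothetical and not part of Kafka or any client library.

```python
def parse_listeners(value):
    """Split 'NAME://host:port,...' into {name: (host, port)}."""
    listeners = {}
    for entry in value.split(","):
        # 'EXTERNAL_SAME_HOST://localhost:29092' -> name and 'localhost:29092'
        name, _, address = entry.strip().partition("://")
        # Split on the last colon so the port is always the final component
        host, _, port = address.rpartition(":")
        listeners[name] = (host, int(port))
    return listeners


advertised = parse_listeners(
    "INTERNAL://kafka:9092,"
    "EXTERNAL_SAME_HOST://localhost:29092,"
    "EXTERNAL_DIFFERENT_HOST://172.23.0.3:29093"
)
print(advertised["EXTERNAL_SAME_HOST"])  # ('localhost', 29092)
```

With this configuration, a client running on the same machine as Docker would bootstrap against localhost:29092, which matches the published port 29092:29092.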

After running the docker container, I am running the following Python code on my local machine which should connect to the Kafka broker in the container:

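from confluent_kafka.admin import AdminClient, NewTopic

import config  # project-local settings module providing TOPIC and NUM_PARTITIONS

def create_kafka_topic(bootstrap_servers, topic_name, num_partitions=1, replication_factor=1):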
    # Configure the admin client with the bootstrap servers
    admin_client = AdminClient({'bootstrap.servers': bootstrap_servers})

    # Create a NewTopic object with topic configuration
    new_topic = NewTopic(topic_name, num_partitions=num_partitions, replication_factor=replication_factor)

    # Create the topic
    admin_client.create_topics([new_topic])

if __name__ == "__main__":
    # Configure your Kafka broker address here
    bootstrap_servers = "localhost:29092"

    # Topic details
    topic_name = config.TOPIC
    num_partitions = config.NUM_PARTITIONS
    replication_factor = 1

    # Create the Kafka topic for transactions
    create_kafka_topic(bootstrap_servers, topic_name, num_partitions, replication_factor)

But running the above code is giving the following output:

%6|1691170181.360|BGQUEUE|rdkafka#producer-1| [thrd:background]: Purging 1 unserved events from background queue

I would really appreciate it if someone could help me figure it out.
Thanks a lot!

Answer 1

Score: 0

Update: I've figured out the issue I was facing.
The docker-compose.yml file is fine as it is; the problem was in the Python file. Running each line separately in an interactive Python shell worked, so the problem was the time the AdminClient object needs to become ready after it is created. Adding a time.sleep(10) right after creating the object is one solution. A more robust solution is a function that repeatedly sends requests until the AdminClient is connected, something like this:

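import time
from confluent_kafka import KafkaException

def wait_for_admin_client(admin_client):
    # Wait until the admin client is connected to the Kafka cluster.
    # list_topics() raises KafkaException if no broker answers in time.
    while True:
        try:
            admin_client.list_topics(timeout=5)
            return
        except KafkaException:
            time.sleep(0.5)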
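
The loop above retries indefinitely, so a broker that never comes up would hang the script. A minimal sketch of a bounded variant follows, assuming a caller-supplied probe callable that raises while the cluster is unreachable. The name wait_until_ready and its parameters are hypothetical, not part of confluent_kafka.

```python
import time


def wait_until_ready(probe, timeout=30.0, interval=1.0):
    """Call probe() until it succeeds or `timeout` seconds pass.

    Returns True if probe() completed without raising, False otherwise.
    """
    deadline = time.monotonic() + timeout
    while True:
        try:
            probe()
            return True
        except Exception:
            # Give up once the deadline has passed, otherwise retry
            if time.monotonic() >= deadline:
                return False
            time.sleep(interval)
```

With an AdminClient, the probe could be something like `lambda: admin_client.list_topics(timeout=5)`, and the caller can decide what to do when the function returns False.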

Then call wait_for_admin_client from the topic-creation function:

def create_kafka_topic(broker_name, topic_names, num_partitions=1, replication_factor=1):
    # Configure the admin client with the bootstrap servers
    admin_client = AdminClient({
        'bootstrap.servers': broker_name
    })

    # time.sleep(15)
    wait_for_admin_client(admin_client)
    # ... rest remains the same
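
A complementary approach, not part of the fix above: AdminClient.create_topics() is asynchronous and returns a dict mapping each topic name to a future, so a script that exits right after the call may stop before the request has been served. That is one plausible reading of the "Purging 1 unserved events" message. The sketch below blocks on those futures. The helper names wait_for_futures and create_topic_blocking are hypothetical, and the confluent_kafka import sits inside the function only so the waiting helper can be tried without the library installed.

```python
from concurrent.futures import Future
from typing import Dict, Optional


def wait_for_futures(futures: Dict[str, Future], timeout: float = 30.0) -> Dict[str, Optional[Exception]]:
    """Block until every future finishes; map each topic to None or its error."""
    outcomes = {}
    for topic, future in futures.items():
        try:
            future.result(timeout=timeout)  # returns None when the topic was created
            outcomes[topic] = None
        except Exception as exc:  # e.g. a KafkaException if the topic already exists
            outcomes[topic] = exc
    return outcomes


def create_topic_blocking(bootstrap_servers, topic_name, num_partitions=1, replication_factor=1):
    # Imported lazily so wait_for_futures can be exercised on its own
    from confluent_kafka.admin import AdminClient, NewTopic

    admin_client = AdminClient({"bootstrap.servers": bootstrap_servers})
    new_topic = NewTopic(topic_name, num_partitions=num_partitions, replication_factor=replication_factor)
    # create_topics() only queues the request; the futures report the outcome
    futures = admin_client.create_topics([new_topic])
    return wait_for_futures(futures)
```

Calling `create_topic_blocking("localhost:29092", "my-topic")` would then return only after the broker has answered or the timeout has expired, and the returned dict shows any per-topic error instead of silently dropping it.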

Hope this helps someone!

huangapple
  • Published on August 5, 2023, 01:43:31
  • When reposting, please keep the link to this article: https://go.coder-hub.com/76838125.html