Python Kafka 消费者.poll 消息错误 字典对象没有属性错误。

huangapple go评论68阅读模式
英文:

python kafka consumer.poll msg.error dict object has no attribute error

问题

我试图使用消费者轮询设置Kafka消息轮询。我可以看到很多示例代码使用了message.error,例如:

while True:
    try:
        message = consumer.poll(10.0)

        if not message:
            time.sleep(120) # 休眠2分钟

        if message.error():
            print(f"消费者错误: {message.error()}")
            continue

        print(f"接收到的消息: {message.value().decode('utf-8')}")
    except:
        # 处理任何异常

但是当我尝试运行这些代码时,我收到错误AttributeError: 'dict' object has no attribute 'error',指的是代码行if message.error():。这是版本问题吗?我不明白为什么有这么多示例代码片段,但实际上不起作用。我假设message.value() 也会有问题。

注意,我使用的是Kafka Zookeeper而不是Kafka Confluent。

英文:

I am trying to set up kafka message polling using consumer poll.
I can see loads of example code that uses message.error
e.g.

while True:
    try: 
        message = consumer.poll(10.0)

        if not message:
            time.sleep(120) # Sleep for 2 minutes

        if message.error():
            print(f"Consumer error: {message.error()}")
            continue

        print(f"Received message: {message.value().decode('utf-8')}")
    except:
        # Handle any exception here

But when I try these I get error ...
AttributeError: 'dict' object has no attribute 'error' referring to the code line "if message.error():"

Is this a version issue or something ?
I don't understand why there are so many example code snippets with this code but it doesn't actually work ?
I assume message.value() will have a problem as well.

Note I am using kafka zookeeper NOT kafka confluent.

Copied code and ran it.

答案1

得分: 1

consumer.poll() 方法不仅仅返回记录列表,它返回一个字典,其中键是分区,值是该分区中的记录列表(来源)。您的错误是因为您试图循环遍历该字典,而您的意图是仅循环遍历字典中的值为记录的部分。

处理消息的常用接口是双重循环:

for topic_partition, records in messages.items():
    for record in records:
英文:

The consumer.poll() method doesn't just return list of records. It returns a dict where keys are the partition, and values are the list of records in that partition (source). Your error arises because you're trying to loop over that dict, whereas your intention is to loop over only the records who are values in the dict.

A commonly used interface for handling the messages is to double-loop:

for topic_partition, records in messages.items():
    for record in records:

huangapple
  • 本文由 发表于 2023年5月17日 17:11:52
  • 转载请务必保留本文链接:https://go.coder-hub.com/76270406.html
匿名

发表评论

匿名网友

:?: :razz: :sad: :evil: :!: :smile: :oops: :grin: :eek: :shock: :???: :cool: :lol: :mad: :twisted: :roll: :wink: :idea: :arrow: :neutral: :cry: :mrgreen:

确定