英文:
python kafka consumer.poll msg.error dict object has no attribute error
问题
我试图使用消费者轮询设置Kafka消息轮询。我可以看到很多示例代码使用了message.error
,例如:
while True:
try:
message = consumer.poll(10.0)
if not message:
time.sleep(120) # 休眠2分钟
if message.error():
print(f"消费者错误: {message.error()}")
continue
print(f"接收到的消息: {message.value().decode('utf-8')}")
except:
# 处理任何异常
但是当我尝试运行这些代码时,我收到错误AttributeError: 'dict' object has no attribute 'error'
,指的是代码行if message.error():
。这是版本问题吗?我不明白为什么有这么多示例代码片段,但实际上不起作用。我假设message.value()
也会有问题。
注意,我使用的是Kafka Zookeeper而不是Kafka Confluent。
英文:
I am trying to set up kafka message polling using consumer poll.
I can see loads of example code that uses message.error
e.g.
while True:
try:
message = consumer.poll(10.0)
if not message:
time.sleep(120) # Sleep for 2 minutes
if message.error():
print(f"Consumer error: {message.error()}")
continue
print(f"Received message: {message.value().decode('utf-8')}")
except:
# Handle any exception here
But when I try these I get error ...
AttributeError: 'dict' object has no attribute 'error' referring to the code line "if message.error():"
Is this a version issue or something ?
I don't understand why there are so many example code snippets with this code but it doesn't actually work ?
I assume message.value() will have a problem as well.
Note I am using kafka zookeeper NOT kafka confluent.
Copied code and ran it.
答案1
得分: 1
consumer.poll()
方法不仅仅返回记录列表,它返回一个字典,其中键是分区,值是该分区中的记录列表(来源)。您的错误是因为您试图循环遍历该字典,而您的意图是仅循环遍历字典中的值为记录的部分。
处理消息的常用接口是双重循环:
for topic_partition, records in messages.items():
for record in records:
英文:
The consumer.poll()
method doesn't just return list of records. It returns a dict
where keys are the partition, and values are the list of records in that partition (source). Your error arises because you're trying to loop over that dict, whereas your intention is to loop over only the records who are values in the dict.
A commonly used interface for handling the messages is to double-loop:
for topic_partition, records in messages.items():
for record in records:
通过集体智慧和协作来改善编程学习和解决问题的方式。致力于成为全球开发者共同参与的知识库,让每个人都能够通过互相帮助和分享经验来进步。
评论