Why do I get a ValueError in my Kafka Consumer if I seek to another position?

Question

I'm using Python 3.9.16 and kafka-python version 2.0.2. I'm running on my MacBook Pro with macOS 11.6.5.

I'm still getting my feet wet with Kafka so it's entirely possible I'm doing things the wrong way.

What I'm trying to do is test seeking to offsets with my consumer in case something doesn't get processed and I have to go back and re-read a message.

Anyway, I keep running into this error message. I'm not even sure why it's happening because sometimes I can process the offset and it works fine, other times, it gives me this message:

ValueError: Error encountered when attempting to convert value: b'' to struct format: '<built-in method unpack of _struct.Struct object at 0x10bb669f0>', hit error: unpack requires a buffer of 4 bytes

When it's working, I can see this in pdb, which kinda proves that the values are present in the topic for me to consume:

(Pdb)
> /Users/username/kafka/tkCons.py(41)<module>()
-> print ("{}, {}".format(blah.offset, blah.value))
(Pdb)
10, b'{"number": 10}'
> /Users/username/kafka/tkCons.py(40)<module>()
-> for blah in consumer:
(Pdb)

I wish I could narrow down what I'm doing during testing, but I can't pin down which of the lines I added or commented out make it work and which make it produce the above error.
Since I'm not 100% sure what's happening under the hood, is my seeking around somehow affecting something in ZooKeeper? What do I need to do to keep whatever is under the hood happy? Here's my code in case it matters.

from kafka import KafkaConsumer, TopicPartition

# To consume latest messages and auto-commit offsets
consumer = KafkaConsumer(#'my-topic333', 'my-topic222', 'my-topic',
                         group_id='my-group',
                         bootstrap_servers=['localhost:9092'])

myTP = TopicPartition('my-topic333', 0)
import pdb
pdb.set_trace()

consumer.assign([myTP])
print ("this is the consumer assignment: {}".format(consumer.assignment()))
print ("before this is my position: {} ".format(consumer.position(myTP)))
consumer.seek(myTP, 50)
#consumer.seek_to_beginning()
print ("after seeking this is my position: {} ".format(consumer.position(myTP)))

for blah in consumer:
    print ("{}, {}".format(blah.offset, blah.value))

Answer 1

Score: 1

First, blah.value can be None, but in that case the loop should simply print None rather than raise a deserialization-related ValueError. Please show your full stack trace, and print the offset on a separate line from the value so that you can see where the error actually happens. Alternatively, look at any logs that include the last successfully consumed offset.
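As a side note on reading that message: its tail ("unpack requires a buffer of 4 bytes") is the wording Python's standard `struct` module uses when asked to decode a fixed-size field from a buffer that is too short. The sketch below reproduces it without Kafka at all. The `int32` format and the `try_unpack` helper are illustrative names, not part of kafka-python, and this does not tell you which response was empty; the full stack trace is still needed for that.

```python
import struct

# A 4-byte big-endian signed integer, the shape of a typical size prefix.
# (Illustrative only; not taken from kafka-python's source.)
int32 = struct.Struct('>i')


def try_unpack(buf):
    """Return the decoded integer, or the error text if the buffer is too short."""
    try:
        return int32.unpack(buf)[0]
    except struct.error as exc:
        return str(exc)


print(try_unpack(b'\x00\x00\x00\x0a'))  # a complete 4-byte field decodes normally
print(try_unpack(b''))                   # an empty buffer yields the error text
```

The second call mirrors the `b''` value shown in the question's error: there were no bytes to decode where a 4-byte field was expected.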

> in case something doesn't get processed and I have to go back and re-read a message

I wouldn't suggest using seek for this.

Instead, have a failed processing step raise a fatal exception and stop your Python process. Disable automatic offset commits and commit offsets manually, only for data that was processed successfully (or for a batch of offsets for which you're willing to tolerate duplicates, assuming you process them in an idempotent way). Then, when you restart the consumer group, the app automatically picks back up after the last successfully processed offset, and no manual seeking is required.
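A minimal sketch of that pattern with kafka-python, assuming the topic, group, and broker address from the question. The names `consume_with_manual_commit`, `build_consumer`, and `process` are hypothetical, chosen for illustration. It uses `subscribe`-style construction (a topic passed to `KafkaConsumer`) so that the group's committed offset is what the consumer resumes from.

```python
def consume_with_manual_commit(consumer, process):
    """Process records one at a time, committing only after each success.

    `process` is a hypothetical callable supplied by the application.
    """
    handled = 0
    for record in consumer:
        # If process() raises, the exception propagates and the loop stops
        # before commit(), so this record's offset is never committed.
        process(record)
        # With no arguments, commit() commits the consumer's current position.
        consumer.commit()
        handled += 1
    # Only reached if iteration ends (e.g. consumer_timeout_ms is set).
    return handled


def build_consumer():
    # Imported here so the function above can be exercised without a broker.
    from kafka import KafkaConsumer

    return KafkaConsumer(
        'my-topic333',
        group_id='my-group',
        bootstrap_servers=['localhost:9092'],
        enable_auto_commit=False,  # offsets advance only via consumer.commit()
    )
```

Calling `consume_with_manual_commit(build_consumer(), process)` against a running broker would then stop on the first failure, and the next start of the same group would begin at the record that failed. Committing after every record is the simplest form; committing per batch trades some duplicate processing for fewer commit requests.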

> is me seeking around somehow affecting something in zookeeper?

The assign API does not use consumer groups, so no, not unless you also use the subscribe and commit functions.

Posted by huangapple on June 8, 2023 at 10:41:41
Source: https://go.coder-hub.com/76428259.html