How do I move to a specific offset in a Kafka consumer without running into a ValueError?

Question

I'm using Python 3.9.16 and kafka-python version 2.0.2. I'm running on a MacBook Pro with macOS 11.6.5.

New to Kafka and just playing around with it for now. I'm not sure what the issue is and I'm not sure why my workaround works.

What I'm trying to do is seek to a specific offset on the topic but I routinely run into a ValueError.

This is the code I have.

from kafka import KafkaConsumer, TopicPartition

consumer = KafkaConsumer(bootstrap_servers=['localhost:9092'])

#import pdb
#pdb.set_trace()

myTP = TopicPartition('my-topic', 0)

consumer.assign([myTP])
print("this is the consumer assignment: {}".format(consumer.assignment()))
#print("not sure why this will work but printing position: {} ".format(consumer.position(myTP)))
consumer.seek(myTP, 22)
#print("not sure why this will work but printing position: {} ".format(consumer.position(myTP)))

for blah in consumer:
    print("{}, {}".format(blah.offset, blah.value))

So most of the time when I run it, I'll get this ValueError. Once in a while it will mysteriously work without my workaround but I don't know why.

this is the consumer assignment: {TopicPartition(topic='my-topic', partition=0)}
Traceback (most recent call last):
  File "/Users/my_secret_username/venvs/kafka/lib/python3.9/site-packages/kafka/protocol/types.py", line 20, in _unpack
    (value,) = f(data)
struct.error: unpack requires a buffer of 4 bytes
...
...
...
ValueError: Error encountered when attempting to convert value: b'' to struct format: '<built-in method unpack of _struct.Struct object at 0x10539a930>', hit error: unpack requires a buffer of 4 bytes

The workaround I found was if I printed the position before and after my seek command, it seems to work all the time but I don't know why. Can someone explain this to me? Do I need to build in some short delay to make this work? Does printing my position in the Consumer reset something within the Consumer which makes it work?

$ python tkCons.py
this is the consumer assignment: {TopicPartition(topic='my-topic', partition=0)}
not sure why this will work but printing position: 34 
not sure why this will work but printing position: 22 
22, b'{"number": 8}'
23, b'{"number": 9}'
24, b'{"number": 0}'
25, b'{"number": 1}'
26, b'{"number": 2}'
27, b'{"number": 3}'
28, b'{"number": 4}'
29, b'{"number": 5}'
30, b'{"number": 6}'
31, b'{"number": 7}'
32, b'{"number": 8}'
33, b'{"number": 9}'
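
For reference, the workaround described above can be written as a small helper rather than as stray print statements. This is only a sketch of what the question already does: `seek_with_position_check` and `main` are hypothetical names, and the topic, partition, and offset are the ones from the question. As far as I can tell, `KafkaConsumer.position()` asks the broker for the fetch position when the consumer does not have one yet, so calling it before `seek()` forces a round trip to the broker first. Whether that is the reason the ValueError disappears is an assumption, not something established here.

```python
def seek_with_position_check(consumer, tp, offset):
    """Read the current position, seek, then read the position again.

    `consumer` is expected to behave like a kafka-python KafkaConsumer
    that already has `tp` assigned. Returns (position_before, position_after).
    """
    before = consumer.position(tp)  # may contact the broker to resolve the position
    consumer.seek(tp, offset)       # overrides the fetch offset used by the next poll
    after = consumer.position(tp)   # should now report the requested offset
    return before, after


def main():
    # Requires kafka-python and a broker on localhost:9092 (values from the question).
    from kafka import KafkaConsumer, TopicPartition

    consumer = KafkaConsumer(bootstrap_servers=['localhost:9092'])
    tp = TopicPartition('my-topic', 0)
    consumer.assign([tp])

    before, after = seek_with_position_check(consumer, tp, 22)
    print("position before seek: {}, after seek: {}".format(before, after))

    for record in consumer:
        print("{}, {}".format(record.offset, record.value))
```

Calling `main()` against the setup from the question should reproduce the working run shown above, with the two position values printed on one line instead of two.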

EDIT:
Full traceback is here:

$ python tkCons.py
this is the consumer assignment: {TopicPartition(topic='my-topic', partition=0)}
Traceback (most recent call last):
  File "/Users/my_secret_username/venvs/kafka/lib/python3.9/site-packages/kafka/protocol/types.py", line 20, in _unpack
    (value,) = f(data)
struct.error: unpack requires a buffer of 4 bytes

During handling of the above exception, another exception occurred:

Traceback (most recent call last):
  File "/Users/my_secret_username/kafka/tkCons.py", line 34, in <module>
    for blah in consumer:
  File "/Users/my_secret_username/venvs/kafka/lib/python3.9/site-packages/kafka/consumer/group.py", line 1193, in __next__
    return self.next_v2()
  File "/Users/my_secret_username/venvs/kafka/lib/python3.9/site-packages/kafka/consumer/group.py", line 1201, in next_v2
    return next(self._iterator)
  File "/Users/my_secret_username/venvs/kafka/lib/python3.9/site-packages/kafka/consumer/group.py", line 1116, in _message_generator_v2
    record_map = self.poll(timeout_ms=timeout_ms, update_offsets=False)
  File "/Users/my_secret_username/venvs/kafka/lib/python3.9/site-packages/kafka/consumer/group.py", line 655, in poll
    records = self._poll_once(remaining, max_records, update_offsets=update_offsets)
  File "/Users/my_secret_username/venvs/kafka/lib/python3.9/site-packages/kafka/consumer/group.py", line 702, in _poll_once
    self._client.poll(timeout_ms=timeout_ms)
  File "/Users/my_secret_username/venvs/kafka/lib/python3.9/site-packages/kafka/client_async.py", line 602, in poll
    self._poll(timeout / 1000)
  File "/Users/my_secret_username/venvs/kafka/lib/python3.9/site-packages/kafka/client_async.py", line 687, in _poll
    self._pending_completion.extend(conn.recv())
  File "/Users/my_secret_username/venvs/kafka/lib/python3.9/site-packages/kafka/conn.py", line 1053, in recv
    responses = self._recv()
  File "/Users/my_secret_username/venvs/kafka/lib/python3.9/site-packages/kafka/conn.py", line 1127, in _recv
    return self._protocol.receive_bytes(recvd_data)
  File "/Users/my_secret_username/venvs/kafka/lib/python3.9/site-packages/kafka/protocol/parser.py", line 132, in receive_bytes
    resp = self._process_response(self._rbuffer)
  File "/Users/my_secret_username/venvs/kafka/lib/python3.9/site-packages/kafka/protocol/parser.py", line 138, in _process_response
    recv_correlation_id = Int32.decode(read_buffer)
  File "/Users/my_secret_username/venvs/kafka/lib/python3.9/site-packages/kafka/protocol/types.py", line 64, in decode
    return _unpack(cls._unpack, data.read(4))
  File "/Users/my_secret_username/venvs/kafka/lib/python3.9/site-packages/kafka/protocol/types.py", line 23, in _unpack
    raise ValueError("Error encountered when attempting to convert value: "
ValueError: Error encountered when attempting to convert value: b'' to struct format: '<built-in method unpack of _struct.Struct object at 0x10539a930>', hit error: unpack requires a buffer of 4 bytes

Answer 1

Score: 1

I'd suggest upgrading to the most recent Python release and trying again.

The error is coming from an internal bytes unpack function.

huangapple
  • Posted on May 22, 2023, 15:02:23
  • When reposting, please keep the link to this article: https://go.coder-hub.com/76303698.html