How do I move to a specific offset in a Kafka consumer without running into a ValueError?
Question
I'm using Python 3.9.16 and kafka-python 2.0.2, running on a MacBook Pro with macOS 11.6.5.
I'm new to Kafka and just playing around with it for now. I'm not sure what the issue is, or why my workaround works.
What I'm trying to do is seek to a specific offset on the topic, but I routinely run into a ValueError.
This is the code I have.
from kafka import KafkaConsumer, TopicPartition

consumer = KafkaConsumer(bootstrap_servers=['localhost:9092'])
#import pdb
#pdb.set_trace()
myTP = TopicPartition('my-topic', 0)
consumer.assign([myTP])
print("this is the consumer assignment: {}".format(consumer.assignment()))
#print("not sure why this will work but printing position: {} ".format(consumer.position(myTP)))
consumer.seek(myTP, 22)
#print("not sure why this will work but printing position: {} ".format(consumer.position(myTP)))
for blah in consumer:
    print("{}, {}".format(blah.offset, blah.value))
Most of the time when I run it, I get this ValueError. Once in a while it mysteriously works without my workaround, and I don't know why.
this is the consumer assignment: {TopicPartition(topic='my-topic', partition=0)}
Traceback (most recent call last):
File "/Users/my_secret_username/venvs/kafka/lib/python3.9/site-packages/kafka/protocol/types.py", line 20, in _unpack
(value,) = f(data)
struct.error: unpack requires a buffer of 4 bytes
...
...
...
ValueError: Error encountered when attempting to convert value: b'' to struct format: '<built-in method unpack of _struct.Struct object at 0x10539a930>', hit error: unpack requires a buffer of 4 bytes
The workaround I found: if I print the position before and after my seek call (the two commented-out print lines above), it seems to work every time, but I don't know why. Can someone explain this to me? Do I need to build in a short delay to make this work? Does printing the position reset something within the consumer that makes it work? With both print lines uncommented, the output is:
$ python tkCons.py
this is the consumer assignment: {TopicPartition(topic='my-topic', partition=0)}
not sure why this will work but printing position: 34
not sure why this will work but printing position: 22
22, b'{"number": 8}'
23, b'{"number": 9}'
24, b'{"number": 0}'
25, b'{"number": 1}'
26, b'{"number": 2}'
27, b'{"number": 3}'
28, b'{"number": 4}'
29, b'{"number": 5}'
30, b'{"number": 6}'
31, b'{"number": 7}'
32, b'{"number": 8}'
33, b'{"number": 9}'
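For comparison, here is a minimal sketch of the same seek-then-read flow written against poll() instead of the blocking "for ... in consumer" iterator. It keeps the position() call from the workaround above. The helper name read_from_offset and its parameters are hypothetical; the function only assumes an object with kafka-python's assign, position, seek and poll methods. It is not a confirmed fix for the ValueError, and the thread does not establish why calling position() first helps.

```python
def read_from_offset(consumer, tp, offset, max_messages=10, timeout_ms=1000):
    """Assign tp, seek to offset, and return up to max_messages (offset, value) pairs.

    Hypothetical helper: `consumer` is assumed to behave like kafka-python's
    KafkaConsumer (assign / position / seek / poll) and `tp` like TopicPartition.
    """
    consumer.assign([tp])
    # The asker's workaround: resolve the current position before seeking.
    # Why this avoids the ValueError is not established in this thread.
    consumer.position(tp)
    consumer.seek(tp, offset)

    results = []
    while len(results) < max_messages:
        # poll() returns a dict {TopicPartition: [records]}; an empty dict means
        # the timeout expired with nothing fetched, so stop instead of blocking.
        batch = consumer.poll(timeout_ms=timeout_ms,
                              max_records=max_messages - len(results))
        if not batch:
            break
        for record in batch.get(tp, []):
            results.append((record.offset, record.value))
    return results[:max_messages]
```

With kafka-python installed and a broker at localhost:9092, it could be called as read_from_offset(KafkaConsumer(bootstrap_servers=['localhost:9092']), TopicPartition('my-topic', 0), 22). Polling with a timeout lets the loop end once no more records arrive, whereas iterating over the consumer blocks indefinitely unless consumer_timeout_ms is set. Note that a very short timeout_ms can end the loop before the first fetch completes.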
EDIT: Here is the full traceback:
$ python tkCons.py
this is the consumer assignment: {TopicPartition(topic='my-topic', partition=0)}
Traceback (most recent call last):
File "/Users/my_secret_username/venvs/kafka/lib/python3.9/site-packages/kafka/protocol/types.py", line 20, in _unpack
(value,) = f(data)
struct.error: unpack requires a buffer of 4 bytes
During handling of the above exception, another exception occurred:
Traceback (most recent call last):
File "/Users/my_secret_username/kafka/tkCons.py", line 34, in <module>
for blah in consumer:
File "/Users/my_secret_username/venvs/kafka/lib/python3.9/site-packages/kafka/consumer/group.py", line 1193, in __next__
return self.next_v2()
File "/Users/my_secret_username/venvs/kafka/lib/python3.9/site-packages/kafka/consumer/group.py", line 1201, in next_v2
return next(self._iterator)
File "/Users/my_secret_username/venvs/kafka/lib/python3.9/site-packages/kafka/consumer/group.py", line 1116, in _message_generator_v2
record_map = self.poll(timeout_ms=timeout_ms, update_offsets=False)
File "/Users/my_secret_username/venvs/kafka/lib/python3.9/site-packages/kafka/consumer/group.py", line 655, in poll
records = self._poll_once(remaining, max_records, update_offsets=update_offsets)
File "/Users/my_secret_username/venvs/kafka/lib/python3.9/site-packages/kafka/consumer/group.py", line 702, in _poll_once
self._client.poll(timeout_ms=timeout_ms)
File "/Users/my_secret_username/venvs/kafka/lib/python3.9/site-packages/kafka/client_async.py", line 602, in poll
self._poll(timeout / 1000)
File "/Users/my_secret_username/venvs/kafka/lib/python3.9/site-packages/kafka/client_async.py", line 687, in _poll
self._pending_completion.extend(conn.recv())
File "/Users/my_secret_username/venvs/kafka/lib/python3.9/site-packages/kafka/conn.py", line 1053, in recv
responses = self._recv()
File "/Users/my_secret_username/venvs/kafka/lib/python3.9/site-packages/kafka/conn.py", line 1127, in _recv
return self._protocol.receive_bytes(recvd_data)
File "/Users/my_secret_username/venvs/kafka/lib/python3.9/site-packages/kafka/protocol/parser.py", line 132, in receive_bytes
resp = self._process_response(self._rbuffer)
File "/Users/my_secret_username/venvs/kafka/lib/python3.9/site-packages/kafka/protocol/parser.py", line 138, in _process_response
recv_correlation_id = Int32.decode(read_buffer)
File "/Users/my_secret_username/venvs/kafka/lib/python3.9/site-packages/kafka/protocol/types.py", line 64, in decode
return _unpack(cls._unpack, data.read(4))
File "/Users/my_secret_username/venvs/kafka/lib/python3.9/site-packages/kafka/protocol/types.py", line 23, in _unpack
raise ValueError("Error encountered when attempting to convert value: "
ValueError: Error encountered when attempting to convert value: b'' to struct format: '<built-in method unpack of _struct.Struct object at 0x10539a930>', hit error: unpack requires a buffer of 4 bytes
Answer 1
Score: 1
I'd suggest upgrading to the most recent Python release and trying again.
The error is raised inside kafka-python's internal byte-unpacking code (the _unpack helper in kafka/protocol/types.py), not in your own code.
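To illustrate what the bottom of the traceback reports, here is a minimal stdlib-only sketch of that decoding step. Kafka's wire protocol encodes INT32 fields as 4-byte big-endian integers. In the traceback, _process_response decodes the correlation id with Int32.decode, which calls data.read(4); that read returned b'' (zero bytes), so struct raised and the error was re-wrapped as a ValueError. The helpers decode_int32 and read_correlation_id below are hypothetical simplifications, not kafka-python's actual code, and they only show how an empty buffer produces this message, not why the buffer was empty.

```python
import io
import struct

# INT32 in the Kafka wire protocol: 4-byte big-endian signed integer.
_INT32 = struct.Struct(">i")


def decode_int32(buf: io.BytesIO) -> int:
    """Read one INT32 from buf, re-raising struct errors as ValueError
    (a simplified, hypothetical mirror of the wrapper seen in the traceback)."""
    data = buf.read(4)
    try:
        (value,) = _INT32.unpack(data)
    except struct.error as exc:
        raise ValueError(
            f"Error encountered when attempting to convert value: {data!r} "
            f"to struct format: '{_INT32.unpack}', hit error: {exc}"
        ) from exc
    return value


def read_correlation_id(payload: bytes) -> int:
    """Decode the correlation id at the start of a response payload."""
    return decode_int32(io.BytesIO(payload))
```

For example, read_correlation_id(struct.pack(">i", 7)) returns 7, while read_correlation_id(b"") raises a ValueError ending in "unpack requires a buffer of 4 bytes", matching the last line of the traceback.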