How do I move to a specific offset in a Kafka consumer without running into a ValueError?

Question


I'm using Python 3.9.16 and kafka-python 2.0.2 on a MacBook Pro running macOS 11.6.5.

I'm new to Kafka and just experimenting with it for now. I don't know what the issue is, and I don't know why my workaround fixes it.

I'm trying to seek to a specific offset in the topic, but I routinely run into a ValueError.

This is the code I have.

```python
from kafka import KafkaConsumer, TopicPartition

consumer = KafkaConsumer(bootstrap_servers=['localhost:9092'])
#import pdb
#pdb.set_trace()

# Manually assign partition 0 of 'my-topic' (no consumer group involved).
myTP = TopicPartition('my-topic', 0)
consumer.assign([myTP])
print("this is the consumer assignment: {}".format(consumer.assignment()))

# Workaround: uncommenting these two position() prints makes the seek work.
#print("not sure why this will work but printing position: {} ".format(consumer.position(myTP)))
consumer.seek(myTP, 22)  # start fetching partition 0 at offset 22
#print("not sure why this will work but printing position: {} ".format(consumer.position(myTP)))

# Iterate over records indefinitely (blocks waiting for new messages).
for blah in consumer:
    print("{}, {}".format(blah.offset, blah.value))
```

Most of the time when I run it, I get the ValueError below. Once in a while it works without my workaround, and I don't know why.

```
this is the consumer assignment: {TopicPartition(topic='my-topic', partition=0)}
Traceback (most recent call last):
  File "/Users/my_secret_username/venvs/kafka/lib/python3.9/site-packages/kafka/protocol/types.py", line 20, in _unpack
    (value,) = f(data)
struct.error: unpack requires a buffer of 4 bytes
...
...
...
ValueError: Error encountered when attempting to convert value: b'' to struct format: '<built-in method unpack of _struct.Struct object at 0x10539a930>', hit error: unpack requires a buffer of 4 bytes
```

The workaround I found: if I print the position before and after the seek() call, it seems to work every time, but I don't know why. Can someone explain this? Do I need to build in a short delay to make it work? Does calling position() reset something inside the consumer that makes it work?

```
$ python tkCons.py
this is the consumer assignment: {TopicPartition(topic='my-topic', partition=0)}
not sure why this will work but printing position: 34
not sure why this will work but printing position: 22
22, b'{"number": 8}'
23, b'{"number": 9}'
24, b'{"number": 0}'
25, b'{"number": 1}'
26, b'{"number": 2}'
27, b'{"number": 3}'
28, b'{"number": 4}'
29, b'{"number": 5}'
30, b'{"number": 6}'
31, b'{"number": 7}'
32, b'{"number": 8}'
33, b'{"number": 9}'
```
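For reference, the workaround can be packaged into a small helper. This is a minimal sketch, assuming a kafka-python `KafkaConsumer` is passed in; the name `read_from_offset` and its `timeout_ms`/`max_polls` limits are hypothetical. It keeps the two `position()` calls around `seek()` exactly as in the workaround, and replaces the endless `for blah in consumer` loop with bounded `poll()` calls so the script exits. It only reproduces the observed workaround; it is not a confirmed fix for the ValueError.

```python
from typing import Any, List, Tuple


def read_from_offset(consumer: Any, tp: Any, offset: int,
                     timeout_ms: int = 1000,
                     max_polls: int = 10) -> List[Tuple[int, Any]]:
    """Assign `tp`, seek to `offset`, and collect (offset, value) pairs.

    Assumes `consumer` behaves like a kafka-python KafkaConsumer; only its
    assign(), position(), seek() and poll() methods are used.
    """
    consumer.assign([tp])
    # Workaround step 1: ask for the position before seeking. With no
    # committed offset, the consumer must resolve a starting offset here
    # (presumably via the default auto_offset_reset='latest', which would
    # explain the 34 printed in the run above).
    consumer.position(tp)
    consumer.seek(tp, offset)
    # Workaround step 2: after seek() this should report `offset`.
    consumer.position(tp)

    results = []
    for _ in range(max_polls):
        # poll() returns {TopicPartition: [ConsumerRecord, ...]}; empty on timeout.
        batch = consumer.poll(timeout_ms=timeout_ms)
        if not batch:
            break
        for records in batch.values():
            results.extend((r.offset, r.value) for r in records)
    return results
```

Usage, with the broker and topic from the question: `read_from_offset(KafkaConsumer(bootstrap_servers=['localhost:9092']), TopicPartition('my-topic', 0), 22)`. Bounding the number of polls is a design choice for a throwaway script; a long-running consumer would keep polling instead.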

EDIT: Full traceback:

```
$ python tkCons.py
this is the consumer assignment: {TopicPartition(topic='my-topic', partition=0)}
Traceback (most recent call last):
  File "/Users/my_secret_username/venvs/kafka/lib/python3.9/site-packages/kafka/protocol/types.py", line 20, in _unpack
    (value,) = f(data)
struct.error: unpack requires a buffer of 4 bytes

During handling of the above exception, another exception occurred:

Traceback (most recent call last):
  File "/Users/my_secret_username/kafka/tkCons.py", line 34, in <module>
    for blah in consumer:
  File "/Users/my_secret_username/venvs/kafka/lib/python3.9/site-packages/kafka/consumer/group.py", line 1193, in __next__
    return self.next_v2()
  File "/Users/my_secret_username/venvs/kafka/lib/python3.9/site-packages/kafka/consumer/group.py", line 1201, in next_v2
    return next(self._iterator)
  File "/Users/my_secret_username/venvs/kafka/lib/python3.9/site-packages/kafka/consumer/group.py", line 1116, in _message_generator_v2
    record_map = self.poll(timeout_ms=timeout_ms, update_offsets=False)
  File "/Users/my_secret_username/venvs/kafka/lib/python3.9/site-packages/kafka/consumer/group.py", line 655, in poll
    records = self._poll_once(remaining, max_records, update_offsets=update_offsets)
  File "/Users/my_secret_username/venvs/kafka/lib/python3.9/site-packages/kafka/consumer/group.py", line 702, in _poll_once
    self._client.poll(timeout_ms=timeout_ms)
  File "/Users/my_secret_username/venvs/kafka/lib/python3.9/site-packages/kafka/client_async.py", line 602, in poll
    self._poll(timeout / 1000)
  File "/Users/my_secret_username/venvs/kafka/lib/python3.9/site-packages/kafka/client_async.py", line 687, in _poll
    self._pending_completion.extend(conn.recv())
  File "/Users/my_secret_username/venvs/kafka/lib/python3.9/site-packages/kafka/conn.py", line 1053, in recv
    responses = self._recv()
  File "/Users/my_secret_username/venvs/kafka/lib/python3.9/site-packages/kafka/conn.py", line 1127, in _recv
    return self._protocol.receive_bytes(recvd_data)
  File "/Users/my_secret_username/venvs/kafka/lib/python3.9/site-packages/kafka/protocol/parser.py", line 132, in receive_bytes
    resp = self._process_response(self._rbuffer)
  File "/Users/my_secret_username/venvs/kafka/lib/python3.9/site-packages/kafka/protocol/parser.py", line 138, in _process_response
    recv_correlation_id = Int32.decode(read_buffer)
  File "/Users/my_secret_username/venvs/kafka/lib/python3.9/site-packages/kafka/protocol/types.py", line 64, in decode
    return _unpack(cls._unpack, data.read(4))
  File "/Users/my_secret_username/venvs/kafka/lib/python3.9/site-packages/kafka/protocol/types.py", line 23, in _unpack
    raise ValueError("Error encountered when attempting to convert value: "
ValueError: Error encountered when attempting to convert value: b'' to struct format: '<built-in method unpack of _struct.Struct object at 0x10539a930>', hit error: unpack requires a buffer of 4 bytes
```
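The innermost frames show where it fails: `receive_bytes()` → `_process_response()` → `Int32.decode(read_buffer)`, which calls `data.read(4)` to get the response's correlation ID and gets `b''` back. Kafka responses are size-prefixed: a 4-byte big-endian length, then a header beginning with the 4-byte correlation ID. The following is a simplified stdlib sketch of that framing (an illustration only, not kafka-python's actual parser; `decode_int32` and `parse_response` are hypothetical names). It shows that a frame whose payload holds fewer than 4 bytes produces the same `struct.error` wrapped in a `ValueError`. It does not explain why the connection delivered such a frame.

```python
import io
import struct
from typing import Tuple

# Kafka's wire protocol encodes INT32 fields big-endian ('>i').
INT32 = struct.Struct('>i')


def decode_int32(buf: io.BytesIO) -> int:
    """Read one INT32 from buf, wrapping struct.error in ValueError
    (a simplified imitation of the Int32.decode -> _unpack frames above)."""
    data = buf.read(4)
    try:
        (value,) = INT32.unpack(data)
    except struct.error as e:
        raise ValueError(
            "Error encountered when attempting to convert value: "
            "{!r}, hit error: {}".format(data, e)
        ) from e
    return value


def parse_response(frame: bytes) -> Tuple[int, bytes]:
    """Split one size-prefixed response frame into (correlation_id, body)."""
    buf = io.BytesIO(frame)
    size = decode_int32(buf)                 # 4-byte length prefix
    payload = io.BytesIO(buf.read(size))     # the response itself
    correlation_id = decode_int32(payload)   # first field of the response header
    return correlation_id, payload.read()
```

For example, `parse_response(struct.pack('>i', 6) + struct.pack('>i', 7) + b'ab')` returns `(7, b'ab')`, while `parse_response(struct.pack('>i', 0))` raises `ValueError` because the correlation ID is read from an empty payload.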

Answer 1

Score: 1


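I'd suggest upgrading to the most recent Python release and trying again.

The error comes from an internal bytes-unpacking function: kafka-python's Int32.decode() tries to read a 4-byte field (the response's correlation ID) from an empty buffer (b'').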

huangapple
  • Posted on 2023-05-22 15:02:23
  • When reposting, please keep the link to this article: https://go.coder-hub.com/76303698.html