How to pass a Kafka Consumer object to a function that will be submitted to dask in python?
Question
I get a pickle error whenever I try to pass the consumer object as an argument to a function that is being submitted to dask. I'm using confluent_kafka to create the consumer, but I believe the same happens when using kafka-python. Is there any way to solve this?
Thanks.
Answer 1
Score: 2
You might be interested in trying streamz, which has integrations with both kafka and dask.
You may be interested in this blog by RapidsAI, showing how many kafka events can be processed per second with the help of GPUs.
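As a minimal sketch of that route (the broker address, topic name, and group id below are placeholders, and this assumes streamz is installed alongside confluent_kafka and distributed): streamz builds the Kafka consumer itself on the polling side, and `scatter()`/`gather()` move only the message payloads through dask, so no consumer object ever needs to be pickled.

```python
from dask.distributed import Client
from streamz import Stream

client = Client()  # streamz's scatter()/gather() use this default client

consumer_conf = {
    'bootstrap.servers': 'localhost:9092',  # placeholder broker
    'group.id': 'streamz-demo',             # placeholder consumer group
}

# Emits raw message values (bytes) from the subscribed topic.
source = Stream.from_kafka(['my-topic'], consumer_conf)

# Fan each message out to a dask worker, process it, gather results back.
(source.scatter()
       .map(lambda value: value.decode().upper())  # toy per-message work
       .gather()
       .sink(print))

source.start()  # begin polling Kafka
```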
If not using streamz, the client needs to be recreated on each worker, either as a global or within each task (the latter incurring some overhead).
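Here is a minimal sketch of the per-task variant, assuming confluent_kafka and a running dask cluster (the broker address, topic, and group id are placeholders). Only the plain config dict and strings cross the process boundary; the Consumer itself is constructed on the worker, inside the task.

```python
from dask.distributed import Client
from confluent_kafka import Consumer

CONF = {
    'bootstrap.servers': 'localhost:9092',  # placeholder broker
    'group.id': 'dask-demo',                # placeholder consumer group
    'auto.offset.reset': 'earliest',
}

def consume_batch(conf, topic, max_messages=10, timeout=1.0):
    """Build a Consumer on the worker, poll up to max_messages, return values."""
    consumer = Consumer(conf)  # constructed worker-side, never pickled
    consumer.subscribe([topic])
    values = []
    try:
        for _ in range(max_messages):
            msg = consumer.poll(timeout)
            if msg is None or msg.error():
                continue
            values.append(msg.value())
    finally:
        consumer.close()  # commit offsets and release sockets
    return values

client = Client()
future = client.submit(consume_batch, CONF, 'my-topic')
print(future.result())
```

For the global variant, the same function could instead look the consumer up in a module-level dict keyed by the config, creating it only for the first task that runs on a given worker; that trades the per-task connection overhead for one long-lived consumer per worker.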
Explanation of the problem:
The rdkafka object is essentially a reference to a C struct holding internal state, threads, and open sockets. Python does not know how to "pickle" (serialise) such an object, which is how it would need to be transferred to another process such as a dask worker. You could in theory write your own serialisation for dask (see here), but in practice you should instead create a new consumer client on each worker.
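You can reproduce the error from the question with a quick check like the following; no broker needs to be running, since the Consumer connects lazily (the broker address is a placeholder, and the exact exception text varies across Python and confluent_kafka versions):

```python
import pickle
from confluent_kafka import Consumer

consumer = Consumer({'bootstrap.servers': 'localhost:9092', 'group.id': 'demo'})
pickle.dumps(consumer)
# -> TypeError: cannot pickle 'cimpl.Consumer' object
```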