How to pass a Kafka Consumer object to a function that will be submitted to Dask in Python?
Question

I get a pickle error whenever I try to pass the consumer object as an argument to a function that is being submitted to Dask. I'm using confluent_kafka to create the consumer, but I believe the same happens when using kafka-python. Is there any way to solve this?

Thanks.

Answer 1

Score: 2

You might be interested in trying streamz, which has integrations with both Kafka and Dask.

You may also be interested in this blog post by RapidsAI, showing how many Kafka events per second can be processed with the help of GPUs.

If you are not using streamz, the client needs to be recreated on each worker, either as a global of some kind, or within each task (the latter incurring some overhead per task).
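The "recreate it on the worker" idea can be illustrated without Kafka at all: cache the client in a module-level global, so each worker process builds it at most once and nothing unpicklable ever has to be shipped between processes. The names `get_consumer`, `poll_task`, and the `factory` argument below are hypothetical, not part of confluent_kafka or Dask; this is a minimal sketch of the pattern, not a definitive implementation:

```python
# Worker-side global cache: each worker process creates its own client once,
# so the unpicklable object never travels from the scheduler to the worker.
_consumer_cache = {}

def get_consumer(factory, key):
    """Return a per-process client, creating it on first use.

    factory -- zero-argument callable that builds the client; in real code
               this might be something like lambda: Consumer({...})
    key     -- hashable identifier, e.g. (bootstrap_servers, group_id)
    """
    if key not in _consumer_cache:
        _consumer_cache[key] = factory()
    return _consumer_cache[key]

def poll_task(factory, key):
    # A Dask task would call get_consumer() here, instead of receiving
    # the consumer as a (pickled) argument.
    consumer = get_consumer(factory, key)
    return consumer

# Demonstration with a stand-in factory: it runs only once per process,
# no matter how many tasks ask for the client.
calls = []
factory = lambda: calls.append(1) or object()
a = poll_task(factory, ("localhost:9092", "g1"))
b = poll_task(factory, ("localhost:9092", "g1"))
```

The same shape works with `client.submit(poll_task, ...)` in dask.distributed: only the (picklable) factory arguments cross process boundaries, while the client object itself is born and dies inside the worker.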

Explanation of the problem:
the rdkafka object is essentially a reference to a C struct containing internal state, threads, and open sockets. Python does not know how to "pickle" (serialise) such an object, which is how it would need to be transferred to another process, i.e. your Dask worker. In theory you could register your own serialisation with Dask (see here), but in practice you should instead create a new consumer client on each worker.
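The pickle failure itself is easy to reproduce without Kafka: any object holding an OS-level resource such as a thread lock or socket (which is exactly what the rdkafka handle wraps) refuses to serialise. `FakeConsumer` below is a stand-in for illustration, not a real Kafka client:

```python
import pickle
import threading

class FakeConsumer:
    """Stand-in for a Kafka consumer: like the rdkafka C object, it holds
    a resource (a lock here; threads and sockets in the real client) that
    has no meaningful byte representation."""
    def __init__(self):
        self._lock = threading.Lock()

try:
    pickle.dumps(FakeConsumer())
    outcome = "pickled"
except TypeError:
    # pickle raises TypeError: cannot pickle '_thread.lock' object
    outcome = "unpicklable"
```

This is the same error class you hit when Dask tries to pickle the arguments of a submitted function, which is why the consumer must be created on the worker side instead.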

huangapple
  • Posted on February 6, 2023 at 12:27:22
  • Please retain this link when reposting: https://go.coder-hub.com/75357339.html