cannot pickle 'PyCapsule' object error, when using pybind11 function and dask

Question

See the following example extracted from the pybind11 help (https://github.com/pybind/python_example):

The setup.py is:

import sys

# Available at setup time due to pyproject.toml
from pybind11 import get_cmake_dir
from pybind11.setup_helpers import Pybind11Extension, build_ext
from setuptools import setup

__version__ = "0.0.1"

# The main interface is through Pybind11Extension.
# * You can add cxx_std=11/14/17, and then build_ext can be removed.
# * You can set include_pybind11=false to add the include directory yourself,
#   say from a submodule.
#
# Note:
#   Sort input source files if you glob sources to ensure bit-for-bit
#   reproducible builds (https://github.com/pybind/python_example/pull/53)

ext_modules = [
    Pybind11Extension("python_example",
        ["src/main.cpp"],
        # Example: passing in the version to the compiled code
        define_macros=[('VERSION_INFO', __version__)],
        ),
]

setup(
    name="python_example",
    version=__version__,
    author="Sylvain Corlay",
    author_email="sylvain.corlay@gmail.com",
    url="https://github.com/pybind/python_example",
    description="A test project using pybind11",
    long_description="",
    ext_modules=ext_modules,
    extras_require={"test": "pytest"},
    # Currently, build_ext only provides an optional "highest supported C++
    # level" feature, but in the future it may provide more features.
    cmdclass={"build_ext": build_ext},
    zip_safe=False,
    python_requires=">=3.7",
)

The C++ part (src/main.cpp) is:

#include <pybind11/pybind11.h>

#define STRINGIFY(x) #x
#define MACRO_STRINGIFY(x) STRINGIFY(x)

int add(int i, int j) {
    return i + j;
}

namespace py = pybind11;

PYBIND11_MODULE(python_example, m) {
    m.doc() = R"pbdoc(
        Pybind11 example plugin
        -----------------------

        .. currentmodule:: python_example

        .. autosummary::
           :toctree: _generate

           add
           subtract
    )pbdoc";

    m.def("add", &add, R"pbdoc(
        Add two numbers

        Some other explanation about the add function.
    )pbdoc");

    m.def("subtract", [](int i, int j) { return i - j; }, R"pbdoc(
        Subtract two numbers

        Some other explanation about the subtract function.
    )pbdoc");

#ifdef VERSION_INFO
    m.attr("__version__") = MACRO_STRINGIFY(VERSION_INFO);
#else
    m.attr("__version__") = "dev";
#endif
}

And the Python code that I want to run is this (example.py):

import numpy as np
import pandas as pd
import dask.dataframe as dd
from dask.diagnostics import ProgressBar

from python_example import add


def python_add(i: int, j: int) -> int:
    return i + j


def add_column_values_python(row: pd.Series) -> pd.Series:
    row['sum'] = python_add(row['i'], row['j'])


def add_column_values(row: pd.Series) -> pd.Series:
    row['sum'] = add(int(row['i']), int(row['j']))


def main():
    dataframe = pd.read_csv('./example.csv', index_col=[])
    dataframe['sum'] = np.nan

    with ProgressBar():
        d_dataframe = dd.from_pandas(dataframe, npartitions=16)
        dataframe = d_dataframe.map_partitions(
            lambda df: df.apply(add_column_values_python, axis=1)).compute(scheduler='processes')

    with ProgressBar():
        d_dataframe = dd.from_pandas(dataframe, npartitions=16)
        dataframe = d_dataframe.map_partitions(
            lambda df: df.apply(add_column_values, axis=1), meta=pd.Series(dtype='float64')).compute(scheduler='processes')


if __name__ == '__main__':
    main()

And the example.csv file looks like this:

i,j
1,2
3,4
5,6
7,8
9,10

But when I run this code I get the following error when using the C++ add version:

[########################################] | 100% Completed | 1.24 s
[                                        ] | 0% Completed | 104.05 ms
Traceback (most recent call last):
  File "/Users/user/local/src/python_example/example.py", line 38, in <module>
    main()
  File "/Users/user/local/src/python_example/example.py", line 33, in main
    dataframe = d_dataframe.map_partitions(
  File "/Users/user/local/src/python_example/.venv/lib/python3.9/site-packages/dask/base.py", line 314, in compute
    (result,) = compute(self, traverse=False, **kwargs)
  File "/Users/user/local/src/python_example/.venv/lib/python3.9/site-packages/dask/base.py", line 599, in compute
    results = schedule(dsk, keys, **kwargs)
  File "/Users/user/local/src/python_example/.venv/lib/python3.9/site-packages/dask/multiprocessing.py", line 233, in get
    result = get_async(
  File "/Users/user/local/src/python_example/.venv/lib/python3.9/site-packages/dask/local.py", line 499, in get_async
    fire_tasks(chunksize)
  File "/Users/user/local/src/python_example/.venv/lib/python3.9/site-packages/dask/local.py", line 481, in fire_tasks
    dumps((dsk[key], data)),
  File "/Users/user/local/src/python_example/.venv/lib/python3.9/site-packages/cloudpickle/cloudpickle_fast.py", line 73, in dumps
    cp.dump(obj)
  File "/Users/user/local/src/python_example/.venv/lib/python3.9/site-packages/cloudpickle/cloudpickle_fast.py", line 632, in dump
    return Pickler.dump(self, obj)
TypeError: cannot pickle 'PyCapsule' object

Is there a way to solve that, maybe by defining something in the C++ module definition?

Note that this example is only to illustrate the problem.
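
For reference, the same error can be reproduced with nothing but the standard library, since datetime exposes its C API as a real PyCapsule object (this reproduction is an editorial aside, not part of the pybind11 example):

```python
import pickle
import datetime

# datetime exposes its C API as a PyCapsule, the same kind of object
# that backs a compiled pybind11 function.
capsule = datetime.datetime_CAPI
print(type(capsule).__name__)   # PyCapsule

try:
    pickle.dumps(capsule)
except TypeError as exc:
    print(exc)                  # cannot pickle 'PyCapsule' object
```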

Answer 1 (score: 2)

Thanks to mdurant I finally got it working; here is the updated code.

Now the main.cpp look like this:

#include <pybind11/pybind11.h>

#define STRINGIFY(x) #x
#define MACRO_STRINGIFY(x) STRINGIFY(x)

int add(int i, int j) {
    return i + j;
}

class Add {
public:
    Add() {};

    int add(int i, int j) {
        return i + j;
    }
};

namespace py = pybind11;

PYBIND11_MODULE(python_example, m) {
    m.doc() = R"pbdoc(
        Pybind11 example plugin
        -----------------------

        .. currentmodule:: python_example

        .. autosummary::
           :toctree: _generate

           add
           subtract
    )pbdoc";

    m.def("add", &add, R"pbdoc(
        Add two numbers

        Some other explanation about the add function.
    )pbdoc");

    m.def("subtract", [](int i, int j) { return i - j; }, R"pbdoc(
        Subtract two numbers

        Some other explanation about the subtract function.
    )pbdoc");

#ifdef VERSION_INFO
    m.attr("__version__") = MACRO_STRINGIFY(VERSION_INFO);
#else
    m.attr("__version__") = "dev";
#endif

    py::class_<Add>(m, "Add")
        .def(py::init<>())
        .def("__call__", &Add::add)
        .def("__getstate__", [](const Add &p) {
            /* Return a tuple that fully encodes the state of the object */
            return py::make_tuple();
        })
        .def("__setstate__", [](Add &p, py::tuple t) {
            if (t.size() != 0)
                throw std::runtime_error("Invalid state!");
            /* Invoke the in-place constructor. Note that this is needed even
               when the object just has a trivial default constructor */
            new (&p) Add();
            /* Assign any additional state */
        });
}

And the example.py file looks like this:

import numpy as np
import pandas as pd
import dask.dataframe as dd
from dask.diagnostics import ProgressBar

from python_example import Add


def python_add(i: int, j: int) -> int:
    return i + j


def add_column_values_python(row: pd.Series) -> pd.Series:
    row['sum'] = python_add(row['i'], row['j'])
    return row


def add_column_values(row: pd.Series) -> pd.Series:
    row['sum'] = Add()(int(row['i']), int(row['j']))
    return row


def add_column_values_import(row: pd.Series) -> pd.Series:
    from python_example import add
    row['sum'] = add(int(row['i']), int(row['j']))
    return row


def main():
    dataframe = pd.read_csv('./example.csv', index_col=[])
    dataframe['sum'] = np.nan

    with ProgressBar():
        d_dataframe = dd.from_pandas(dataframe, npartitions=16)
        dataframe = d_dataframe.map_partitions(
            lambda df: df.apply(add_column_values_python, axis=1)).compute(scheduler='processes')

    with ProgressBar():
        d_dataframe = dd.from_pandas(dataframe, npartitions=16)
        dataframe = d_dataframe.map_partitions(
            lambda df: df.apply(add_column_values, axis=1)).compute(scheduler='processes')

    with ProgressBar():
        d_dataframe = dd.from_pandas(dataframe, npartitions=16)
        dataframe = d_dataframe.map_partitions(
            lambda df: df.apply(add_column_values_import, axis=1)).compute(scheduler='processes')


if __name__ == '__main__':
    main()

The idea is to put the function inside a class and then define the __getstate__ and __setstate__ Python magic methods, or alternatively to do the import inside the function that runs on the workers.

For more information:
https://pybind11-jagerman.readthedocs.io/en/stable/advanced.html
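
The same pattern can be sketched in pure Python (a hypothetical Adder class stands in for the pybind11 Add binding, and a lambda plays the role of the unpicklable extension function):

```python
import pickle

class Adder:
    """Stand-in for the pybind11 Add class: wraps an unpicklable callable."""

    def __init__(self):
        # A lambda, like a PyCapsule-backed extension function, cannot be
        # serialised by the standard pickle module.
        self._fn = lambda i, j: i + j

    def __call__(self, i: int, j: int) -> int:
        return self._fn(i, j)

    def __getstate__(self):
        # Return a tuple that fully encodes the state of the object.
        return ()

    def __setstate__(self, state):
        # Recreate the wrapped resource on the unpickling side.
        if state != ():
            raise ValueError("Invalid state!")
        self.__init__()

restored = pickle.loads(pickle.dumps(Adder()))
print(restored(2, 3))  # 5
```

Because __getstate__ hides the unpicklable attribute and __setstate__ rebuilds it, the wrapper survives the round trip that a bare extension function cannot.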

Answer 2 (score: 1)

General

If you want to pass an object from one process to another in Python (with or without Dask), you need a way to serialise it. The default method for this is pickle. Objects in C libraries are fundamentally dynamic pointer-based things, and pickle doesn't know what to do with them. You can implement the pickle protocol for your C object by providing __getstate__/__setstate__ or __reduce__ dunder methods.
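
The __reduce__ route can be sketched in pure Python (CAdder is a hypothetical stand-in for a class wrapping a C resource; a lambda plays the unpicklable part):

```python
import pickle

class CAdder:
    """Hypothetical wrapper around a C resource that pickle cannot handle."""

    def __init__(self):
        # Stands in for the unpicklable C handle.
        self._fn = lambda i, j: i + j

    def __call__(self, i, j):
        return self._fn(i, j)

    def __reduce__(self):
        # Tell pickle to rebuild the object by calling CAdder() on the
        # receiving side; no unpicklable state is serialised at all.
        return (CAdder, ())

clone = pickle.loads(pickle.dumps(CAdder()))
print(clone(2, 3))  # 5
```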

Alternatively, Dask has a layer of serialisation where you can register specific ser/de functions for specific classes, but that only works with the distributed scheduler, not multiprocessing (the former is better in every way; there is no good reason you should be using multiprocessing).

Specific

A couple of simpler options:

  • use the threading scheduler, so that no serialisation is needed at all (the C code ought to release the GIL and give you full parallelism)
  • I think it's only the add function that is the problem; it's probably enough to move your import into the add_column_values function, so that each worker gets its own copy instead of receiving it from the closure.
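
The first option can be illustrated with the standard library alone (a lambda stands in for the unpicklable extension function, and ThreadPoolExecutor plays the role of Dask's threaded scheduler):

```python
import pickle
from concurrent.futures import ThreadPoolExecutor

# Stand-in for a PyCapsule-backed pybind11 function: a module-level
# lambda is likewise unpicklable with the standard pickle module.
c_add = lambda i, j: i + j

# Threads share the interpreter's memory, so the callable is passed by
# reference and never needs to be serialised.
with ThreadPoolExecutor(max_workers=4) as pool:
    sums = list(pool.map(lambda pair: c_add(*pair), [(1, 2), (3, 4), (5, 6)]))
print(sums)  # [3, 7, 11]
```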

huangapple
  • Posted on 2023-02-14 21:36:37
  • Please keep this link when reposting: https://go.coder-hub.com/75448632.html