cannot pickle 'PyCapsule' object error, when using pybind11 function and dask

Question


See the following example extracted from the pybind11 help (https://github.com/pybind/python_example):

The setup.py is:

import sys

# Available at setup time due to pyproject.toml
from pybind11 import get_cmake_dir
from pybind11.setup_helpers import Pybind11Extension, build_ext
from setuptools import setup

__version__ = "0.0.1"

# The main interface is through Pybind11Extension.
# * You can add cxx_std=11/14/17, and then build_ext can be removed.
# * You can set include_pybind11=false to add the include directory yourself,
#   say from a submodule.
#
# Note:
#   Sort input source files if you glob sources to ensure bit-for-bit
#   reproducible builds (https://github.com/pybind/python_example/pull/53)

ext_modules = [
    Pybind11Extension("python_example",
        ["src/main.cpp"],
        # Example: passing in the version to the compiled code
        define_macros=[('VERSION_INFO', __version__)],
        ),
]

setup(
    name="python_example",
    version=__version__,
    author="Sylvain Corlay",
    author_email="sylvain.corlay@gmail.com",
    url="https://github.com/pybind/python_example",
    description="A test project using pybind11",
    long_description="",
    ext_modules=ext_modules,
    extras_require={"test": "pytest"},
    # Currently, build_ext only provides an optional "highest supported C++
    # level" feature, but in the future it may provide more features.
    cmdclass={"build_ext": build_ext},
    zip_safe=False,
    python_requires=">=3.7",
)

The C++ part is (src/main.cpp):

#include <pybind11/pybind11.h>

#define STRINGIFY(x) #x
#define MACRO_STRINGIFY(x) STRINGIFY(x)

int add(int i, int j) {
    return i + j;
}

namespace py = pybind11;

PYBIND11_MODULE(python_example, m) {
    m.doc() = R"pbdoc(
        Pybind11 example plugin
        -----------------------

        .. currentmodule:: python_example

        .. autosummary::
           :toctree: _generate

           add
           subtract
    )pbdoc";

    m.def("add", &add, R"pbdoc(
        Add two numbers

        Some other explanation about the add function.
    )pbdoc");

    m.def("subtract", [](int i, int j) { return i - j; }, R"pbdoc(
        Subtract two numbers

        Some other explanation about the subtract function.
    )pbdoc");

#ifdef VERSION_INFO
    m.attr("__version__") = MACRO_STRINGIFY(VERSION_INFO);
#else
    m.attr("__version__") = "dev";
#endif
}

And the Python code that I want to run is this (example.py):

import numpy as np
import pandas as pd
import dask.dataframe as dd
from dask.diagnostics import ProgressBar
from python_example import add


def python_add(i: int, j: int) -> int:
    return i + j


def add_column_values_python(row: pd.Series) -> pd.Series:
    row['sum'] = python_add(row['i'], row['j'])
    return row


def add_column_values(row: pd.Series) -> pd.Series:
    row['sum'] = add(int(row['i']), int(row['j']))
    return row


def main():
    dataframe = pd.read_csv('./example.csv', index_col=[])
    dataframe['sum'] = np.nan

    with ProgressBar():
        d_dataframe = dd.from_pandas(dataframe, npartitions=16)
        dataframe = d_dataframe.map_partitions(
            lambda df: df.apply(add_column_values_python, axis=1)).compute(scheduler='processes')

    with ProgressBar():
        d_dataframe = dd.from_pandas(dataframe, npartitions=16)
        dataframe = d_dataframe.map_partitions(
            lambda df: df.apply(add_column_values, axis=1), meta=pd.Series(dtype='float64')).compute(scheduler='processes')


if __name__ == '__main__':
    main()

And the example.csv file looks like this:

i,j
1,2
3,4
5,6
7,8
9,10

But when I run this code I get the following error when using the C++ add version:

[########################################] | 100% Completed | 1.24 s
[                                        ] | 0% Completed | 104.05 ms
Traceback (most recent call last):
  File "/Users/user/local/src/python_example/example.py", line 38, in <module>
    main()
  File "/Users/user/local/src/python_example/example.py", line 33, in main
    dataframe = d_dataframe.map_partitions(
  File "/Users/user/local/src/python_example/.venv/lib/python3.9/site-packages/dask/base.py", line 314, in compute
    (result,) = compute(self, traverse=False, **kwargs)
  File "/Users/user/local/src/python_example/.venv/lib/python3.9/site-packages/dask/base.py", line 599, in compute
    results = schedule(dsk, keys, **kwargs)
  File "/Users/user/local/src/python_example/.venv/lib/python3.9/site-packages/dask/multiprocessing.py", line 233, in get
    result = get_async(
  File "/Users/user/local/src/python_example/.venv/lib/python3.9/site-packages/dask/local.py", line 499, in get_async
    fire_tasks(chunksize)
  File "/Users/user/local/src/python_example/.venv/lib/python3.9/site-packages/dask/local.py", line 481, in fire_tasks
    dumps((dsk[key], data)),
  File "/Users/user/local/src/python_example/.venv/lib/python3.9/site-packages/cloudpickle/cloudpickle_fast.py", line 73, in dumps
    cp.dump(obj)
  File "/Users/user/local/src/python_example/.venv/lib/python3.9/site-packages/cloudpickle/cloudpickle_fast.py", line 632, in dump
    return Pickler.dump(self, obj)
TypeError: cannot pickle 'PyCapsule' object

Is there a way to solve that, maybe by defining something in the C++ module definition?

Note that this example is only to illustrate the problem.
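The failure can be reproduced without dask or the compiled module. The standard library's datetime module exposes its C API as a real PyCapsule, so it can serve as a stand-in for the capsule hidden inside a pybind11-built function (a minimal sketch; the capsule here only plays the role of the pybind11 internals):

```python
import datetime
import pickle

# datetime exposes its C API as a genuine PyCapsule; it is a convenient
# stand-in for the capsule wrapped inside a pybind11-compiled function.
capsule = datetime.datetime_CAPI
print(type(capsule).__name__)  # PyCapsule

try:
    pickle.dumps(capsule)
except TypeError as err:
    print(err)  # cannot pickle 'PyCapsule' object
```

Any scheduler that ships tasks between processes must pickle everything the task references, so this same TypeError surfaces inside dask's multiprocessing scheduler.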

Answer 1

Score: 2


Thanks to mdurant I finally got it working. Here is the updated code:

Now main.cpp looks like this:

#include <pybind11/pybind11.h>

#include <stdexcept>

#define STRINGIFY(x) #x
#define MACRO_STRINGIFY(x) STRINGIFY(x)

int add(int i, int j) {
    return i + j;
}

class Add {
public:
    Add() {}
    int add(int i, int j) {
        return i + j;
    }
};

namespace py = pybind11;

PYBIND11_MODULE(python_example, m) {
    m.doc() = R"pbdoc(
        Pybind11 example plugin
        -----------------------

        .. currentmodule:: python_example

        .. autosummary::
           :toctree: _generate

           add
           subtract
    )pbdoc";

    m.def("add", &add, R"pbdoc(
        Add two numbers

        Some other explanation about the add function.
    )pbdoc");

    m.def("subtract", [](int i, int j) { return i - j; }, R"pbdoc(
        Subtract two numbers

        Some other explanation about the subtract function.
    )pbdoc");

#ifdef VERSION_INFO
    m.attr("__version__") = MACRO_STRINGIFY(VERSION_INFO);
#else
    m.attr("__version__") = "dev";
#endif

    py::class_<Add>(m, "Add")
        .def(py::init<>())
        .def("__call__", &Add::add)
        .def("__getstate__", [](const Add &p) {
            /* Return a tuple that fully encodes the state of the object */
            return py::make_tuple();
        })
        .def("__setstate__", [](Add &p, py::tuple t) {
            if (t.size() != 0)
                throw std::runtime_error("Invalid state!");

            /* Invoke the in-place constructor. Note that this is needed even
               when the object just has a trivial default constructor */
            new (&p) Add();

            /* Assign any additional state */
        });
}

And the example.py file looks like this:

import numpy as np
import pandas as pd
import dask.dataframe as dd
from dask.diagnostics import ProgressBar
from python_example import Add


def python_add(i: int, j: int) -> int:
    return i + j


def add_column_values_python(row: pd.Series) -> pd.Series:
    row['sum'] = python_add(row['i'], row['j'])
    return row


def add_column_values(row: pd.Series) -> pd.Series:
    row['sum'] = Add()(int(row['i']), int(row['j']))
    return row


def add_column_values_import(row: pd.Series) -> pd.Series:
    from python_example import add
    row['sum'] = add(int(row['i']), int(row['j']))
    return row


def main():
    dataframe = pd.read_csv('./example.csv', index_col=[])
    dataframe['sum'] = np.nan

    with ProgressBar():
        d_dataframe = dd.from_pandas(dataframe, npartitions=16)
        dataframe = d_dataframe.map_partitions(
            lambda df: df.apply(add_column_values_python, axis=1)).compute(scheduler='processes')

    with ProgressBar():
        d_dataframe = dd.from_pandas(dataframe, npartitions=16)
        dataframe = d_dataframe.map_partitions(
            lambda df: df.apply(add_column_values, axis=1)).compute(scheduler='processes')

    with ProgressBar():
        d_dataframe = dd.from_pandas(dataframe, npartitions=16)
        dataframe = d_dataframe.map_partitions(
            lambda df: df.apply(add_column_values_import, axis=1)).compute(scheduler='processes')


if __name__ == '__main__':
    main()

The idea is to put the function inside a class and define the __getstate__ and __setstate__ Python magic methods, or alternatively to move the import inside the function that runs on the workers.
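The same wrapper pattern can be sketched in pure Python (hedged: datetime's C-API capsule stands in for the unpicklable pybind11 handle, and the addition stands in for the C call):

```python
import datetime
import pickle


class Add:
    """Picklable wrapper around an unpicklable C-level handle."""

    def __init__(self):
        # Stand-in for acquiring the real C handle: a genuine PyCapsule.
        self._handle = datetime.datetime_CAPI

    def __call__(self, i: int, j: int) -> int:
        return i + j

    def __getstate__(self):
        # Encode only the (empty) logical state; never the capsule itself.
        return ()

    def __setstate__(self, state):
        # Re-acquire the handle in the worker process after unpickling.
        self.__init__()


clone = pickle.loads(pickle.dumps(Add()))
print(clone(1, 2))  # 3
```

Because __getstate__ hands pickle an empty tuple, the capsule never crosses the process boundary; __setstate__ rebuilds it locally on the receiving side, exactly as the C++ __setstate__ lambda above does with placement new.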

For more information:
https://pybind11-jagerman.readthedocs.io/en/stable/advanced.html

Answer 2

Score: 1


General

If you want to pass an object from one process to another in Python (with or without dask), you need a way to serialise it. The default method for this is pickle. Objects in C libraries are fundamentally dynamic, pointer-based things, and pickle doesn't know what to do with them. You can implement the pickle protocol for your C object by providing __getstate__/__setstate__ or __reduce__ dunder methods.
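The __reduce__ route can be sketched in pure Python (a hedged stand-in: the class below pretends to wrap an unpicklable C function pointer):

```python
import pickle


class CAdd:
    """Pretend wrapper around an unpicklable C function pointer."""

    def __call__(self, i, j):
        return i + j

    def __reduce__(self):
        # Tell pickle to rebuild the object by calling CAdd() on the
        # receiving side; no C-level state is serialised at all.
        return (CAdd, ())


restored = pickle.loads(pickle.dumps(CAdd()))
print(restored(2, 3))  # 5
```

__reduce__ is the most compact option when the object carries no state worth shipping: the callable plus its (empty) constructor arguments are all that travels.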

Alternatively, dask has a serialisation layer in which you can register specific ser/de functions for specific classes, but that works only with the distributed scheduler, not with multiprocessing (the former is better in every way; there is no good reason you should be using multiprocessing).

Specific

A couple of simpler options:

  • use the threading scheduler, so that no serialisation is needed (the C code ought to release the GIL and give you full parallelism);
  • I think it is only the add function that is the problem; it is probably enough to move your import into the add_column_values function, so that each worker gets its own copy instead of receiving it through the closure.
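The second option can be sketched in pure Python (hedged stand-ins: operator.add plays the pybind11 add, and plain pickle plays cloudpickle):

```python
import pickle


def add_column_values(i, j):
    # The import lives inside the function, so the unpicklable C-backed
    # function never appears in this function's globals or closure; each
    # worker process resolves the name against its own copy of the module.
    from operator import add  # stand-in for: from python_example import add
    return add(i, j)


# The wrapper function itself round-trips through pickle without ever
# touching the C object it will import later, inside the worker.
restored = pickle.loads(pickle.dumps(add_column_values))
print(restored(2, 3))  # 5
```

The serialiser only ships the wrapper; the first call on each worker triggers a local import, and imported C extension modules are never pickled.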

huangapple
  • Published 2023-02-14 21:36:37
  • Original link: https://go.coder-hub.com/75448632.html