如何使我的类实例在Python中可用于多进程序列化?

huangapple go评论52阅读模式
英文:

How can I make my class instance serializable for multiprocessing in Python?

问题

I'm sorry, but it seems like the code you provided contains specific technical details and code snippets that cannot be directly translated. However, I can offer you a summary of the issue and suggest a possible solution:

你想要在Python中创建一个类似Mathematica的库,名为Mathics,希望能够使用multiprocessing模块使程序能够并行运行。但是,由于MathicsSession类的实例化需要较长时间(约5秒),并且这些实例不能被multiprocessing模块序列化,因此每次在进程池中调用函数时都需要重新创建实例,导致代码的并行性能不如预期。

你希望能够只实例化一次会话,或者每个CPU实例化一次(如果可能的话)。最后,你提到了一个错误,指出你的会话无法被序列化。

可能的解决方案是尝试使MathicsSession的实例可以序列化,以便在多进程环境中重复使用。这可能需要对MathicsSession类进行一些自定义序列化的工作,以满足multiprocessing的要求。

英文:

I am currently working on a Mathematica-like library on Python called Mathics, and actually I want to make my program parallelizable with the multiprocessing module.

But I have a problem, to make mathematical evaluation with Mathics, I have to open a Mathics session, by making an instance of the class MathicsSession(). Making this instance take a lot of time (approximately 5 seconds), and this instance can't be pickled by the multiprocessing module, which means I have to instance my sessions each time I call my function in the pool map, which make my code with multiprocessing longer than my previous code. That's a problem because the main goal of my project is to be highly parallelizable...

I'm not sure I'm understandable, but what I want is to instance my sessions just one time, maybe one time per CPU (i don't know if it's possible), here's a code example of what i mean :

from multiprocessing import Pool, cpu_count
from mathics.session import MathicsSession


def evaluation(session):
    return session.evaluate("5+5")


if __name__ == '__main__':

    sessions = []
    for i in range(50):
        sessions.append(MathicsSession())

    with Pool(cpu_count()) as pool:
        result = pool.map(evaluation, sessions)

Here's the error showing that my sessions can't be pickled :

Traceback (most recent call last): File "/home/thales_usradm/PycharmProjects/Test/test2.py", line 16, in <module> result = pool.map(evaluation, sessions) File "/usr/lib/python3.10/multiprocessing/pool.py", line 367, in map return self._map_async(func, iterable, mapstar, chunksize).get() File "/usr/lib/python3.10/multiprocessing/pool.py", line 774, in get raise self._value File "/usr/lib/python3.10/multiprocessing/pool.py", line 540, in _handle_tasks put(task) File "/usr/lib/python3.10/multiprocessing/connection.py", line 206, in send self._send_bytes(_ForkingPickler.dumps(obj)) File "/usr/lib/python3.10/multiprocessing/reduction.py", line 51, in dumps cls(buf, protocol).dump(obj) AttributeError: Can't pickle local object 'Builtin.contribute.<locals>.check_options'

Maybe i can make the instance serializable ?

答案1

得分: 2

我不确定为什么您希望在每次调用evaluation时都创建一个新会话,如果正如您所说,创建会话是一个相对昂贵的操作,即使您能够序列化一个会话。我认为您应该为每个池进程创建一个会话,以便在该池进程中可重复使用,用于多次调用您的工作函数 evaluation

因此,您可以在创建池时指定一个池初始化程序。这是一个在初始化池时在每个池进程中执行一次的函数,通常会创建一个或多个全局变量,供您的工作函数访问。

也许这是一个更现实的示例。与其重新计算相同的值50次,我们将评估50个不同的表达式:

from multiprocessing import Pool, cpu_count
from mathics.session import MathicsSession

def init_pool_processes():
    """
    为每个池进程创建一个会话。
    """
    global session
    
    session = MathicsSession()
    
    
def evaluation(n):
    # session现在是一个全局变量
    return session.evaluate(f"5+{n}")


if __name__ == '__main__':
    with Pool(cpu_count(), initializer=init_pool_processes) as pool:
        result = pool.map(evaluation, range(50))
英文:

I am not sure why you would want to create a new session for each invocation of evaluation if, as you say, creating a session is a rather expensive operation even if you were able to pickle a session. I would think you would want to create one session for each pool process that will be reusable for multiple invocations of your worker function, evaluation, executing in that pool process.

Therefore, you can specify a pool initializer when you create your pool. This is a function that will be executed once in each pool process when the pool is initialized and it will typically create one or more global variables that your worker function can access.

Perhaps this is a more realistic example. Instead of re-computing the same value 50 times, we will evaluate 50 different expressions:

from multiprocessing import Pool, cpu_count
from mathics.session import MathicsSession

def init_pool_processes():
    """
    Create a session for each pool process.
    """
    global session
    
    session = MathicsSession()
    
    
def evaluation(n):
    # session is now a global variable
    return session.evaluate(f"5+{n}")


if __name__ == '__main__':
    with Pool(cpu_count(), initializer=init_pool_processes) as pool:
        result = pool.map(evaluation, range(50))

答案2

得分: 0

I believe you can use the initializer argument to Pool() in order to run a function once per process (which I think is once per CPU by default).

This example uses MyClass as an unpickleable object (you can't pickle classes defined below the module scope).

There is probably something better to do with initi than setting a global but this shows the general idea:

from multiprocessing import Pool
import time

sub_class = None

def get_sub_class():
    class MyClass():
        pass

    time.sleep(1)
    return MyClass()

def f(x):
    print(id(sub_class), x)

def initi():
    global sub_class
    sub_class = get_sub_class()

if __name__ == '__main__':
    mylist = list(range(5))
    
    with Pool(processes=4, initializer=initi) as pool: 
        pool.map(f, mylist)
英文:

I believe you can use the initializer argument to Pool() in order to run a function once per process (which I think is once per CPU by default).

This example uses MyClass as an unpickleable object (you can't pickle classes defined below the module scope).

There is probably something better to do with initi than setting a global but this shows the general idea:

from multiprocessing import Pool
import time

sub_class = None

def get_sub_class():
    class MyClass():
        pass

    time.sleep(1)
    return MyClass()

def f(x):
    print(id(sub_class), x)

def initi():
    global sub_class
    sub_class = get_sub_class()

if __name__ == '__main__':
    mylist = list(range(5))
    
    with Pool(processes=4, initializer=initi) as pool: 
        pool.map(f, mylist)

huangapple
  • 本文由 发表于 2023年5月25日 17:24:06
  • 转载请务必保留本文链接:https://go.coder-hub.com/76330735.html
匿名

发表评论

匿名网友

:?: :razz: :sad: :evil: :!: :smile: :oops: :grin: :eek: :shock: :???: :cool: :lol: :mad: :twisted: :roll: :wink: :idea: :arrow: :neutral: :cry: :mrgreen:

确定