子类化 `Process` 以设置进程级常量

huangapple go评论57阅读模式
英文:

Subclassing `Process` to set process level constant

问题

I am trying to subclass mp.Process to create a process-level constant that selects between the GPUs on my desktop. To achieve this, I'd like to have a device id inside each process object and later pass it to a function in the run method. The example code here does not yet actually use self._gpu_id and omits error handling, but it attempts to create a subclassed process with a gpu_id.

Here is what I attempted:

import multiprocessing as mp
from typing import Type

# Attempt to override default behaviour of the `Process`
# to add a gpu_id parameter during construction.
class GPUProcessMeta(type):
    """Metaclass that injects a sequential ``gpu_id`` keyword into ``__init__``."""

    def __call__(cls, *args, **kwargs):
        """Build an instance of ``cls`` and reserve the next GPU id for it.

        The instance is allocated with the caller's own arguments; ``__init__``
        then receives the same arguments with ``gpu_id`` forced to the value
        read from ``cls.next_id``.

        Raises:
            RuntimeError: if the id about to be handed out is already listed
                in ``cls.used_ids``.
        """
        instance = cls.__new__(cls, *args, **kwargs)
        gpu_id = cls.next_id
        if gpu_id in cls.used_ids:
            message = f"Attempt to reserve reserved processor {gpu_id} {cls.used_ids=}"
            raise RuntimeError(message)
        # Book the id on the class before the instance is initialised.
        cls.next_id = gpu_id + 1
        cls.used_ids.append(gpu_id)
        init_kwargs = {**kwargs, "gpu_id": gpu_id}
        instance.__init__(*args, **init_kwargs)
        return instance

class GPUProcess(mp.Process, metaclass=GPUProcessMeta):
    """Process subclass that carries the ``gpu_id`` supplied by ``GPUProcessMeta``.

    NOTE: this is the version the question reports as failing with
    ``RecursionError`` when it is used through ``create_gpu_context``.
    """

    # Ids handed out so far and not yet released by ``__del__``.
    used_ids: list[int] = []
    # Id the metaclass assigns to the next instance.
    next_id: int = 0

    def __init__(
        self,
        group=None,
        target=None,
        name=None,
        args=(),
        kwargs={},  # mutable default; only forwarded to the parent here
        *,
        daemon=None,
        gpu_id=None,
    ):
        """Forward the standard ``Process`` arguments and remember ``gpu_id``."""
        super(GPUProcess, self).__init__(
            group,
            target,
            name,
            args,
            kwargs,
            daemon=daemon,
        )
        self._gpu_id = gpu_id

    @property
    def gpu_id(self):
        """The id stored by ``__init__`` (read-only)."""
        return self._gpu_id

    def __del__(self):
        # Give the id back when the object is collected; ``list.remove``
        # raises ValueError if the id is not currently listed.
        GPUProcess.used_ids.remove(self.gpu_id)

    def __repr__(self) -> str:
        return f"<{type(self)} gpu_id={self.gpu_id} hash={hash(self)}>"

    @classmethod
    def create_gpu_context(cls) -> Type[mp.context.DefaultContext]:
        """Return ``mp.get_context()`` after setting its ``Process`` to ``cls``."""
        context = mp.get_context()
        context.Process = cls
        return context

def test_gpu_pool():
    """Run ten ``(0, 1, 2)`` jobs through a two-worker pool built on ``GPUProcess``."""
    gpu_ctx = GPUProcess.create_gpu_context()
    # Lazy stream of ten identical argument tuples.
    jobs = (tuple(range(3)) for _ in range(10))
    with gpu_ctx.Pool(2) as workers:
        response = workers.starmap(_dummy_func, jobs)
    assert response == ((0, 1, 2),) * 10

def _dummy_func(*args, **kwargs):
    """Echo the call back as an ``(args, kwargs)`` pair."""
    echoed = (args, kwargs)
    return echoed


# Script entry point: run the pool test when executed directly.
if __name__ == "__main__":
    test_gpu_pool()

However, this crashes with RecursionError: maximum recursion depth exceeded. I do not understand what could cause this. How can I implement a Process subclass that has a unique process-level variable?

EDIT: After some digging into the Python source, I found:

#
# Type of default context -- underlying context can be set at most once
#

class Process(process.BaseProcess):
    """``Process`` type whose hooks defer to the ``Process`` of the current context."""
    _start_method = None
    @staticmethod
    def _Popen(process_obj):
        # Looks up ``Process`` on the context returned by
        # ``_default_context.get_context()`` and calls that class's ``_Popen``.
        return _default_context.get_context().Process._Popen(process_obj)

    @staticmethod
    def _after_fork():
        # Same lookup as ``_Popen``, forwarding to that class's ``_after_fork``.
        return _default_context.get_context().Process._after_fork()

Here _Popen ends up calling _Popen again, which recurses infinitely and crashes my code. I am not sure how GPUProcess differs from the standard Process, but this is the root of the crash.

英文:

I am trying to subclass mp.Process to create a process-level constant that selects between the GPUs on my desktop. To achieve this, I'd like to have a device id inside each process object and later pass it to a function in the run method. The example code here does not yet actually use self._gpu_id and omits error handling, but it attempts to create a subclassed process with a gpu_id.

Here is what I attempted:

import multiprocessing as mp
from typing import Type

# Attempt to override default behaviour of the `Process`
# to add a gpu_id parameter during construction.
class GPUProcessMeta(type):
    """Metaclass that passes a sequential ``gpu_id`` keyword to ``__init__``."""

    def __call__(cls, *args, **kwargs):
        """Create an instance of ``cls`` and reserve the next GPU id for it.

        Raises:
            RuntimeError: if ``cls.next_id`` is already present in
                ``cls.used_ids``.
        """
        obj = cls.__new__(cls, *args, **kwargs)
        gpu_id = cls.next_id
        if gpu_id in cls.used_ids:
            raise RuntimeError(
                f"Attempt to reserve reserved processor {gpu_id} {cls.used_ids=}"
            )
        cls.next_id += 1
        cls.used_ids.append(gpu_id)
        # Any caller-supplied gpu_id is replaced by the reserved one.
        kwargs["gpu_id"] = gpu_id
        obj.__init__(*args, **kwargs)
        return obj

class GPUProcess(mp.Process, metaclass=GPUProcessMeta):
    """Process subclass that carries the ``gpu_id`` supplied by ``GPUProcessMeta``.

    NOTE: this is the version the question reports as failing with
    ``RecursionError`` when it is used through ``create_gpu_context``; only
    the HTML-escaped characters have been restored here.
    """

    # Ids handed out so far and not yet released by ``__del__``.
    used_ids: list[int] = []
    # Id the metaclass assigns to the next instance.
    next_id: int = 0

    def __init__(
        self,
        group=None,
        target=None,
        name=None,
        args=(),
        kwargs={},  # mutable default; only forwarded to the parent here
        *,
        daemon=None,
        gpu_id=None,
    ):
        """Forward the standard ``Process`` arguments and remember ``gpu_id``."""
        super(GPUProcess, self).__init__(
            group,
            target,
            name,
            args,
            kwargs,
            daemon=daemon,
        )
        self._gpu_id = gpu_id

    @property
    def gpu_id(self):
        """The id stored by ``__init__`` (read-only)."""
        return self._gpu_id

    def __del__(self):
        # Give the id back when the object is collected.
        GPUProcess.used_ids.remove(self.gpu_id)

    def __repr__(self) -> str:
        return f"<{type(self)} gpu_id={self.gpu_id} hash={hash(self)}>"

    @classmethod
    def create_gpu_context(cls) -> Type[mp.context.DefaultContext]:
        """Return ``mp.get_context()`` after setting its ``Process`` to ``cls``."""
        context = mp.get_context()
        context.Process = cls
        return context

def test_gpu_pool():
    """Push ten ``(0, 1, 2)`` jobs through a two-worker ``GPUProcess`` pool."""
    context = GPUProcess.create_gpu_context()
    # Generator of ten identical argument tuples, consumed by starmap.
    argument_stream = (tuple(range(3)) for _ in range(10))
    with context.Pool(2) as worker_pool:
        response = worker_pool.starmap(_dummy_func, argument_stream)
    assert response == ((0, 1, 2),) * 10

def _dummy_func(*args, **kwargs):
    """Return the received positional and keyword arguments unchanged."""
    return args, kwargs


# Script entry point: run the pool test when executed directly.
if __name__ == "__main__":
    test_gpu_pool()

However, this crashes with RecursionError: maximum recursion depth exceeded. I do not understand what could cause this. How can I implement a Process subclass that has a unique process-level variable?

EDIT: After some digging into the Python source, I found:

#
# Type of default context -- underlying context can be set at most once
#
class Process(process.BaseProcess):
    """``Process`` type whose hooks defer to the ``Process`` of the current context."""
    _start_method = None

    @staticmethod
    def _Popen(process_obj):
        # Looks up ``Process`` on the context returned by
        # ``_default_context.get_context()`` and calls that class's ``_Popen``.
        return _default_context.get_context().Process._Popen(process_obj)

    @staticmethod
    def _after_fork():
        # Same lookup as ``_Popen``, forwarding to that class's ``_after_fork``.
        return _default_context.get_context().Process._after_fork()

Here _Popen ends up calling _Popen again, which recurses infinitely and crashes my code. I am not sure how GPUProcess differs from the standard Process, but this is the root of the crash.

答案1

得分: 2

以下是翻译好的部分:

看起来你实际上想要使用multiprocessing shared memory来协调哪个子进程可以访问给定的GPU。

这里有一个简单的示例,其中有4个进程,每个进程有10个任务,它们竞争使用2个GPU。它使用了一个锁和一个共享内存数组;每个进程通过将其PID写入GPU数组的第一个空闲“槽”来标记自己作为给定GPU的所有者。

对于更适用于生产环境的示例,你需要添加一些错误容忍性(例如,确保工作者始终释放他们的GPU等)。

import multiprocessing
import os
import random
import time
from dataclasses import dataclass

@dataclass
class SharedVariables:
    """Lock-guarded table recording which process currently owns each GPU."""

    # Lock held while the ownership table is read or modified.
    gpu_pool_lock: multiprocessing.Lock
    # One entry per GPU: 0 means free, anything else is the owner's PID.
    gpu_array: multiprocessing.Array

    def get_free_gpu(self):
        """Claim the first free GPU for this process.

        Returns:
            The index of the claimed GPU, or ``None`` when every entry is
            already owned.
        """
        with self.gpu_pool_lock:
            free_slot = next(
                (slot for slot, owner in enumerate(self.gpu_array) if owner == 0),
                None,
            )
            if free_slot is not None:
                self.gpu_array[free_slot] = os.getpid()
            return free_slot

    def release_gpu(self, gpu_id):
        """Mark ``gpu_id`` as free again; asserts that this process owns it."""
        current_pid = os.getpid()
        with self.gpu_pool_lock:
            assert self.gpu_array[gpu_id] == current_pid
            self.gpu_array[gpu_id] = 0

# Module-level handle to the shared GPU bookkeeping; assigned by ``init_worker``.
shared_variables: SharedVariables

def init_worker(shv: SharedVariables):
    """Store ``shv`` in the module-level ``shared_variables`` global.

    ``main`` passes this function to ``multiprocessing.Pool`` as ``initializer``.
    """
    global shared_variables
    shared_variables = shv

def do_work(x):
    """Simulate one job that needs exclusive use of a GPU.

    Polls ``shared_variables.get_free_gpu()`` until a GPU is claimed, sleeps
    for a random 2-4 seconds to stand in for real work, then releases the GPU.

    Args:
        x: Job label; only used in the printed progress messages.
    """
    print(f"Worker {os.getpid()} starting work {x}")
    gpu_id = None
    while gpu_id is None:
        gpu_id = shared_variables.get_free_gpu()
        # NOTE: this pause runs after every attempt, including the successful one.
        time.sleep(0.5)
    try:
        print(f"Worker {os.getpid()} got gpu {gpu_id}")
        time.sleep(random.uniform(2, 4))
    finally:
        # Always hand the GPU back, even if the work above raises; otherwise
        # the slot would stay marked as owned by this process.
        shared_variables.release_gpu(gpu_id)
    print(f"Worker {os.getpid()} released gpu {gpu_id}")
    time.sleep(0.5)

def main():
    """Run ten ``do_work`` jobs on four workers competing for two GPUs.

    While jobs are outstanding, prints the ownership table twice a second.
    """
    num_gpus = 2
    pool_lock = multiprocessing.Lock()
    ownership = multiprocessing.Array("i", [0] * num_gpus)
    shared_variables = SharedVariables(
        gpu_pool_lock=pool_lock,
        gpu_array=ownership,
    )
    # Every worker receives the shared state through the pool initializer.
    pool_options = {"initializer": init_worker, "initargs": (shared_variables,)}
    with multiprocessing.Pool(4, **pool_options) as pool:
        pending = [pool.apply_async(do_work, args=(job,)) for job in range(10)]
        while any(not result.ready() for result in pending):
            print("Main: GPUs in use:", shared_variables.gpu_array[:])
            time.sleep(0.5)


# Script entry point.
if __name__ == "__main__":
    main()

示例输出可能如下:

Main: GPUs in use: [0, 0]
Worker 41543 starting work 0
Worker 41542 starting work 1
Worker 41541 starting work 2
Worker 41544 starting work 3
Main: GPUs in use: [41543, 41542]
Worker 41543 got gpu 0
Worker 41542 got gpu 1
Main: GPUs in use: [41543, 41542]
Main: GPUs in use: [41543, 41542]
Main: GPUs in use: [41543, 41542]
Main: GPUs in use: [41543, 41542]
Main: GPUs in use: [41543, 41542]
Worker 41542 released gpu 1
Main: GPUs in use: [41543, 0]
Worker 41542 starting work 4
Worker 41543 released gpu 0
Main: GPUs in use: [0, 41544]
Worker 41544 got gpu 1
Worker 41543 starting work 5
Main: GPUs in use: [41541, 41544]
Worker 41541 got gpu 0
Main: GPUs in use: [41541, 41544]
英文:

Sounds like you actually want multiprocessing shared memory to coordinate which subprocess can access a given GPU.

Here's a simple toy example where 4 processes with 10 jobs to do compete over 2 GPUs. It uses a lock and a shared memory array; each process marks itself as the owner of a given GPU by writing its PID into the first free "slot" of the GPU array.

For a more production-ready example, you would need to add some error tolerance (e.g. ensure the workers always release their GPUs, etc.)

import multiprocessing
import os
import random
import time
from dataclasses import dataclass
@dataclass
class SharedVariables:
    """Lock-guarded table recording which process currently owns each GPU."""

    # Lock held while the ownership table is read or modified.
    gpu_pool_lock: multiprocessing.Lock
    # One entry per GPU: 0 means free, anything else is the owner's PID.
    gpu_array: multiprocessing.Array

    def get_free_gpu(self):
        """Claim the first free GPU for this process.

        Returns:
            The index of the claimed GPU, or ``None`` if none is free.
        """
        with self.gpu_pool_lock:
            for gpu_id, pid in enumerate(self.gpu_array):
                if pid == 0:
                    self.gpu_array[gpu_id] = os.getpid()
                    return gpu_id
            return None

    def release_gpu(self, gpu_id):
        """Mark ``gpu_id`` as free again; asserts that this process owns it."""
        with self.gpu_pool_lock:
            assert self.gpu_array[gpu_id] == os.getpid()
            self.gpu_array[gpu_id] = 0
# Module-level handle to the shared GPU bookkeeping; assigned by ``init_worker``.
shared_variables: SharedVariables


def init_worker(shv: SharedVariables):
    """Store ``shv`` in the module-level ``shared_variables`` global.

    ``main`` passes this function to ``multiprocessing.Pool`` as ``initializer``.
    """
    global shared_variables
    shared_variables = shv


def do_work(x):
    """Simulate one job that needs exclusive use of a GPU.

    Polls ``shared_variables.get_free_gpu()`` until a GPU is claimed, sleeps
    for a random 2-4 seconds to stand in for real work, then releases the GPU.

    Args:
        x: Job label; only used in the printed progress messages.
    """
    print(f"Worker {os.getpid()} starting work {x}")
    gpu_id = None
    while gpu_id is None:
        gpu_id = shared_variables.get_free_gpu()
        # NOTE: this pause runs after every attempt, including the successful one.
        time.sleep(0.5)
    try:
        print(f"Worker {os.getpid()} got gpu {gpu_id}")
        time.sleep(random.uniform(2, 4))
    finally:
        # Always hand the GPU back, even if the work above raises.
        shared_variables.release_gpu(gpu_id)
    print(f"Worker {os.getpid()} released gpu {gpu_id}")
    time.sleep(0.5)


def main():
    """Run ten ``do_work`` jobs on four workers competing for two GPUs.

    While jobs are outstanding, prints the ownership table twice a second.
    """
    num_gpus = 2
    shared_variables = SharedVariables(
        gpu_pool_lock=multiprocessing.Lock(),
        gpu_array=(multiprocessing.Array("i", [0] * num_gpus)),
    )
    # Every worker receives the shared state through the pool initializer.
    with multiprocessing.Pool(
        4,
        initializer=init_worker,
        initargs=(shared_variables,),
    ) as pool:
        res = [pool.apply_async(do_work, args=(x,)) for x in range(10)]
        while not (all(r.ready() for r in res)):
            print("Main: GPUs in use:", shared_variables.gpu_array[:])
            time.sleep(0.5)


# Script entry point.
if __name__ == "__main__":
    main()

An example output would be

Main: GPUs in use: [0, 0]
Worker 41543 starting work 0
Worker 41542 starting work 1
Worker 41541 starting work 2
Worker 41544 starting work 3
Main: GPUs in use: [41543, 41542]
Worker 41543 got gpu 0
Worker 41542 got gpu 1
Main: GPUs in use: [41543, 41542]
Main: GPUs in use: [41543, 41542]
Main: GPUs in use: [41543, 41542]
Main: GPUs in use: [41543, 41542]
Main: GPUs in use: [41543, 41542]
Worker 41542 released gpu 1
Main: GPUs in use: [41543, 0]
Worker 41542 starting work 4
Worker 41543 released gpu 0
Main: GPUs in use: [0, 41544]
Worker 41544 got gpu 1
Worker 41543 starting work 5
Main: GPUs in use: [41541, 41544]
Worker 41541 got gpu 0
Main: GPUs in use: [41541, 41544]

etc.

答案2

得分: 0

以下是翻译好的部分:

这个类的目标是mp.Process,它是当前上下文Process的代理。这会导致底层multiprocessing实现中的跳转,最终导致程序崩溃,出现RecursionError错误。

正确的继承应该实现为:

import multiprocessing as mp
from typing import Type

class GPUProcess(mp.get_context().Process):
    """Process subclass that tags every instance with a unique ``gpu_id``.

    The base class is the ``Process`` class of the context returned by
    ``mp.get_context()`` rather than ``mp.Process``.
    """

    # Ids currently held by instances that have not been released yet.
    used_ids: list[int] = []
    # Id that the next constructed instance will receive.
    next_id: int = 0

    def __init__(
        self,
        *args,
        **kwargs,
    ):
        """Initialise the process and reserve the next free id.

        All arguments are forwarded unchanged to the parent ``__init__``.

        Raises:
            RuntimeError: if the id about to be reserved is already listed
                in ``used_ids``.
        """
        super().__init__(
            *args,
            **kwargs,
        )
        gpu_id = GPUProcess.next_id
        if gpu_id in GPUProcess.used_ids:
            raise RuntimeError(
                f"尝试保留已保留的处理器 {gpu_id} {self.used_ids=}"
            )
        GPUProcess.next_id += 1
        GPUProcess.used_ids.append(gpu_id)
        self._gpu_id = gpu_id

    @property
    def gpu_id(self):
        """The id reserved for this instance (read-only)."""
        return self._gpu_id

    def __del__(self):
        # Release the id if this instance still holds one. The lookup is
        # defensive: ``__init__`` may have failed before ``_gpu_id`` was set,
        # and the id may already have been released by an earlier call.
        gpu_id = getattr(self, "_gpu_id", None)
        if gpu_id in GPUProcess.used_ids:
            GPUProcess.used_ids.remove(gpu_id)

    def __repr__(self) -> str:
        return f"<{type(self)} gpu_id={self.gpu_id} hash={hash(self)}>"

    @classmethod
    def create_gpu_context(cls) -> mp.context.BaseContext:
        """Return ``mp.get_context()`` after setting its ``Process`` to ``cls``.

        NOTE(review): the returned object appears to be shared process-wide,
        so the assignment would affect other users of that context - confirm.
        """
        context = mp.get_context()
        context.Process = cls
        return context

def test_gpu_pool():
    """Check that a two-worker ``GPUProcess`` pool echoes three jobs back."""
    gpu_ctx = GPUProcess.create_gpu_context()
    # Lazy stream of three identical argument tuples.
    jobs = (tuple(range(3)) for _ in range(3))
    with gpu_ctx.Pool(2) as workers:
        response = workers.starmap(_dummy_func, jobs)
    # Each job comes back as (positional args, empty keyword dict).
    expected = [((0, 1, 2), {})] * 3
    assert response == expected, f"{response=},{expected=}"

def _dummy_func(*args, **kwargs):
    """Hand back the positional and keyword arguments as a 2-tuple."""
    received = (args, kwargs)
    return received


# Script entry point: run the pool test when executed directly.
if __name__ == "__main__":
    test_gpu_pool()

参考cpython问题#103645

这段代码不是最优的,可以使用set作为id而不是列表,但总体思想在这里实现了。

英文:

The class inherits from mp.Process, which is only a proxy for the Process class of the current context. This makes the underlying multiprocessing implementation bounce between the proxy and the subclass, eventually crashing the program with a RecursionError.

Correct inheritance should be implemented with

import multiprocessing as mp
from typing import Type

class GPUProcess(mp.get_context().Process):
    """Process subclass that tags every instance with a unique ``gpu_id``.

    The base class is the ``Process`` class of the context returned by
    ``mp.get_context()`` rather than ``mp.Process``.
    """

    # Ids currently held by instances that have not been released yet.
    used_ids: list[int] = []
    # Id that the next constructed instance will receive.
    next_id: int = 0

    def __init__(
        self,
        *args,
        **kwargs,
    ):
        """Initialise the process and reserve the next free id.

        All arguments are forwarded unchanged to the parent ``__init__``.

        Raises:
            RuntimeError: if the id about to be reserved is already listed
                in ``used_ids``.
        """
        super().__init__(
            *args,
            **kwargs,
        )
        gpu_id = GPUProcess.next_id
        if gpu_id in GPUProcess.used_ids:
            raise RuntimeError(
                f"Attempt to reserve reserved processor {gpu_id} {self.used_ids=}"
            )
        GPUProcess.next_id += 1
        GPUProcess.used_ids.append(gpu_id)
        self._gpu_id = gpu_id

    @property
    def gpu_id(self):
        """The id reserved for this instance (read-only)."""
        return self._gpu_id

    def __del__(self):
        # Release the id if this instance still holds one. The lookup is
        # defensive: ``__init__`` may have failed before ``_gpu_id`` was set,
        # and the id may already have been released by an earlier call.
        gpu_id = getattr(self, "_gpu_id", None)
        if gpu_id in GPUProcess.used_ids:
            GPUProcess.used_ids.remove(gpu_id)

    def __repr__(self) -> str:
        return f"<{type(self)} gpu_id={self.gpu_id} hash={hash(self)}>"

    @classmethod
    def create_gpu_context(cls) -> mp.context.BaseContext:
        """Return ``mp.get_context()`` after setting its ``Process`` to ``cls``.

        NOTE(review): the returned object appears to be shared process-wide,
        so the assignment would affect other users of that context - confirm.
        """
        context = mp.get_context()
        context.Process = cls
        return context

def test_gpu_pool():
    """Check that a two-worker ``GPUProcess`` pool echoes three jobs back."""
    ctx = GPUProcess.create_gpu_context()
    with ctx.Pool(2) as pool:
        # Lazy stream of three identical argument tuples.
        payload = (tuple(range(3)) for _ in range(3))
        response = pool.starmap(
            _dummy_func,
            payload,
        )
    # Each job comes back as (positional args, empty keyword dict).
    assert response == (
        expected := list(
            (
                (
                    (0, 1, 2),
                    {},
                ),
            )
            * 3
        )
    ), f"{response=}!={expected}"

def _dummy_func(*args, **kwargs):
    """Return the received positional and keyword arguments unchanged."""
    return args, kwargs


# Script entry point: run the pool test when executed directly.
if __name__ == "__main__":
    test_gpu_pool()

Ref cpython issue #103645

This code is not optimal: a set could be used to track the ids instead of a list, but the overall idea is implemented here.

huangapple
  • 本文由 发表于 2023年4月19日 18:45:36
  • 转载请务必保留本文链接:https://go.coder-hub.com/76053605.html
匿名

发表评论

匿名网友

:?: :razz: :sad: :evil: :!: :smile: :oops: :grin: :eek: :shock: :???: :cool: :lol: :mad: :twisted: :roll: :wink: :idea: :arrow: :neutral: :cry: :mrgreen:

确定