英文:
Subclassing `Process` to set process level constant
问题
I am trying to subclass mp.Process
to create process level constant to decide between GPUs on my desktop. To achieve this, I'd like to have a device id inside each process object and later pass it to a function in run
method. The example code here does not actually yet use self._gpu_id
and omits error handling, but it attempts to create a subclassed process with a gpu_id.
Here is what I attempted:
import multiprocessing as mp
from typing import Type
# Attempt to override default behaviour of the `Process`
# to add a gpu_id parameter during construction.
class GPUProcessMeta(type):
def __call__(cls, *args, **kwargs):
obj = cls.__new__(cls, *args, **kwargs)
gpu_id = cls.next_id
if gpu_id in cls.used_ids:
raise RuntimeError(
f"Attempt to reserve reserved processor {gpu_id} {cls.used_ids=}"
)
cls.next_id += 1
cls.used_ids.append(gpu_id)
kwargs["gpu_id"] = gpu_id
obj.__init__(*args, **kwargs)
return obj
class GPUProcess(mp.Process, metaclass=GPUProcessMeta):
used_ids: list[int] = []
next_id: int = 0
def __init__(
self,
group=None,
target=None,
name=None,
args=(),
kwargs={},
*,
daemon=None,
gpu_id=None,
):
super(GPUProcess, self).__init__(
group,
target,
name,
args,
kwargs,
daemon=daemon,
)
self._gpu_id = gpu_id
@property
def gpu_id(self):
return self._gpu_id
def __del__(self):
GPUProcess.used_ids.remove(self.gpu_id)
def __repr__(self) -> str:
return f"<{type(self)} gpu_id={self.gpu_id} hash={hash(self)}>"
@classmethod
def create_gpu_context(cls) -> Type[mp.context.DefaultContext]:
context = mp.get_context()
context.Process = cls
return context
def test_gpu_pool():
ctx = GPUProcess.create_gpu_context()
with ctx.Pool(2) as pool:
payload = (tuple(range(3)) for _ in range(10))
response = pool.starmap(
_dummy_func,
payload,
)
assert response == ((0, 1, 2),) * 10
def _dummy_func(*args, **kwargs):
return args, kwargs
if __name__ == "__main__":
test_gpu_pool()
However, this crashes with RecursionError: maximum recursion depth exceeded
. I do not understand what could cause this. How can I create this implementation Process
with a unique process level variable?
EDIT: After some digging to python source
#
# Type of default context -- underlying context can be set at most once
#
class Process(process.BaseProcess):
_start_method = None
@staticmethod
def _Popen(process_obj):
return _default_context.get_context().Process._Popen(process_obj)
@staticmethod
def _after_fork():
return _default_context.get_context().Process._after_fork()
Here `_Popen` ends up calling `_Popen` again, producing infinite recursion that crashes my code. I am not sure how `GPUProcess` can differ from the standard `Process`, but the root cause of the crash is here.
英文:
I am trying to subclass mp.Process
to create process level constant to decide between GPUs on my desktop. To achieve this, I'd like to have a device id inside each process object and later pass it to a function in run
method. The example code here does not actually yet use self._gpu_id
and omits error handling, but it attempts to create a subclassed process with a gpu_id.
Here is what I attempted:
import multiprocessing as mp
from typing import Type
# Attempt to override default behaviour of the `Process`
# to add a gpu_id parameter during construction.
class GPUProcessMeta(type):
def __call__(cls, *args, **kwargs):
obj = cls.__new__(cls, *args, **kwargs)
gpu_id = cls.next_id
if gpu_id in cls.used_ids:
raise RuntimeError(
f"Attempt to reserve reserved processor {gpu_id} {cls.used_ids=}"
)
cls.next_id += 1
cls.used_ids.append(gpu_id)
kwargs["gpu_id"] = gpu_id
obj.__init__(*args, **kwargs)
return obj
class GPUProcess(mp.Process, metaclass=GPUProcessMeta):
used_ids: list[int] = []
next_id: int = 0
def __init__(
self,
group=None,
target=None,
name=None,
args=(),
kwargs={},
*,
daemon=None,
gpu_id=None,
):
super(GPUProcess, self).__init__(
group,
target,
name,
args,
kwargs,
daemon=daemon,
)
self._gpu_id = gpu_id
@property
def gpu_id(self):
return self._gpu_id
def __del__(self):
GPUProcess.used_ids.remove(self.gpu_id)
def __repr__(self) -> str:
return f"<{type(self)} gpu_id={self.gpu_id} hash={hash(self)}>"
@classmethod
def create_gpu_context(cls) -> Type[mp.context.DefaultContext]:
context = mp.get_context()
context.Process = cls
return context
def test_gpu_pool():
ctx = GPUProcess.create_gpu_context()
with ctx.Pool(2) as pool:
payload = (tuple(range(3)) for _ in range(10))
response = pool.starmap(
_dummy_func,
payload,
)
assert response == ((0, 1, 2),) * 10
def _dummy_func(*args, **kwargs):
return args, kwargs
if __name__ == "__main__":
test_gpu_pool()
However, this crashes with RecursionError: maximum recursion depth exceeded
. I do not understand what could cause this. How can I create this implementation Process
with a unique process level variable?
EDIT: After some digging to python source
#
# Type of default context -- underlying context can be set at most once
#
class Process(process.BaseProcess):
_start_method = None
@staticmethod
def _Popen(process_obj):
return _default_context.get_context().Process._Popen(process_obj)
@staticmethod
def _after_fork():
return _default_context.get_context().Process._after_fork()
Here `_Popen` ends up calling `_Popen` again, producing infinite recursion that crashes my code. I am not sure how `GPUProcess` can differ from the standard `Process`, but the root cause of the crash is here.
答案1
得分: 2
以下是翻译好的部分:
看起来你实际上想要使用multiprocessing
shared memory来协调哪个子进程可以访问给定的GPU。
这里有一个简单的示例,其中有4个进程,每个进程有10个任务,它们竞争使用2个GPU。它使用了一个锁和一个共享内存数组;每个进程通过将其PID写入GPU数组的第一个空闲“槽”来标记自己作为给定GPU的所有者。
对于更适用于生产环境的示例,你需要添加一些错误容忍性(例如,确保工作者始终释放他们的GPU等)。
import multiprocessing
import os
import random
import time
from dataclasses import dataclass


@dataclass
class SharedVariables:
    """Shared state coordinating which worker process owns which GPU.

    A GPU "slot" holds 0 when free, otherwise the PID of its owner.
    """

    gpu_pool_lock: multiprocessing.Lock
    gpu_array: multiprocessing.Array

    def get_free_gpu(self):
        """Claim the first free GPU slot for this PID; None if all busy."""
        with self.gpu_pool_lock:
            free = next(
                (slot for slot, owner in enumerate(self.gpu_array) if owner == 0),
                None,
            )
            if free is not None:
                self.gpu_array[free] = os.getpid()
            return free

    def release_gpu(self, gpu_id):
        """Give back a GPU slot previously claimed by this process."""
        with self.gpu_pool_lock:
            assert self.gpu_array[gpu_id] == os.getpid()
            self.gpu_array[gpu_id] = 0


# Per-worker global, populated by the pool initializer below.
shared_variables: SharedVariables


def init_worker(shv: SharedVariables):
    """Pool initializer: stash the shared state in a worker-global."""
    global shared_variables
    shared_variables = shv


def do_work(x):
    """Toy job: poll for a free GPU, hold it for a random while, release."""
    print(f"Worker {os.getpid()} starting work {x}")
    gpu_id = None
    # Poll every 0.5 s until a slot is claimed (the sleep also runs once
    # after a successful claim, exactly as in the original).
    while gpu_id is None:
        gpu_id = shared_variables.get_free_gpu()
        time.sleep(0.5)
    print(f"Worker {os.getpid()} got gpu {gpu_id}")
    time.sleep(random.uniform(2, 4))
    shared_variables.release_gpu(gpu_id)
    print(f"Worker {os.getpid()} released gpu {gpu_id}")
    time.sleep(0.5)


def main():
    num_gpus = 2
    shared_variables = SharedVariables(
        gpu_pool_lock=multiprocessing.Lock(),
        gpu_array=(multiprocessing.Array("i", [0] * num_gpus)),
    )
    with multiprocessing.Pool(
        4,
        initializer=init_worker,
        initargs=(shared_variables,),
    ) as pool:
        res = [pool.apply_async(do_work, args=(x,)) for x in range(10)]
        # Report GPU ownership until every job has finished.
        while not all(r.ready() for r in res):
            print("Main: GPUs in use:", shared_variables.gpu_array[:])
            time.sleep(0.5)


if __name__ == "__main__":
    main()
示例输出可能如下:
Main: GPUs in use: [0, 0]
Worker 41543 starting work 0
Worker 41542 starting work 1
Worker 41541 starting work 2
Worker 41544 starting work 3
Main: GPUs in use: [41543, 41542]
Worker 41543 got gpu 0
Worker 41542 got gpu 1
Main: GPUs in use: [41543, 41542]
Main: GPUs in use: [41543, 41542]
Main: GPUs in use: [41543, 41542]
Main: GPUs in use: [41543, 41542]
Main: GPUs in use: [41543, 41542]
Worker 41542 released gpu 1
Main: GPUs in use: [41543, 0]
Worker 41542 starting work 4
Worker 41543 released gpu 0
Main: GPUs in use: [0, 41544]
Worker 41544 got gpu 1
Worker 41543 starting work 5
Main: GPUs in use: [41541, 41544]
Worker 41541 got gpu 0
Main: GPUs in use: [41541, 41544]
英文:
Sounds like you actually want multiprocessing
shared memory to coordinate which subprocess can access a given GPU.
Here's a simple toy example where 4 processes with 10 jobs to do compete over 2 GPUs. It uses a lock and a shared memory array; each process marks itself as the owner of a given GPU by writing its PID into the first free "slot" of the GPU array.
For a more production-ready example, you would need to add some error tolerance (e.g. ensure the workers always release their GPUs, etc.)
import multiprocessing
import os
import random
import time
from dataclasses import dataclass


@dataclass
class SharedVariables:
    """Shared state coordinating which worker process owns which GPU.

    A GPU "slot" holds 0 when free, otherwise the PID of its owner.
    """

    gpu_pool_lock: multiprocessing.Lock
    gpu_array: multiprocessing.Array

    def get_free_gpu(self):
        """Claim the first free GPU slot for this PID; None if all busy."""
        with self.gpu_pool_lock:
            free = next(
                (slot for slot, owner in enumerate(self.gpu_array) if owner == 0),
                None,
            )
            if free is not None:
                self.gpu_array[free] = os.getpid()
            return free

    def release_gpu(self, gpu_id):
        """Give back a GPU slot previously claimed by this process."""
        with self.gpu_pool_lock:
            assert self.gpu_array[gpu_id] == os.getpid()
            self.gpu_array[gpu_id] = 0


# Per-worker global, populated by the pool initializer below.
shared_variables: SharedVariables


def init_worker(shv: SharedVariables):
    """Pool initializer: stash the shared state in a worker-global."""
    global shared_variables
    shared_variables = shv


def do_work(x):
    """Toy job: poll for a free GPU, hold it for a random while, release."""
    print(f"Worker {os.getpid()} starting work {x}")
    gpu_id = None
    # Poll every 0.5 s until a slot is claimed (the sleep also runs once
    # after a successful claim, exactly as in the original).
    while gpu_id is None:
        gpu_id = shared_variables.get_free_gpu()
        time.sleep(0.5)
    print(f"Worker {os.getpid()} got gpu {gpu_id}")
    time.sleep(random.uniform(2, 4))
    shared_variables.release_gpu(gpu_id)
    print(f"Worker {os.getpid()} released gpu {gpu_id}")
    time.sleep(0.5)


def main():
    num_gpus = 2
    shared_variables = SharedVariables(
        gpu_pool_lock=multiprocessing.Lock(),
        gpu_array=(multiprocessing.Array("i", [0] * num_gpus)),
    )
    with multiprocessing.Pool(
        4,
        initializer=init_worker,
        initargs=(shared_variables,),
    ) as pool:
        res = [pool.apply_async(do_work, args=(x,)) for x in range(10)]
        # Report GPU ownership until every job has finished.
        while not all(r.ready() for r in res):
            print("Main: GPUs in use:", shared_variables.gpu_array[:])
            time.sleep(0.5)


if __name__ == "__main__":
    main()
An example output would be
Main: GPUs in use: [0, 0]
Worker 41543 starting work 0
Worker 41542 starting work 1
Worker 41541 starting work 2
Worker 41544 starting work 3
Main: GPUs in use: [41543, 41542]
Worker 41543 got gpu 0
Worker 41542 got gpu 1
Main: GPUs in use: [41543, 41542]
Main: GPUs in use: [41543, 41542]
Main: GPUs in use: [41543, 41542]
Main: GPUs in use: [41543, 41542]
Main: GPUs in use: [41543, 41542]
Worker 41542 released gpu 1
Main: GPUs in use: [41543, 0]
Worker 41542 starting work 4
Worker 41543 released gpu 0
Main: GPUs in use: [0, 41544]
Worker 41544 got gpu 1
Worker 41543 starting work 5
Main: GPUs in use: [41541, 41544]
Worker 41541 got gpu 0
Main: GPUs in use: [41541, 41544]
etc.
答案2
得分: 0
以下是翻译好的部分:
这个类的目标是mp.Process
,它是当前上下文Process
的代理。这会导致底层multiprocessing
实现中的跳转,最终导致程序崩溃,出现RecursionError
错误。
正确的继承应该实现为:
import multiprocessing as mp
from typing import Type
class GPUProcess(mp.get_context().Process):
used_ids: list[int] = []
next_id: int = 0
def __init__(
self,
*args,
**kwargs,
):
super().__init__(
*args,
**kwargs,
)
gpu_id = GPUProcess.next_id
if gpu_id in GPUProcess.used_ids:
raise RuntimeError(
f"尝试保留已保留的处理器 {gpu_id} {self.used_ids=}"
)
GPUProcess.next_id += 1
GPUProcess.used_ids.append(gpu_id)
self._gpu_id = gpu_id
@property
def gpu_id(self):
return self._gpu_id
def __del__(self):
GPUProcess.used_ids.remove(self.gpu_id)
def __repr__(self) -> str:
return f"<{type(self)} gpu_id={self.gpu_id} hash={hash(self)}>"
@classmethod
def create_gpu_context(cls) -> Type[mp.context.DefaultContext]:
context = mp.get_context()
context.Process = cls
return context
def test_gpu_pool():
ctx = GPUProcess.create_gpu_context()
with ctx.Pool(2) as pool:
payload = (tuple(range(3)) for _ in range(3))
response = pool.starmap(
_dummy_func,
payload,
)
assert response == (
expected := list(
(
(
(0, 1, 2),
{},
),
)
* 3
)
), f"{response=},{expected=}"
def _dummy_func(*args, **kwargs):
return args, kwargs
if __name__ == "__main__":
test_gpu_pool()
这段代码不是最优的,可以使用set
作为id而不是列表,但总体思想在这里实现了。
英文:
The class targets a mp.Process
which is a proxy for the current context Process
. This leads to jumps in underlying multiprocessing
implementation eventually crashing the program with RecursionError
.
Correct inheritance should be implemented with
import multiprocessing as mp
from typing import Type
class GPUProcess(mp.get_context().Process):
used_ids: list[int] = []
next_id: int = 0
def __init__(
self,
*args,
**kwargs,
):
super().__init__(
*args,
**kwargs,
)
gpu_id = GPUProcess.next_id
if gpu_id in GPUProcess.used_ids:
raise RuntimeError(
f"Attempt to reserve reserved processor {gpu_id} {self.used_ids=}"
)
GPUProcess.next_id += 1
GPUProcess.used_ids.append(gpu_id)
self._gpu_id = gpu_id
@property
def gpu_id(self):
return self._gpu_id
def __del__(self):
GPUProcess.used_ids.remove(self.gpu_id)
def __repr__(self) -> str:
return f"<{type(self)} gpu_id={self.gpu_id} hash={hash(self)}>"
@classmethod
def create_gpu_context(cls) -> Type[mp.context.DefaultContext]:
context = mp.get_context()
context.Process = cls
return context
def test_gpu_pool():
ctx = GPUProcess.create_gpu_context()
with ctx.Pool(2) as pool:
payload = (tuple(range(3)) for _ in range(3))
response = pool.starmap(
_dummy_func,
payload,
)
assert response == (
expected := list(
(
(
(0, 1, 2),
{},
),
)
* 3
)
), f"{response=}!={expected}"
def _dummy_func(*args, **kwargs):
return args, kwargs
if __name__ == "__main__":
test_gpu_pool()
This code is not optimal and set
could be used as id instead of list, but the overall idea is implemented here.
通过集体智慧和协作来改善编程学习和解决问题的方式。致力于成为全球开发者共同参与的知识库,让每个人都能够通过互相帮助和分享经验来进步。
评论