Sharing a large numpy array across python multiprocessing map
Question
I have a script which is parallelized but requires reading from a large read-only numpy array whose size varies between cases (it can exceed 100GB). I have a class object (say A) which is being parallelized and is called from inside a function of a different class (say B). I started off by setting the array as a member of class B, but this meant that for every worker process created, the object was pickled and then loaded, which made the program very slow (I worked around this by dividing the array into small batches, which helped considerably). I then tried making the array a global variable, but this is also slower than expected. Is there a better approach to this problem that gives the fastest solution?
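For illustration, here is a minimal sketch of the slow pattern described in the question (the class and method names are hypothetical, not from the original code): mapping a bound method of an object that holds the array forces the object, and therefore the whole array, to be pickled and shipped to the workers with every chunk of work.

import numpy as np
import multiprocessing

class B:
    def __init__(self, big_array):
        self.big_array = big_array  # large read-only data stored on the instance

    def process_row(self, i):
        return self.big_array[i].sum()

def main():
    b = B(np.ones((1000, 1000)))
    with multiprocessing.Pool(4) as pool:
        # Pickling b.process_row pickles `b` itself, so the full array is
        # serialized and sent to the workers with each chunk of tasks.
        print(sum(pool.map(b.process_row, range(1000))))

if __name__ == '__main__':
    main()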
Answer 1
Score: 1
The following (unrealistic) example adds up the elements of a read-only numpy array by using multiprocessing to sum each row individually and then performing a final summation over the partial sums.
A shared memory array is created from the numpy array's buffer. A "process pool initializer" is then used to initialize each pool process with a numpy array based on the shared array.
import numpy as np
import multiprocessing
import ctypes

def init_process(shared_array, shape):
    # Pool initializer: runs once in each worker process and exposes the
    # shared buffer as a global numpy array.
    global arr
    arr = shared_array_to_np_array(shared_array, shape)

def worker(row_number):
    # Partial result: the sum of a single row of the shared array.
    return np.sum(arr[row_number])

def np_array_to_shared_array(arr, ctype):
    # Copy the numpy array into an unsynchronized shared-memory buffer.
    shared_array = multiprocessing.RawArray(ctype, arr.size)
    temp = np.frombuffer(shared_array, dtype=arr.dtype)
    temp[:] = arr.flatten(order='C')
    return shared_array

def shared_array_to_np_array(shared_array, shape):
    # Wrap the shared buffer in a numpy array without copying.
    return np.ctypeslib.as_array(shared_array).reshape(shape)

def main():
    """
    Sum all elements by summing each row and then adding together
    the partial sums.
    """
    arr = np.array([[1, 2, 3, 4],
                    [5, 6, 7, 8],
                    [9, 10, 11, 12]], dtype=np.int32)
    shape = arr.shape
    n_rows = shape[0]
    shared_array = np_array_to_shared_array(arr, ctypes.c_int32)
    arr = shared_array_to_np_array(shared_array, shape)
    with multiprocessing.Pool(n_rows, initializer=init_process,
                              initargs=(shared_array, shape)) as pool:
        print(sum(pool.map(worker, range(n_rows))))

if __name__ == '__main__':
    main()
Prints:
78
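As a possible variation (not part of the original answer), on Python 3.8+ the standard library's multiprocessing.shared_memory module can replace the ctypes-based RawArray. The sketch below assumes that module and reuses the row-summing scheme from above; it is one way to do it, not the answer's method.

import numpy as np
from multiprocessing import Pool, shared_memory

def init_process(shm_name, shape, dtype):
    # Attach to the existing shared-memory block by name and keep a global
    # reference to it so the mapping stays alive for the worker's lifetime.
    global shm, arr
    shm = shared_memory.SharedMemory(name=shm_name)
    arr = np.ndarray(shape, dtype=dtype, buffer=shm.buf)

def worker(row_number):
    return np.sum(arr[row_number])

def main():
    src = np.array([[1, 2, 3, 4],
                    [5, 6, 7, 8],
                    [9, 10, 11, 12]], dtype=np.int32)
    # Allocate a shared block and copy the data into it once.
    shm = shared_memory.SharedMemory(create=True, size=src.nbytes)
    arr = np.ndarray(src.shape, dtype=src.dtype, buffer=shm.buf)
    arr[:] = src
    try:
        with Pool(src.shape[0], initializer=init_process,
                  initargs=(shm.name, src.shape, src.dtype)) as pool:
            print(sum(pool.map(worker, range(src.shape[0]))))  # 78
    finally:
        del arr      # drop the numpy view before closing the block
        shm.close()
        shm.unlink()

if __name__ == '__main__':
    main()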
Comments