Sharing a large numpy array across python multiprocessing map.

huangapple go评论50阅读模式
英文:

Sharing a large numpy array across python multiprocessing map

问题

我有一个脚本,它是并行化的,但需要读取一个可能在不同情况下大小不同的大型numpy数组(可能超过100GB)。这个数组是只读的。我有一个类对象(比如A),它正在并行化,从另一个类(比如B)的函数内部调用。我开始将数组设置为类B的成员,但这意味着对于每个创建的线程,对象都会被序列化然后加载,这导致程序变得非常慢(我解决了这个问题,将数组分成小批次,这样可以使程序运行速度更快)。然后我尝试将数组设置为全局变量,但这也比预期的慢。有没有更好的方法来解决这个问题,以获得最快的解决方案?

英文:

I have a script which is parallelized but requires reading from a large numpy array whose size might vary between different cases(can be 100GB+). The array is to be read-only. I have a class object(say A) which is being parallelized and is being called from inside a function of a different class(say B). I started of by setting the array as a member of class B but this meant that for every thread being created, the object was being pickled and then loaded which caused the program to be very slow(I worked around this to divide the array into small batches and that helped it to be much faster). I then tried to make the array a global variable but this is also slower than expected. Is there a better approach to this problem to get the fastest solution?

答案1

得分: 1

以下是翻译好的部分:

以下(不切实际的)示例通过使用多进程逐个添加每一行,然后在部分和上执行最终求和来添加只读的 numpy 数组的元素。

numpy 数组的缓冲区创建了一个共享内存数组。然后使用“进程池初始化器”来使用基于共享数组的 numpy 数组初始化每个池进程。

import numpy as np
import multiprocessing
import ctypes

def init_process(shared_array, shape):
    global arr

    arr = shared_array_to_np_array(shared_array, shape)

def worker(row_number):
    return np.sum(arr[row_number])

def np_array_to_shared_array(arr, ctype):
    shared_array = multiprocessing.RawArray(ctype, arr.size)
    temp = np.frombuffer(shared_array, dtype=arr.dtype)
    temp[:] = arr.flatten(order='C')
    return shared_array

def shared_array_to_np_array(shared_array, shape):
    return np.ctypeslib.as_array(shared_array).reshape(shape)

def main():
    """
    通过对每一行进行求和,然后将部分和相加来求和所有元素。
    """

    arr = np.array([[1, 2, 3, 4],
                    [5, 6, 7, 8,],
                    [9, 10, 11, 12]], dtype=np.int32)
    shape = arr.shape
    n_rows = shape[0]
    dtype = arr.dtype
    shared_array = np_array_to_shared_array(arr, ctypes.c_int32)
    arr = shared_array_to_np_array(shared_array, shape)
    with multiprocessing.Pool(n_rows, initializer=init_process, initargs=(shared_array, shape)) as pool:
        print(sum(pool.map(worker, range(n_rows))))

if __name__ == '__main__':
    main()

输出:

78
英文:

The following (unrealistic) example adds the elements of a read-only numpy array by using multiprocessing to individually add each row and then performing a final summation over the partial sums.

A shared memory array is created from the numpy arrays buffer. A "process pool initializer" is then used to initialize each pool process with a numpy array based on the shared array.

import numpy as np
import multiprocessing
import ctypes

def init_process(shared_array, shape):
    global arr

    arr = shared_array_to_np_array(shared_array, shape)

def worker(row_number):
    return np.sum(arr[row_number])

def np_array_to_shared_array(arr, ctype):
    shared_array = multiprocessing.RawArray(ctype, arr.size)
    temp = np.frombuffer(shared_array, dtype=arr.dtype)
    temp[:] = arr.flatten(order='C')
    return shared_array

def shared_array_to_np_array(shared_array, shape):
    return np.ctypeslib.as_array(shared_array).reshape(shape)

def main():
    """
    Sum all elements by summing each row and then adding together
    the partial sums.
    """

    arr = np.array([[1, 2, 3, 4],
                    [5, 6, 7, 8,],
                    [9, 10, 11, 12]], dtype=np.int32)
    shape = arr.shape
    n_rows = shape[0]
    dtype = arr.dtype
    shared_array = np_array_to_shared_array(arr, ctypes.c_int32)
    arr = shared_array_to_np_array(shared_array, shape)
    with multiprocessing.Pool(n_rows, initializer=init_process, initargs=(shared_array, shape)) as pool:
        print(sum(pool.map(worker, range(n_rows))))

if __name__ == '__main__':
    main()

Prints:

78

huangapple
  • 本文由 发表于 2023年5月22日 02:44:56
  • 转载请务必保留本文链接:https://go.coder-hub.com/76301413.html
匿名

发表评论

匿名网友

:?: :razz: :sad: :evil: :!: :smile: :oops: :grin: :eek: :shock: :???: :cool: :lol: :mad: :twisted: :roll: :wink: :idea: :arrow: :neutral: :cry: :mrgreen:

确定