Multiprocessed reduce function hangs when task queue size of above 1200 elements in Python.

huangapple go评论68阅读模式
英文:

Multiprocessed reduce function hangs when task queue size of above 1200 elements in Python

问题

这个程序在to_sum列表大小达到约1150时工作正常。之后,进程将在task_queue = result_queue的第一个点挂起。它们将成功填充结果队列并终止,但然后会挂起。如果数组大小低于1150,此问题将不会发生。有没有任何可能导致这个问题的想法?

英文:

the program works fine untill the size of the to_sum list reaches about 1150. Afterwards, the processes will hang at the first point where task_queue = result_queue. They will successfully fill the result queue and terminate, but then will hang. This problem does not happen if the size of the array is below 1150.
Restarting the computer sometimes allows for the program to work with an array of bigger size before it hangs due to size, but it is always around the 1100-1300 range.
Do you have any idea what could cause this problem?

    import multiprocessing

class CustomProcess(multiprocessing.Process):

    def __init__(self, name, task_queue, result_queue, lock, chunks=2, *args, **kwargs):
        super().__init__(name=name, *args, **kwargs)
        self.name = name
        self.task_queue = task_queue
        self.result_queue = result_queue
        self.chunks = chunks
        self.lock = lock

    def run(self):
        while True:
            """
            Using a lock to avoid a race condition where 2 threads both get a number, then
            both to get another, but it is None so they both put the number back resulting in a result queue
            with bigger size. For example:
            Expected result_queue_size = 500, current_queue_size 499, after summation of 1 and 2 we will add 3 to result queue
            achieving 500.
            With race condition result is 501.
            [1, 2, None, None]
            Thread 1 gets 1.
            Thread 2 gets 2.
            Thread 1 gets None and puts 1 in result queue instead of getting 2 and summing.
            Thread 2 gets None and puts 2 in result queue instead of just getting the first None and returning.
            A lock on both gets removes the race condition.
            """
            with self.lock:
                if not self.task_queue.empty():
                    number_1 = self.task_queue.get()
                    self.task_queue.task_done()
                    if number_1 is None:
                        #Poison pill - terminate.
                        print(f"Terminated {self.name}")
                        return
                else:
                    #Queue empty - terminate.
                    return
                if not self.task_queue.empty():
                    number_2 = self.task_queue.get()
                    self.task_queue.task_done()
                    if number_2 is None:
                        #Cannot compute sum of 1 number so just add number_1 to result_queue and terminate since poison pill
                        #acquired.
                        self.result_queue.put(number_1)
                        print(f"Terminated {self.name}")
                        return
                else:
                    self.result_queue.put(number_1)
                    #Queue empty, put the 1 number in result queue and terminate.
                    return
            self.result_queue.put(number_1 + number_2)


def multiprocess_sum(array):
    if len(array) == 1:
        return array[0]
    lock = multiprocessing.Lock()
    task_queue = multiprocessing.JoinableQueue()
    [task_queue.put(element) for element in to_sum]
    task_queue_size = len(array)

    while task_queue_size > 1:
        print(task_queue.qsize(), task_queue_size)
        result_queue = multiprocessing.JoinableQueue()
        processes = [CustomProcess(name=str(i), task_queue=task_queue, result_queue=result_queue, lock=lock)
                    for i in range(8)]
        [task_queue.put(None) for process in processes]
        [process.start() for process in processes]
        #[process.join() for process in processes]
        task_queue.join()
        task_queue = result_queue
        task_queue_size = task_queue_size // 2 + task_queue_size % 2
    return result_queue.get()

if __name__ == "__main__":
    to_sum = [i for i in range(1350)]
    """
    If range is below 1200, the program will run and compute everything correctly.
    If it is above it, it will hang at the first halving, the moment the first task_queue is empty and the
    result_queue becomes the new task_queue. 
    Computer restart will make the range values fluctuate, yesterday it would hang at 1177 but run fine up to 1776.
    Queue pipe full???
    """
    print(sum(to_sum))
    for i in range(5):
        print(multiprocess_sum(to_sum))

答案1

得分: 2

以下是代码部分的翻译:

这段代码中首先对多进程队列上的`empty``qsize`调用是不可靠的不应该使用请阅读文档)。其次您正在使用的锁阻止了任何真正的多进程处理因为在`run`方法中执行的大部分处理都是串行执行的第三要将1359个数字相加需要1349次加法否则怎么可能呢?)。因此只需将您的1350个数字均匀分成8个列表让每个进程对其列表进行求和并返回结果然后只需将8个返回的值相加即可获得最终结果

随着`to_sum`的大小增加使用多进程在总运行时间中的贡献将变得越来越微不足道

```python
import multiprocessing
from functools import reduce
from operator import add

class CustomProcess(multiprocessing.Process):

    def __init__(self, name, task_queue, result_queue, *args, **kwargs):
        super().__init__(name=name, *args, **kwargs)
        self.task_queue = task_queue
        self.result_queue = result_queue

    def run(self):
        self.result_queue.put(reduce(add, self.task_queue.get(), 0))

def split(iterable, n):  # 将可迭代对象分成n个均匀的部分的函数
    if type(iterable) is range and iterable.step != 1:
        # 算法不适用于步长不为1的情况:
        iterable = list(iterable)
    l = len(iterable)
    n = min(l, n)
    k, m = divmod(l, n)
    return (iterable[i * k + min(i, m):(i + 1) * k + min(i + 1, m)] for i in range(n))

N_PROCESSES = 8

def multiprocess_sum(l):
    if len(l) == 1:
        return l[0]

    task_queue = multiprocessing.Queue()
    lists = split(l, N_PROCESSES)
    for l in lists:
        task_queue.put(l)

    result_queue = multiprocessing.Queue()

    processes = [
        CustomProcess(name=str(i), task_queue=task_queue, result_queue=result_queue)
        for i in range(N_PROCESSES)
    ]
    for process in processes:
        process.start()

    the_sum = reduce(add, (result_queue.get() for _ in range(N_PROCESSES)), 0)

    for process in processes:
        process.join()

    return the_sum

if __name__ == "__main__":
    to_sum = list(range(1350))
    print(sum(to_sum))
    for i in range(5):
        print(multiprocess_sum(to_sum))

打印结果:

910575
910575
910575
910575
910575
910575

我想知道要使用多进程来解决这个特定问题,to_sum列表必须有多大才能获得任何时间节省。

英文:

This is too long for a comment, and so:

First and foremost, calls to empty and qsize on multiprocessing queues are unreliable and should not be used (read the docs). Second, the lock you are using prevents any real multiprocessing from occurring since the bulk of processing done in your run method is being executed serially. Third, to add up 1359 numbers requires 1349 additions (how can it be otherwise?). So simply divide your 1350 numbers into 8 lists as evenly as possible and have each of your processes sum their lists and return the results. Then just add the 8 returned values to get the final result.

As the size of your to_sum grows the more negligible is the contribution to the total running time made by the final addition of the 8 partial sums.

import multiprocessing
from functools import reduce
from operator import add

class CustomProcess(multiprocessing.Process):

    def __init__(self, name, task_queue, result_queue, *args, **kwargs):
        super().__init__(name=name, *args, **kwargs)
        self.task_queue = task_queue
        self.result_queue = result_queue

    def run(self):
        self.result_queue.put(reduce(add, self.task_queue.get(), 0))

def split(iterable, n):  # function to split iterable in n even parts
    if type(iterable) is range and iterable.step != 1:
        # algorithm doesn't work with steps other than 1:
        iterable = list(iterable)
    l = len(iterable)
    n = min(l, n)
    k, m = divmod(l, n)
    return (iterable[i * k + min(i, m):(i + 1) * k + min(i + 1, m)] for i in range(n))

N_PROCESSES = 8

def multiprocess_sum(l):
    if len(l) == 1:
        return l[0]

    task_queue = multiprocessing.Queue()
    lists = split(l, N_PROCESSES)
    for l in lists:
        task_queue.put(l)

    result_queue = multiprocessing.Queue()

    processes = [
        CustomProcess(name=str(i), task_queue=task_queue, result_queue=result_queue)
        for i in range(N_PROCESSES)
    ]
    for process in processes:
        process.start()

    the_sum = reduce(add, (result_queue.get() for _ in range(N_PROCESSES)), 0)

    for process in processes:
        process.join()

    return the_sum

if __name__ == "__main__":
    to_sum = list(range(1350))
    print(sum(to_sum))
    for i in range(5):
        print(multiprocess_sum(to_sum))

Prints:

910575
910575
910575
910575
910575
910575

I wonder how large the to_sum list must be to gain any time savings by using multiprocessing for this particular problem.

Update

To do the reduction per the OP's attempt, I would use the following code. Since calls to qsize on a queue are not reliable, we keep track of the number of items on a queue using a sharable integer Value that must be incremented or decremented under control of a lock. We also create the processes once. Read the comments, please.

import multiprocessing

class CustomProcess(multiprocessing.Process):

    def __init__(self,
                 name,
                 task_queue,
                 result_queue,
                 task_queue_size,
                 result_queue_size,
                 condition,
                 *args,
                 **kwargs
                 ):
        super().__init__(name=name, *args, **kwargs)
        self.name = name
        self.task_queue = task_queue
        self.result_queue = result_queue
        self.task_queue_size = task_queue_size
        self.result_queue_size = result_queue_size
        self.condition = condition

    def run(self):
        task_queue = self.task_queue
        result_queue = self.result_queue
        task_queue_size = self.task_queue_size
        result_queue_size = self.result_queue_size
        condition = self.condition

        task_queue_size_lock = task_queue_size.get_lock()

        while True:
            # When the task queue size goes down to zero, the main process
            # will move all the items from the result queue to the
            # task queue and then set the new task queue size.
            # We must not attempt to process the task queue while
            # this occurs, i.e. we must wait until the task queue size
            # is again non-zero:
            with condition:
                while task_queue_size.value == 0:
                    condition.wait()

            # No need to acquire lock for this test:
            if task_queue_size.value < 0:
                return # We are done

            # There is no gurantee we will find anything on the input queue:
            task_queue_size_lock.acquire()
            if task_queue_size.value == 0:
                task_queue_size_lock.release()
                continue

            number_1 = task_queue.get()
            task_queue_size.value -= 1
            if task_queue_size.value == 0:
                # put number on result_queue:
                task_queue_size_lock.release()
                result_queue.put(number_1)
                with result_queue_size.get_lock():
                    result_queue_size.value += 1
                task_queue.task_done()
            else:
                number_2 = task_queue.get()
                task_queue_size.value -= 1
                task_queue_size_lock.release()
                # No lock is held for the actual reduction operation:
                result_queue.put(number_1 + number_2)
                with result_queue_size.get_lock():
                    result_queue_size.value += 1
                # Since we have tasken off 2 elements from the task queue:
                task_queue.task_done()
                task_queue.task_done()

def multiprocess_sum(array):
    n = len(array)
    if n == 1:
        return array[0]

    task_queue = multiprocessing.JoinableQueue()
    # You should be iterating array, not to_sun and
    # using a comprehension for its side effect is not Pythonic:
    for element in array:
        task_queue.put(element)
    task_queue_size = multiprocessing.Value('i', n)

    result_queue = multiprocessing.Queue()
    result_queue_size = multiprocessing.Value('i', 0)

    condition = multiprocessing.Condition()

    processes = [
        CustomProcess(name=str(i),
                      task_queue=task_queue,
                      result_queue=result_queue,
                      task_queue_size=task_queue_size,
                      result_queue_size=result_queue_size,
                      condition=condition,
                      )
        for i in range(8)
    ]
    for process in processes:
        process.start()

    while True:
        print('n =', n)

        # Wait for task_queue to be emptied:
        task_queue.join()

        # Now we can be sure that the child processes are no longer retrieving from the task queue
        # and putting items on the result queue:

        n = result_queue_size.value
        if n == 1:
            print('n =', n)
            result = result_queue.get()
            # Tell child processes to terminate:
            with condition:
                task_queue_size.value = -1
                condition.notify_all()
            return result

        # Child processes get their input from task queue.
        # So we must get items from the result queue and move them to the task queue.
        # The task queue size is now 0 so this should pause the child processes.
        for _ in range(n):
            task_queue.put(result_queue.get())
        result_queue_size.value = 0
        # Allow children to run
        with condition:
            # The new n value will be half of the previous n value (more or less)
            task_queue_size.value = n
            condition.notify_all()

if __name__ == "__main__":
    to_sum = [i for i in range(1350)]
    print(sum(to_sum))


    for i in range(5):
        print(multiprocess_sum(to_sum))

Prints:

910575
task queue size: 1350
task queue size: 675
task queue size: 338
task queue size: 169
task queue size: 85
task queue size: 43
task queue size: 22
task queue size: 11
task queue size: 6
task queue size: 3
task queue size: 2
task queue size: 1
910575
task queue size: 1350
task queue size: 675
task queue size: 338
task queue size: 169
task queue size: 85
task queue size: 43
task queue size: 22
task queue size: 11
task queue size: 6
task queue size: 3
task queue size: 2
task queue size: 1
910575
task queue size: 1350
task queue size: 675
etc.

Update 2

But simpler yet is to use a multiprocessing pool and to pass to the worker function pairs of items to be reduced:

import multiprocessing
from itertools import chain

def custom_process(pair):
    return pair[0] + pair[1] if len(pair) == 2 else pair[0]

def multiprocess_sum(array):
    n = len(array)
    if n == 1:
        return array[0]

    results = array
    pool = multiprocessing.Pool(8)
    while True:
        print('n =', n)
        if n == 1:
            break

        # Create pairs:
        pairs = zip(results[:n//2], results[n//2:])
        # Did we start with an odd number of elements?
        if n % 2 == 1:
            pairs = chain(pairs, ((results[-1],),))
        # specify a chunksize for improved performance:
        results = list(pool.imap_unordered(custom_process, pairs))
        n = len(results)
    result = results[0]
    pool.close()
    pool.join()
    return result

if __name__ == "__main__":
    to_sum = [i for i in range(1350)]
    print(sum(to_sum))


    for i in range(5):
        print(multiprocess_sum(to_sum))

Prints:

910575
n = 1350
n = 675
n = 338
n = 169
n = 85
n = 43
n = 22
n = 11
n = 6
n = 3
n = 2
n = 1
910575
n = 1350
n = 675

答案2

得分: 0

以下是翻译好的部分:

使用管理器解决了这个问题。

import multiprocessing

class CustomProcess(multiprocessing.Process):
    def __init__(self, name, task_queue, result_queue, lock, chunks=2, *args, **kwargs):
        super().__init__(name=name, *args, **kwargs)
        self.name = name
        self.task_queue = task_queue
        self.result_queue = result_queue
        self.chunks = chunks
        self.lock = lock

    def run(self):
        while True:
            with self.lock:
                if not self.task_queue.empty():
                    number_1 = self.task_queue.get()
                    self.task_queue.task_done()
                    if number_1 is None:
                        # 毒丸 - 终止。
                        print(f"终止 {self.name}")
                        return
                else:
                    # 队列为空 - 终止。
                    return
                if not self.task_queue.empty():
                    number_2 = self.task_queue.get()
                    self.task_queue.task_done()
                    if number_2 is None:
                        # 无法计算1个数字的和,因此只需将number_1添加到result_queue并终止,因为已获取毒丸。
                        self.result_queue.put(number_1)
                        print(f"终止 {self.name}")
                        return
                else:
                    self.result_queue.put(number_1)
                    # 队列为空,将第一个数字放入result队列并终止。
                    return
            self.result_queue.put(number_1 + number_2)

def multiprocess_sum(array):
    if len(array) == 1:
        return array[0]
    lock = multiprocessing.Lock()

    with multiprocessing.Manager() as manager:
        task_queue = manager.Queue()
        [task_queue.put(element) for element in to_sum]
        task_queue_size = len(array)

        while task_queue_size > 1:
            result_queue = manager.Queue()

            processes = [CustomProcess(name=str(i), task_queue=task_queue, result_queue=result_queue, lock=lock)
                        for i in range(8)]
            [task_queue.put(None) for process in processes]
            [process.start() for process in processes]
            #[process.join() for process in processes]
            task_queue.join()
            task_queue = result_queue
            task_queue_size = task_queue_size // 2 + task_queue_size % 2
        return result_queue.get()

if __name__ == "__main__":
    to_sum = [i for i in range(2000)]

    print(sum(to_sum))
    for i in range(5):
        print(multiprocess_sum(to_sum))

希望这可以帮助您理解代码的含义。如果有其他问题,请随时提出。

英文:

Using a manager solves the problem.

import multiprocessing
"""
Using a manager avoids the problem where the program will hang if the input is too big.
"""
class CustomProcess(multiprocessing.Process):
def __init__(self, name, task_queue, result_queue, lock, chunks=2, *args, **kwargs):
super().__init__(name=name, *args, **kwargs)
self.name = name
self.task_queue = task_queue
self.result_queue = result_queue
self.chunks = chunks
self.lock = lock
def run(self):
while True:
"""
Using a lock to avoid a race condition where 2 threads both get a number, then
both to get another, but it is None so they both put the number back resulting in a result queue
with bigger size. For example:
Expected result_queue_size = 500, current_queue_size 499, after summation of 1 and 2 we will add 3 to result queue
achieving 500.
With race condition result is 501.
[1, 2, None, None]
Thread 1 gets 1.
Thread 2 gets 2.
Thread 1 gets None and puts 1 in result queue instead of getting 2 and summing.
Thread 2 gets None and puts 2 in result queue instead of just getting the first None and returning.
A lock on both gets removes the race condition.
"""
with self.lock:
if not self.task_queue.empty():
number_1 = self.task_queue.get()
self.task_queue.task_done()
if number_1 is None:
#Poison pill - terminate.
print(f"Terminated {self.name}")
return
else:
#Queue empty - terminate.
return
if not self.task_queue.empty():
number_2 = self.task_queue.get()
self.task_queue.task_done()
if number_2 is None:
#Cannot compute sum of 1 number so just add number_1 to result_queue and terminate since poison pill
#acquired.
self.result_queue.put(number_1)
print(f"Terminated {self.name}")
return
else:
self.result_queue.put(number_1)
#Queue empty, put the 1 number in result queue and terminate.
return
self.result_queue.put(number_1 + number_2)
def multiprocess_sum(array):
if len(array) == 1:
return array[0]
lock = multiprocessing.Lock()
with multiprocessing.Manager() as manager:
task_queue = manager.Queue()
[task_queue.put(element) for element in to_sum]
task_queue_size = len(array)
while task_queue_size > 1:
result_queue = manager.Queue()
processes = [CustomProcess(name=str(i), task_queue=task_queue, result_queue=result_queue, lock=lock)
for i in range(8)]
[task_queue.put(None) for process in processes]
[process.start() for process in processes]
#[process.join() for process in processes]
task_queue.join()
task_queue = result_queue
task_queue_size = task_queue_size // 2 + task_queue_size % 2
return result_queue.get()
if __name__ == "__main__":
to_sum = [i for i in range(2000)]
print(sum(to_sum))
for i in range(5):
print(multiprocess_sum(to_sum))

I must agree with user Booboo that this specific implementation is not the best (might be the worst, actually) for a performant parallel sum function. If you are looking for a parallel sum function, read his answer. If you are interested in why your program hangs when faced with big Queues, using a Manager should solve your problem. Should you have an implementation of a parallel reduce function (better handling of the poison pill race condition), please do share with the intent of bettering the provided implementation for those looking for that specifically.
Best regards,
Tary

huangapple
  • 本文由 发表于 2023年5月13日 18:40:02
  • 转载请务必保留本文链接:https://go.coder-hub.com/76242290.html
匿名

发表评论

匿名网友

:?: :razz: :sad: :evil: :!: :smile: :oops: :grin: :eek: :shock: :???: :cool: :lol: :mad: :twisted: :roll: :wink: :idea: :arrow: :neutral: :cry: :mrgreen:

确定