Selectively terminate processes in Python multiprocessing

Question


I am running a few intensive parallel computations using multiprocessing; I have abstracted them as a sleep function in runcode() below. As with the real workload, it is hard to say how long any run will take. After starting the first batch of runs (the master runs), I wait for them to finish. Often a subset finishes early and I am left waiting on a few slower ones. Instead of waiting and doing nothing, I want to commission a second batch of runs to 'kill time'. But once the master runs finish, I want to kill the second batch regardless of completion (i.e. pool.terminate()) and get on with the rest of my code. I cannot blindly terminate the entire pool, because I have a listener function that handles reading and writing and needs to stay active at all times. I have thought of two ideas.

1 - Add the second batch of runs to the same pool and selectively terminate them. I am not sure how to do this selectively, though; perhaps through some kind of tag (a cooperative sketch of what that might look like appears after the two ideas).

2 - Create a second pool and run the second batch there; that pool can be blindly terminated when all the master runs complete. This is what I have partially shown in the code below (the lines marked with '###'). I remember reading somewhere a while back that having multiple pools is not a good idea, but I have been unable to find it again.
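
For idea 1, multiprocessing.Pool has no way to kill an individual submitted task, so the "tag" would have to be cooperative. A minimal sketch, assuming the secondary work is able to check a flag (the runcode_secondary name and the shared Event are hypothetical additions, reusing the Manager that already exists in the code below):

def runcode_secondary(q, stop_event):
    # Hypothetical secondary task: the shared Event acts as the "tag".
    # Once the event is set, queued-but-unstarted secondary tasks become
    # no-ops; it cannot interrupt a task already deep in its computation.
    if stop_event.is_set():
        return
    x = random.random()
    time.sleep(x * 10)
    if not stop_event.is_set():
        q.put([1, str(x) + '\n'])

# stop_event = manager.Event()
# pool.apply_async(runcode_secondary, (q, stop_event))
# ...
# stop_event.set()  # once the master runs have finished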

Both ideas share another problem. I could blindly commission many, many runs for the second batch and hope that the master runs finish before the secondary ones complete. Ideally, I would only commission a secondary run while the master runs have NOT all completed AND there are cores available (nicknamed 'ThereAreFreeCores()' below). Is there a way to do this?
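
For reference, a rough stand-in for ThereAreFreeCores() could compare the number of outstanding tasks against the worker count. This is only a sketch: it leans on the same private _cache attribute that the waiting loop below already uses, so it is an implementation detail rather than a public API and may change between Python versions:

def there_are_free_cores(pool):
    # pool._cache holds every submitted-but-uncompleted task (running or
    # queued) and pool._processes is the number of worker processes, so if
    # there are fewer outstanding tasks than workers, at least one worker
    # should be idle. Private attributes: treat this as approximate only.
    return len(pool._cache) < pool._processes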

import multiprocessing
import multiprocessing.pool
from contextlib import ExitStack
import time
import random

class BoundedQueuePool:
    # Bounds the number of queued tasks: apply_async blocks on a semaphore
    # that is released again from the task's callback / error_callback.
    def __init__(self, limit, semaphore_type):
        self._semaphore = semaphore_type(limit)

    def release(self, result, callback=None):
        self._semaphore.release()
        if callback:
            callback(result)

    def apply_async(self, func, args=(), kwds={}, callback=None, error_callback=None):
        self._semaphore.acquire()
        callback_fn = self.release if callback is None else lambda result: self.release(result, callback=callback)
        error_callback_fn = self.release if error_callback is None else lambda result: self.release(result, callback=error_callback)
        return super().apply_async(func, args, kwds, callback=callback_fn, error_callback=error_callback_fn)

class BoundedQueueProcessPool(BoundedQueuePool, multiprocessing.pool.Pool):
    def __init__(self, *args, max_waiting_tasks=None, **kwargs):
        multiprocessing.pool.Pool.__init__(self, *args, **kwargs)
        if max_waiting_tasks is None:
            max_waiting_tasks = self._processes
        elif max_waiting_tasks < 0:
            raise ValueError(f'Invalid negative max_waiting_tasks value: {max_waiting_tasks}')
        limit = self._processes + max_waiting_tasks
        BoundedQueuePool.__init__(self, limit, multiprocessing.BoundedSemaphore)

def listener(q, csv_names):
    '''
    Listens for results from each run and prints it to file.
    '''
    with ExitStack() as stack:
        files = {key: stack.enter_context(open(csv_names[key], "w")) for key in csv_names}
        for key in csv_names:
            files[key].write('Value\n')
            files[key].flush()

        while True:
            m = q.get()
            if m == 'STOP':
                break

            ff = m[0]
            files[ff].write(m[1])
            files[ff].flush()

    return

def runcode(q):
    x = random.random()
    time.sleep(x * 10)
    q.put([1, str(x) + '\n'])

    return

iterlimit = 40
csv_names = {1: "./file1.csv"}
count = 0

manager = multiprocessing.Manager()
q = manager.Queue()
pool = BoundedQueueProcessPool(multiprocessing.cpu_count() + 2)
pool2 = BoundedQueueProcessPool(multiprocessing.cpu_count() + 2) ###

pool.apply_async(listener, (q, csv_names)) # Activate the listener - handles reading and writing

while count <= iterlimit: # Let's call these runs the master runs
    count += 1
    pool.apply_async(runcode, (q,))

while len(pool._cache) > 1: # Wait for the master runs to finish (the listener occupies one cache entry)
    # New chunk I want to add
    # if ThereAreFreeCores(): ###
    #     pool2.apply_async(runcode, (q,)) ### Let's call these runs the secondary runs
    continue

# Now ALL master runs are finished, but the secondary runs MIGHT not be done
pool.close()
pool2.terminate() ###
q.put('STOP')
pool.join()



Answer 1

Score: 1


I have a potential solution that uses a single multiprocessing pool, but it is the standard one rather than the BoundedQueueProcessPool class you are using. First, I see no reason to use that class here, since the number of master tasks you are submitting does not look like anything that will stress memory, which is the main purpose of such a pool; you can always reinstate BoundedQueueProcessPool if you feel you really need it. After the first iterlimit master tasks have been submitted, I only submit additional tasks when there is a free process, so the queue remains bounded as a result.

Please read the comments in the code, as much of the explanation is there. Basically, I know how many master tasks are being submitted, and as each one completes I know how many master tasks are left to complete. I also know the pool size, so it is easy to deduce whether a free pool process is available for submitting an additional task.

Because I am now using a single pool, which I want to terminate as soon as the last master task has completed, the listener must not run in that pool. Note that the pool-termination condition is that all of the original master tasks have completed; non-master tasks that were submitted may finish before the pool is terminated, so your CSV file can end up with more than iterlimit rows (not counting the first, header row). But I believe that is what you want.

import multiprocessing
from contextlib import ExitStack
import time
import random

def listener(q, csv_names):
    '''
    Listens for results from each run and prints them to file.
    '''
    with ExitStack() as stack:
        # Use the dict.items method
        files = {key: stack.enter_context(open(path, "w")) for key, path in csv_names.items()}
        for key in csv_names:
            files[key].write('Value\n')
            files[key].flush()

        while True:
            m = q.get()
            if m == 'STOP':
                break

            ff = m[0]
            files[ff].write(m[1])
            files[ff].flush()

def init_pool_processes(the_queue):
    global q

    q = the_queue

def runcode():
    x = random.random()
    time.sleep(x * 10)
    q.put([1, str(x) + '\n'])

iterlimit = 40
csv_names = {1: "./file1.csv"}

# Use a multiprocessing queue
q = multiprocessing.Queue()
# Use a regular multiprocessing.pool.Pool instance since:
# (1) the number of tasks submitted is not high, and
# (2) we don't want our callback functions to block.
# Non-master tasks will only be submitted as already-submitted tasks complete,
# which keeps the queue size bounded to iterlimit.
# The maximum number of non-master tasks that can be submitted is determined
# by the pool size. For example, if the pool size is N, the first non-master
# task is submitted when the number of uncompleted master tasks is N-1,
# implying that there is a free pool process ready to execute it. Only when a
# task completes (master or non-master) is the next non-master task submitted.
# When the last master task completes, any non-master tasks still running are
# killed.
pool_size = multiprocessing.cpu_count()  # Why + 2?
pool = multiprocessing.Pool(pool_size, initializer=init_pool_processes, initargs=(q,))

# Use a separate process for the listener; it will become clear why:
listener_process = multiprocessing.Process(target=listener, args=(q, csv_names))
listener_process.start()

# By using callbacks, no waiting loop is required.
# Callback for master tasks:
def master_callback(result):
    global master_tasks_left_to_run

    master_tasks_left_to_run -= 1
    if master_tasks_left_to_run == 0:
        # All the master tasks have completed, so kill the pool
        # (this is why the listener is a separate process):
        pool.terminate()
        # Tell the listener to finish once it has processed all the results currently in the queue:
        q.put('STOP')
    elif master_tasks_left_to_run < pool_size:
        # Now we can submit non-master tasks:
        non_master_callback(result)

# Callback for non-master tasks:
def non_master_callback(result):
    # Just submit another non-master task:
    pool.apply_async(runcode, callback=non_master_callback, error_callback=non_master_callback)

master_tasks_left_to_run = iterlimit
for _ in range(iterlimit):
    pool.apply_async(runcode, callback=master_callback, error_callback=master_callback)

# Now wait for all file writing to complete:
listener_process.join()


