你如何安全地在使用ProcessPoolExecutor的Python进程之间访问变量?

huangapple go评论98阅读模式
英文:

How can I safely access a variable between Python processes which use ProcessPoolExecutor?

问题

我有一个使用多进程执行许多与IO相关的任务的脚本。我想以安全的方式在进程之间访问一个变量。是否有一种简单的方法可以做到这一点,而不涉及低级别的逻辑,比如操纵锁?

import concurrent.futures
import time
import random

def do_book_task(books, year):
    books.append(year)

    print(f'执行与{year}年的书有关的任务。')
    time.sleep(random.random() + 0.5)

    books.remove(year)

    return f'{year}的结果'

def main():
    years = ['1996', '1997', '1998', '1999', '2000', '2001']

    books = [] # 我希望这个变量是进程安全的

    with concurrent.futures.ProcessPoolExecutor(max_workers=4) as executor:
        futures = []
        for year in years:
            futures.append(executor.submit(do_book_task, books, year))
            print(f'将{year}提交到进程队列')
        
        for future in concurrent.futures.as_completed(futures):
            try:
                year = years[futures.index(future)]
                print(f'{year}完成')
                print(future.result())
            except Exception as e:
                print(f'年份{year}发生错误:')
                print(e)

请注意,我已经将代码中的字符串格式修改为中文,并进行了相应的翻译。

英文:

I have a script that uses multiprocessing to execute many io-bound tasks. I want to access a variable between processes in a safe manner. Is there a simple way to do this does not involve low level logic like manipulating locks?

import concurrent.futures
import time
import random

def do_book_task(books, year):
    books.append(year)

    print(f'Doing task with books from {year}.')
    time.sleep(random.random() + 0.5)

    books.remove(year)

    return f'Result for {year}'

def main():
    years = ['1996', '1997', '1998', '1999', '2000', '2001']

    books = [] # I want this variable to be process-safe

    with concurrent.futures.ProcessPoolExecutor(max_workers=4) as executor:
        futures = []
        for year in years:
            futures.append(executor.submit(do_book_task, books, year))
            print(f'Submitted {year} to process queue')
        
        for future in concurrent.futures.as_completed(futures):
            try:
                year = years[futures.index(future)]
                print(f'Done {year}')
                print(future.result())
            except Exception as e:
                print(f'Error with year: {year}')
                print(e)

答案1

得分: 0

是的,你可以使用multiprocessing.managers中的Manager类。请查看https://docs.python.org/3/library/multiprocessing.html#multiprocessing.Manager。

from multiprocessing import Manager

def main():
    years = ['1996', '1997', '1998', '1999', '2000', '2001']

    with Manager() as manager:
        books = manager.list() # 创建一个进程安全的代理对象
        # 其余代码保持不变

        with concurrent.futures.ProcessPoolExecutor(max_workers=4) as executor:
            futures = []
            for year in years:
                futures.append(executor.submit(do_book_task, books, year))
                print(f'Submitted {year} to process queue')
            
            for future in concurrent.futures.as_completed(futures):
                try:
                    year = years[futures.index(future)]
                    print(f'Done {year}')
                    print(future.result())
                except Exception as e:
                    print(f'Error with year: {year}')
                    print(e)
英文:

Yes, you can use the Manager class from multiprocessing.managers. See https://docs.python.org/3/library/multiprocessing.html#multiprocessing.Manager.

from multiprocessing import Manager

def main():
    years = ['1996', '1997', '1998', '1999', '2000', '2001']

    with Manager() as manager:
        books = manager.list() # creates a proxy object which is process-safe
        # the rest of the code is the same

        with concurrent.futures.ProcessPoolExecutor(max_workers=4) as executor:
            futures = []
            for year in years:
                futures.append(executor.submit(do_book_task, books, year))
                print(f'Submitted {year} to process queue')
            
            for future in concurrent.futures.as_completed(futures):
                try:
                    year = years[futures.index(future)]
                    print(f'Done {year}')
                    print(future.result())
                except Exception as e:
                    print(f'Error with year: {year}')
                    print(e)

huangapple
  • 本文由 发表于 2023年8月5日 04:06:39
  • 转载请务必保留本文链接:https://go.coder-hub.com/76838861.html
匿名

发表评论

匿名网友

:?: :razz: :sad: :evil: :!: :smile: :oops: :grin: :eek: :shock: :???: :cool: :lol: :mad: :twisted: :roll: :wink: :idea: :arrow: :neutral: :cry: :mrgreen:

确定