multiprocessing: 两个Python Shell之间可以共享一个字典吗?

huangapple go评论66阅读模式
英文:

multiprocessing: Can a dict be shared between two Python shell?

问题

我来自这篇帖子multiprocessing:如何在多个进程之间共享字典?
但我想要稍微不同的东西。在那篇帖子中,一个字典在父进程和由构造函数Process实例化的子进程之间共享。我想要的是在两个Python shell之间共享一个字典。

英文:

I come from this post multiprocessing: How do I share a dict among multiple processes?
but I want something slightly different. In that post, a dict is shared between a parent process and its child which is instantiated by the constructor Process. What I want is to share a dict between two Python shell.

答案1

得分: 1

你想要做的是使用managed dictionary。问题在于,如果多个进程使用以下代码,它们将获得自己的可共享字典实例:

from multiprocessing import Manager

with Manager() as manager:
    sharable_dict = manager.dict()
    ... # 等等。

相反,我们创建了一个新的sharable_dict类型的托管字典,它始终返回到相同的单例托管字典的代理。为使此工作正常,所有进程都必须连接到一个共同的SyncManager服务器:

文件 test2.py(您的实际处理代码)

from multiprocessing.managers import BaseManager
from multiprocessing import current_process

address = "127.0.0.1"
port = 50000
password = "secret"

def connect_to_manager():
    BaseManager.register('sharable_dict')
    manager = BaseManager(address=(address, port), authkey=password.encode('utf-8'))
    manager.connect()
    return manager.sharable_dict()

if __name__ == '__main__':
    sharable_dict = connect_to_manager()
    pid = current_process().pid
    print('My pic =', pid)
    sharable_dict[pid] = True

上述代码获取一个共享的通用字典,仅出于演示目的添加了一个当前进程ID的键。

文件 test.py

此文件创建了一个托管的可共享字典,可以提供给任何想要使用它的进程,然后使用 subprocess.Popen 运行多个外部进程(即 test2.py)。最后,该代码打印出可共享字典,以显示所有3个外部进程:

from multiprocessing.managers import BaseManager, DictProxy
from threading import Thread, Event
from test2 import address, port, password, connect_to_manager
from subprocess import Popen

the_dict = None

def get_dict():
    global the_dict

    if the_dict is None:
        the_dict = {}
    return the_dict

def server(started_event, shutdown_event):
    net_manager = BaseManager(address=(address, port), authkey=password.encode('utf-8'))
    BaseManager.register('sharable_dict', get_dict, DictProxy)
    net_manager.start()
    started_event.set() # 告诉主线程我们已经开始了
    shutdown_event.wait() # 等待被告知关闭
    net_manager.shutdown()

def main():
    started_event = Event()
    shutdown_event = Event()
    server_thread = Thread(target=server, args=(started_event, shutdown_event,))
    server_thread.start()
    # 等待管理器启动:
    started_event.wait()

    processes = [Popen(['python', 'test2.py']) for _ in range(3)]
    for process in processes:
        process.communicate()

    sharable_dict = connect_to_manager()
    print('sharable dictionary =', sharable_dict)

    # 告诉管理器我们已经完成了:
    shutdown_event.set()
    server_thread.join()

if __name__ == '__main__':
    main()

打印:

My pic = 18736
My pic = 12476
My pic = 10584
sharable dictionary = {18736: True, 12476: True, 10584: True}

更新

这个可共享的字典当然只适用于执行 Python 脚本的进程。假设您能够修改这些脚本以连接到服务器以获取可共享的字典,那么也许您可以将需要执行的代码放在一个"worker"函数中,例如 worker,并使用 multiprocessing.Process 实例创建进程。这将导致更简单的代码。

您的 worker 函数:

文件 test2.py

from multiprocessing import current_process

def worker(sharable_dict):
    pid = current_process().pid
    print('My pid =', pid)
    sharable_dict[pid] = True

以及创建可共享字典并创建使用它的子进程的代码:

文件 test.py

from multiprocessing import Manager, Process
from test2 import worker

def main():
    with Manager() as manager:
        sharable_dict = manager.dict()
        processes = [Process(target=worker, args=(sharable_dict,)) for _ in range(3)]
        for process in processes:
            process.start()
        for process in processes:
            process.join()

        print('sharable dictionary =', sharable_dict)

if __name__ == '__main__':
    main()
英文:

What you would like to do is use a managed dictionary. The problem is that multiple processes will acquire their own instance of of a sharable dictionary if they use code such as:

from multiprocessing import Manager

with Manager() as manager:
    sharable_dict = manager.dict()
    ... # etc.

Instead we crate a new managed dictionary of type sharable_dict that always returns a proxy to the same singleton managed dictionary. For this to work, all processes must connect to a common SyncManager server:

File test2.py (your actual processing code)

from multiprocessing.managers import BaseManager
from multiprocessing import current_process

address = "127.0.0.1"
port = 50000
password = "secret"

def connect_to_manager():
    BaseManager.register('sharable_dict')
    manager = BaseManager(address=(address, port), authkey=password.encode('utf-8'))
    manager.connect()
    return manager.sharable_dict()

if __name__ == '__main__':
    sharable_dict = connect_to_manager()
    pid = current_process().pid
    print('My pic =', pid)
    sharable_dict[pid] = True

The above code gets a common, sharable dictuonary to which, for demo purposes, just adds a key that is the current process id.

File test.py

This file creates the managed, sharable dictionary that can be served up to any process wanting to use it and then using subprocess.Popen runs multiple external processes (i.e. test2.py). Finally, this code prints out the sharable dictionary to show that all 3 external processes:

from multiprocessing.managers import BaseManager, DictProxy
from threading import Thread, Event
from test2 import address, port, password, connect_to_manager
from subprocess import Popen

the_dict = None

def get_dict():
    global the_dict

    if the_dict is None:
        the_dict = {}
    return the_dict

def server(started_event, shutdown_event):
    net_manager = BaseManager(address=(address, port), authkey=password.encode('utf-8'))
    BaseManager.register('sharable_dict', get_dict, DictProxy)
    net_manager.start()
    started_event.set() # tell main thread that we have started
    shutdown_event.wait() # wait to be told to shutdown
    net_manager.shutdown()

def main():
    started_event = Event()
    shutdown_event = Event()
    server_thread = Thread(target=server, args=(started_event, shutdown_event,))
    server_thread.start()
    # wait for manager to start:
    started_event.wait()

    processes = [Popen(['python', 'test2.py']) for _ in range(3)]
    for process in processes:
        process.communicate()

    sharable_dict = connect_to_manager()
    print('sharable dictionary =', sharable_dict)

    # tell manager we are through:
    shutdown_event.set()
    server_thread.join()

if __name__ == '__main__':
    main()

Prints:

My pic = 18736
My pic = 12476
My pic = 10584
sharable dictionary = {18736: True, 12476: True, 10584: True}

Update

This sharable dictionary, of course, only works with processes executing Python scripts. Assuming you are able to modify those scripts to connect to the server to get the sharable dictionary, then perhaps you can just place the code that needs to be executed in a "worker" function, e.g. worker, and create processes using multiprocessing.Process instances. This would result in far simpler code.

Your worker function:

File test2.py

from multiprocessing import current_process

def worker(sharable_dict):
    pid = current_process().pid
    print('My pid =', pid)
    sharable_dict[pid] = True

And the code to create a sharable dictionary and create child processes that use it:

File test.py

from multiprocessing import Manager, Process
from test2 import worker

def main():
    with Manager() as manager:
        sharable_dict = manager.dict()
        processes = [Process(target=worker, args=(sharable_dict,)) for _ in range(3)]
        for process in processes:
            process.start()
        for process in processes:
            process.join()

        print('sharable dictionary =', sharable_dict)

if __name__ == '__main__':
    main()

huangapple
  • 本文由 发表于 2023年7月11日 02:19:36
  • 转载请务必保留本文链接:https://go.coder-hub.com/76656339.html
匿名

发表评论

匿名网友

:?: :razz: :sad: :evil: :!: :smile: :oops: :grin: :eek: :shock: :???: :cool: :lol: :mad: :twisted: :roll: :wink: :idea: :arrow: :neutral: :cry: :mrgreen:

确定