How to create a limited size cache shared by multiple processes in Python

Question
I'm trying to use a cache shared by multiple processes, using multiprocessing.Manager's dict. The following demo gives some context (adapted from this answer):
import multiprocessing as mp
import time

def foo_pool(x, cache):
    if x not in cache:
        time.sleep(2)
        cache[x] = x*x
    else:
        print('using cache for', x)
    return cache[x]

result_list = []

def log_result(result):
    result_list.append(result)

def apply_async_with_callback():
    manager = mp.Manager()
    cache = manager.dict()
    pool = mp.Pool()
    jobs = list(range(10)) + list(range(10))
    for i in jobs:
        pool.apply_async(foo_pool, args=(i, cache), callback=log_result)
    pool.close()
    pool.join()
    print(result_list)

if __name__ == '__main__':
    apply_async_with_callback()
Running the above code gives something like this:
using cache for 0
using cache for 2
using cache for 4
using cache for 1
using cache for 3
using cache for 5
using cache for 7
using cache for 6
[25, 16, 4, 1, 9, 0, 36, 49, 0, 4, 16, 1, 9, 25, 49, 36, 64, 81, 81, 64]
So the cache is working as expected.
What I'd like to achieve is to give a size limit to this manager.dict(), like the maxsize argument of functools.lru_cache. My current attempt is:
class LimitedSizeDict:
    def __init__(self, max_size):
        self.max_size = max_size
        self.manager = mp.Manager()
        self.dict = self.manager.dict()
        self.keys = self.manager.list()

    def __getitem__(self, key):
        return self.dict[key]

    def __setitem__(self, key, value):
        if len(self.keys) >= self.max_size:
            oldest_key = self.keys.pop(0)
            del self.dict[oldest_key]
        self.keys.append(key)
        self.dict[key] = value

    def __contains__(self, key):
        return key in self.dict

    def __len__(self):
        return len(self.dict)

    def __iter__(self):
        for key in self.keys:
            yield key
Then use the following to launch the processes:
def apply_async_with_callback():
    cache = LimitedSizeDict(3)
    pool = mp.Pool()
    jobs = list(range(10)) + list(range(10))
    for i in jobs:
        pool.apply_async(foo_pool, args=(i, cache), callback=log_result)
    pool.close()
    pool.join()
    print(result_list)
But this gives me an empty list: [].
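One way to see why the list stays empty: apply_async does not raise exceptions from a failed task unless you keep the returned AsyncResult or pass an error_callback. Here the failure is most likely a pickling error, since LimitedSizeDict holds a live mp.Manager(), and manager objects cannot be pickled across processes. A minimal sketch to surface the swallowed exception, reusing foo_pool, log_result, and LimitedSizeDict from above (log_error is a hypothetical helper name):

def log_error(err):
    # error_callback receives the exception that caused the task to fail
    # (here, most likely the pickling error described above)
    print('task failed:', err)

def apply_async_with_callback():
    cache = LimitedSizeDict(3)
    pool = mp.Pool()
    for i in list(range(10)) + list(range(10)):
        pool.apply_async(foo_pool, args=(i, cache),
                         callback=log_result, error_callback=log_error)
    pool.close()
    pool.join()
    print(result_list)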
I thought I probably have to subclass the multiprocessing.managers.DictProxy class to achieve this, so I looked into the source code. But there doesn't seem to be a class definition of DictProxy.
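That is expected: in CPython's multiprocessing/managers.py there is no class DictProxy statement because the class is generated at import time by MakeProxyType, a helper that exists in the stdlib source but is undocumented. A hedged sketch of the same mechanism, with hypothetical names (BoundedDictProxy, CacheManager, bounded_dict) chosen for illustration; the answer below shows a fuller approach:

from multiprocessing.managers import BaseManager, MakeProxyType

# MakeProxyType generates a BaseProxy subclass at runtime whose listed
# methods are forwarded to the real object living in the manager process;
# this mirrors how the stdlib builds DictProxy itself.
BoundedDictProxy = MakeProxyType(
    'BoundedDictProxy',
    ('__contains__', '__delitem__', '__getitem__', '__len__', '__setitem__'),
)

class CacheManager(BaseManager):
    pass

# Register a plain dict under the custom proxy; a size-limited class could
# be registered the same way.
CacheManager.register('bounded_dict', dict, BoundedDictProxy)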
How can I give a size limit to this shared dict cache? Thanks in advance.
Answer 1
Score: 1
First of all, I would define LimitedSizeDict so that it is not coupled to multiprocessing but rather could be a standalone class; therefore it should not have any references to a "manager" or "managed objects". Second, I would define an iterator class for it, since your current implementation is based on a generator, and generators cannot be pickled across processes. Third, there is a way of generating a proxy for just about any arbitrary class, as in the following code:
from multiprocessing import Process
from multiprocessing.managers import NamespaceProxy, BaseManager
import inspect
from collections import deque
from threading import Lock

class LimitedSizeDict:
    class Iter:
        def __init__(self, cache):
            self._cache = cache
            self._index = 0

        def __next__(self):
            if self._index >= len(self._cache):
                raise StopIteration
            key = self._cache._get_key(self._index)
            self._index += 1
            return key

    def __init__(self, max_size):
        self._max_size = max_size
        self._d = {}
        self._keys = deque(maxlen=max_size)
        # When not being used with multiprocessing:
        self._proxy = self
        self._lock = Lock()

    def __len__(self):
        return len(self._keys)

    def __getitem__(self, key):
        return self._d[key]

    def __setitem__(self, key, value):
        with self._lock:
            # The key may already exist:
            if key not in self._d:
                if len(self._keys) == self._max_size:
                    oldest_key = self._keys[0]
                    del self._d[oldest_key]
                # Appending will automatically remove self._keys[0],
                # since the deque was created with maxlen:
                self._keys.append(key)
            self._d[key] = value

    # Required by the iterator:
    def _get_key(self, index):
        return self._keys[index]

    def __iter__(self):
        return LimitedSizeDict.Iter(self._proxy)

    # When used with multiprocessing:
    def _set_proxy(self, proxy):
        self._proxy = proxy

def worker(cache):
    cache['a'] = 1
    cache['b'] = 2
    cache['c'] = 3
    cache['d'] = 4
    for key in cache:
        print(key, cache[key])

class ObjProxy(NamespaceProxy):
    """Returns a proxy instance for any user-defined data type. The proxy
    instance will have the namespace and functions of the data type (except
    private/protected callables/attributes). Furthermore, the proxy will be
    picklable and its state can be shared among different processes."""

    @classmethod
    def populate_obj_attributes(cls, real_cls):
        DISALLOWED = set(dir(cls))
        DISALLOWED.add('__class__')
        ALLOWED = ['__sizeof__', '__eq__', '__ne__', '__le__', '__repr__',
                   '__dict__', '__lt__', '__gt__']
        new_dict = {}
        for (attr, value) in inspect.getmembers(real_cls, callable):
            if attr not in DISALLOWED or attr in ALLOWED:
                new_dict[attr] = cls.proxy_wrap(attr)
        return new_dict

    @staticmethod
    def proxy_wrap(attr):
        """This method creates a function that calls the proxied object's method."""
        def f(self, *args, **kwargs):
            # _callmethod is the method that proxies provided by multiprocessing
            # use to call methods on the proxied object
            return self._callmethod(attr, args, kwargs)
        return f

# Create the proxy class at runtime:
LimitedSizeDictProxy = type("LimitedSizeDictProxy", (ObjProxy,),
                            ObjProxy.populate_obj_attributes(LimitedSizeDict))

if __name__ == '__main__':
    BaseManager.register('LimitedSizeDict', LimitedSizeDict, LimitedSizeDictProxy,
                         exposed=tuple(dir(LimitedSizeDictProxy)))
    with BaseManager() as manager:
        cache = manager.LimitedSizeDict(3)
        # Store the proxy in the actual object:
        cache._set_proxy(cache)
        p = Process(target=worker, args=(cache,))
        p.start()
        p.join()
Prints:
b 2
c 3
d 4
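For completeness, here is an untested sketch of wiring this proxied LimitedSizeDict back into the question's pool example, reusing LimitedSizeDict and LimitedSizeDictProxy from the code above. Since this version of the class does not define __contains__, the worker is adapted to use try/except KeyError around the proxied __getitem__ instead of an `in` test:

import multiprocessing as mp
import time

def foo_pool(x, cache):
    try:
        return cache[x]      # proxied __getitem__; raises KeyError on a miss
    except KeyError:
        time.sleep(2)
        cache[x] = x * x     # proxied __setitem__, evicting in the manager process
        return cache[x]

result_list = []

def log_result(result):
    result_list.append(result)

if __name__ == '__main__':
    BaseManager.register('LimitedSizeDict', LimitedSizeDict, LimitedSizeDictProxy,
                         exposed=tuple(dir(LimitedSizeDictProxy)))
    with BaseManager() as manager:
        cache = manager.LimitedSizeDict(3)
        cache._set_proxy(cache)
        pool = mp.Pool()
        for i in list(range(10)) + list(range(10)):
            pool.apply_async(foo_pool, args=(i, cache), callback=log_result)
        pool.close()
        pool.join()
        print(result_list)

Note that the miss path is not atomic across processes: two workers can both miss the same key and both compute it. The lock in __setitem__ only protects the eviction bookkeeping inside the manager process, which is usually acceptable for a cache.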