英文:
Caching of arbitrary functions and values in Python
问题
I am working on a cache handler class that given an arbitrary function and a set of arguments caches its result. This will be used in a load-balancing MPI scheduler written in python. Although the program will be run on multiple processes, only a master process will have a working function of this class.
The scheduler allows to submit a task (func, args) and returns an instance of an MPIFuture object that will in the future hold the result. I want to cache these (func, args) to return a saved MPIFuture object instead of recomputing the result.
I have multiple implementations of this class using cachetools and functools, yet for the sake of argument I have implemented a simple example using a dictionary.
from .MPIFutures import MPIFuture, MPIFutureArray
import dill
import hashlib
import numpy as np
class CacheHandler():
    """Memoize scheduler submissions: a repeated (func, args) pair returns the
    previously created MPIFuture instead of recomputing the result.

    NOTE(review): the cache key hashes dill-serialized args; dill output for
    bound methods/closures is not guaranteed byte-stable across calls — this
    is the likely source of the reported cache misses. TODO confirm.
    """

    def __init__(self, scheduler):
        self.sched = scheduler  # underlying MPI scheduler being wrapped
        self.cache = {}         # (func, args-hash) -> MPIFuture
        if self.sched.is_master():
            print("Caching is enabled!")

    # ======= Forward methods =======
    def compute(self):
        self.sched.compute()

    def init(self):
        self.sched.init()

    def finalize(self):
        self.sched.finalize()
    # ======= END =======

    # ======= Redefine methods =======
    def submit_multiple(self, func, *args):
        # Fan out one cached submit per tuple of per-task arguments.
        # (zip is iterated directly; the intermediate list() was unnecessary.)
        return MPIFutureArray([self.submit(func, *domain) for domain in zip(*args)])

    def _generate_key(self, func, args):
        """Build a hashable cache key: (func, sha256 hex digest of args)."""
        # Serialize the arguments using dill
        serialized_data = dill.dumps(args)
        # Generate a hash of the serialized data
        data_hash = hashlib.sha256(serialized_data).hexdigest()
        return (func, data_hash)

    def submit(self, func, *args):
        """Submit (func, args); return the cached future on a repeat call."""
        key = self._generate_key(func, args)
        # Idiomatic membership test on the dict itself, not on .keys()
        if key in self.cache:
            return self.cache[key]
        # Cache miss: submit work to the scheduler and cache the future
        future = self.sched.submit(func, *args)
        self.cache[key] = future
        return future
When testing this implementation with simple MPI benchmarks, it produces the correct results. However, when I implement it into the main MPI application code, it fails to find any cache hits (always misses).
To see what was going on, I modified the class to this:
from .MPIFutures import MPIFuture, MPIFutureArray
import dill
import hashlib
import numpy as np
class CacheHandler():
    """Instrumented variant of the caching scheduler wrapper: records every
    submitted function, argument tuple, and key hash so repeated submissions
    that miss the cache can be diagnosed.
    """

    def __init__(self, scheduler):
        self.sched = scheduler  # underlying MPI scheduler being wrapped
        self.cache = {}         # (func, args-digest) -> MPIFuture
        # Debug histories of everything submitted so far
        self.f = []  # functions seen
        self.x = []  # argument tuples seen
        self.h = []  # hash(key) values seen
        if self.sched.is_master():
            print("Caching is enabled!")

    # ======= Forward methods =======
    def compute(self):
        self.sched.compute()

    def init(self):
        self.sched.init()

    def finalize(self):
        self.sched.finalize()
    # ======= END =======

    # ======= Redefine methods =======
    def submit_multiple(self, func, *args):
        # One cached submit per tuple of per-task arguments
        return MPIFutureArray([self.submit(func, *domain) for domain in zip(*args)])

    def _generate_key(self, func, args):
        """Build a hashable cache key: (func, raw sha256 digest of args)."""
        # Serialize the arguments using dill
        serialized_data = dill.dumps(args)
        # Generate a raw bytes hash of the serialized data
        data_hash = hashlib.sha256(serialized_data).digest()
        return (func, data_hash)

    def submit(self, func, *args):
        """Submit (func, args); log whether each key component was seen before."""
        key = self._generate_key(func, args)
        # Idiomatic membership test on the dict itself, not on .keys()
        if key in self.cache:
            print("Found cached results!")
            return self.cache[key]
        print("Result not found, computing...")
        future = self.sched.submit(func, *args)
        self.cache[key] = future
        # Report prior sightings of the function, args, and composite key hash
        print(f"FUNCTION: {func in self.f} ARGS: {args in self.x} HASH: {hash(key) in self.h} ")
        self.f.append(func)
        self.x.append(args)
        self.h.append(hash(key))
        return future
Eventually, the program starts outputting the following:
...
Result not found, computing...
FUNCTION: True ARGS: True HASH: False
Result not found, computing...
FUNCTION: True ARGS: True HASH: False
Result not found, computing...
FUNCTION: True ARGS: True HASH: False
...
This indicates that the functions and arguments are indeed the same, yet the hash of a tuple containing the two is not. For the sake of testing I even tried replacing func
with func.__name__
and I got the same result. Most of the time the arguments are one or more numpy arrays. I also tried converting args
to a tuple before serialization but that did not help.
At this point, I am not sure what is happening. I am not able to reproduce the issue in my minimal working examples.
Can any Python expert give me a hint on what is wrong here?
EDIT 1:
Here is an example benchmark I tested:
# Minimal benchmark: submit the same (func, args) twice; the second submit
# should be served from the cache even after the object's state changes.
class T():
    def __init__(self):
        self.k = 1  # mutable state, altered between the two submissions

    # NOTE(review): declared without `self` yet accessed as `test.f` below —
    # the original indentation was lost, so whether `f` belongs inside T is
    # ambiguous; TODO confirm against the original benchmark source.
    def f(x):
        print("COMPUTING F")
        return x

test = T()
x0 = np.array([1., 2., 3.])
# Submit a first time
sched.submit(test.f, x0)
sched.compute()
# Alter state of the object
test.k = 2
# Submit a second time
sched.submit(test.f, x0)
sched.compute()
This produces the following correct results:
mpirun -np 2 python test.py
Using MPI scheduler.
Caching is enabled!
Result not found, computing...
FUNCTION: False ARGS: False HASH: False
COMPUTING F
Found cached results!
英文:
I am working on a cache handler class that, given an arbitrary function and a set of arguments, caches its result. This will be used in a load-balancing MPI scheduler written in Python. Although the program will be run on multiple processes, only a master process will have a working function of this class.
The scheduler allows to submit a task (func, args) and returns an instance of an MPIFuture object that will in the future hold the result. I want to cache these (func, args) to return a saved MPIFuture object instead of recomputing the result.
I have multiple implementations of this class using cachetools and functools, yet for the sake of argument I have implemented a simple example using a dictionary.
from .MPIFutures import MPIFuture, MPIFutureArray
import dill
import hashlib
import numpy as np
class CacheHandler():
    """Wrap an MPI scheduler and reuse futures for repeated submissions."""

    def __init__(self, scheduler):
        self.sched = scheduler
        self.cache = {}
        if self.sched.is_master():
            print("Caching is enabled!")

    # ======= Forward methods =======
    def compute(self):
        self.sched.compute()

    def init(self):
        self.sched.init()

    def finalize(self):
        self.sched.finalize()
    # ======= END =======

    # ======= Redefine methods =======
    def submit_multiple(self, func, *args):
        # Build the future list with an explicit loop over the zipped domains
        futures = []
        for domain in list(zip(*args)):
            futures.append(self.submit(func, *domain))
        return MPIFutureArray(futures)

    def _generate_key(self, func, args):
        # Key is the function paired with a digest of the dill-serialized args
        payload = dill.dumps(args)
        digest = hashlib.sha256(payload).hexdigest()
        return (func, digest)

    def submit(self, func, *args):
        # Return a previously cached future when the key matches
        cache_key = self._generate_key(func, args)
        if cache_key in self.cache.keys():
            return self.cache[cache_key]
        # Otherwise hand the work to the scheduler and remember its future
        result_future = self.sched.submit(func, *args)
        self.cache[cache_key] = result_future
        return result_future
When testing this implementation with simple MPI benchmarks, it produces the correct results, yet when I implement it into the main MPI application code, it fails to find any cache hits (always misses).
To see what was going on, I modified the class to this:
from .MPIFutures import MPIFuture, MPIFutureArray
import dill
import hashlib
import numpy as np
class CacheHandler():
    """Instrumented variant of the caching scheduler wrapper: records every
    submitted function, argument tuple, and key hash to diagnose cache misses.

    Bug fix: the debug lines referenced undefined names `fi` and `xi`
    (NameError at runtime); they are `func` and `args`, matching the earlier
    version of this class and the reported "FUNCTION:/ARGS:" output.
    """

    def __init__(self, scheduler):
        self.sched = scheduler  # underlying MPI scheduler being wrapped
        self.cache = {}         # (func, args-digest) -> MPIFuture
        # Debug histories of everything submitted so far
        self.f = []  # functions seen
        self.x = []  # argument tuples seen
        self.h = []  # hash(key) values seen
        if self.sched.is_master():
            print("Caching is enabled!")

    # ======= Forward methods =======
    def compute(self):
        self.sched.compute()

    def init(self):
        self.sched.init()

    def finalize(self):
        self.sched.finalize()
    # ======= END =======

    # ======= Redefine methods =======
    def submit_multiple(self, func, *args):
        # One cached submit per tuple of per-task arguments
        return MPIFutureArray([self.submit(func, *domain) for domain in zip(*args)])

    def _generate_key(self, func, args):
        """Build a hashable cache key: (func, raw sha256 digest of args)."""
        # Serialize the arguments using dill
        serialized_data = dill.dumps(args)
        # Generate a raw bytes hash of the serialized data
        data_hash = hashlib.sha256(serialized_data).digest()
        return (func, data_hash)

    def submit(self, func, *args):
        """Submit (func, args); log whether each key component was seen before."""
        key = self._generate_key(func, args)
        if key in self.cache:
            print("Found cached results!")
            return self.cache[key]
        print("Result not found, computing...")
        future = self.sched.submit(func, *args)
        self.cache[key] = future
        # Fixed: was `fi`/`xi` (undefined); track func and args as intended
        print(f"FUNCTION: {func in self.f} ARGS: {args in self.x} HASH: {hash(key) in self.h} ")
        self.f.append(func)
        self.x.append(args)
        self.h.append(hash(key))
        return future
Eventually, the program starts outputting the following:
...
Result not found, computing...
FUNCTION: True ARGS: True HASH: False
Result not found, computing...
FUNCTION: True ARGS: True HASH: False
Result not found, computing...
FUNCTION: True ARGS: True HASH: False
...
This indicates that the functions and arguments are indeed the same, yet the hash of a tuple containing the two is not. For the sake of testing I even tried replacing func
with func.__name__
and I got the same result. Most of the time the arguments are one or more numpy arrays. I also tried converting args
to a tuple before serialisation but that did not help.
At this point, I am not sure what is happening. I am not able to reproduce the issue in my minimal working examples.
Can any Python expert give me a hint on what is wrong here?
EDIT 1:
Here is an example benchmark I tested:
# Benchmark used to verify caching: the same (func, args) pair is submitted
# twice, with an unrelated attribute mutated in between.
class T():
    def __init__(self):
        self.k = 1  # state mutated between the two submissions

    # NOTE(review): no `self` parameter although accessed as `test.f` below;
    # original indentation was stripped, so f's nesting inside T is an
    # assumption — TODO confirm against the original benchmark source.
    def f(x):
        print("COMPUTING F")
        return x

test = T()
x0 = np.array([1., 2., 3.])
# Submit a first time
sched.submit(test.f, x0)
sched.compute()
# Alter state of the object
test.k = 2
# Submit a second time
sched.submit(test.f, x0)
sched.compute()
This produces the following correct results:
mpirun -np 2 python test.py
Using MPI scheduler.
Caching is enabled!
Result not found, computing...
FUNCTION: False ARGS: False HASH: False
COMPUTING F
Found cached results!
答案1
得分: 0
以下是您提供的代码的翻译部分:
鉴于我知道我要缓存的函数的参数主要是numpy数组,我决定重新设计这个类,以利用arr.tobytes()
函数而不是通过dill进行序列化。
我不确定与旧实现的主要区别是什么,但这个代码可以正常工作。
from .MPIFutures import MPIFuture, MPIFutureArray
import numpy as np
from cachetools import LRUCache, MRUCache, RRCache
import hashlib
from math import floor, log10
class CacheHandler():
    """Cache scheduler futures for functions whose arguments are numpy arrays.

    Arguments are rounded to a tolerance-derived number of decimals and hashed
    with sha256; each function gets its own bounded eviction cache.
    """

    def __init__(self, scheduler, tol=1e-6, eviction_strategy="RR", max_cache_size=1000):
        self._sched = scheduler
        # Decimals kept when rounding args, derived from the tolerance
        self._dec = self._tol_to_dec(tol)
        self._cache = {}  # func -> bounded cache mapping args-hash -> future
        self._maxsize = max_cache_size
        self._evict = eviction_strategy
        if self._sched.is_master():
            print("Caching is enabled!")
            print(f"Tolerance set to {self._dec} decimals precision (tol = {tol})")
            print(f"Eviction strategy set to {self._evict}")

    # ======= Forward methods =======
    def compute(self):
        self._sched.compute()

    def init(self):
        self._sched.init()

    def finalize(self):
        self._sched.finalize()
    # ======= END =======

    # ======= Helper functions =======
    def _tol_to_dec(self, tol):
        """Convert a positive tolerance (e.g. 1e-6) to a decimal count (6).

        Raises ValueError for non-positive tolerances.
        """
        if tol > 0:
            return -int(floor(log10(tol)))
        else:
            raise ValueError("Tolerance must be a positive number.")

    def _hash_args(self, args):
        """Return the concatenated sha256 hex digests of the rounded args."""
        # Round to the configured precision so nearly-equal arrays hash equal
        serialized_args = [np.round(arg, decimals=self._dec).tobytes() for arg in args]
        hashes = [hashlib.sha256(arg).hexdigest() for arg in serialized_args]
        return "".join(hashes)

    def _get_new_cache(self):
        """Create a bounded cache matching the configured eviction strategy."""
        if self._evict == "LRU":
            return LRUCache(maxsize=self._maxsize)
        elif self._evict == "MRU":
            return MRUCache(maxsize=self._maxsize)
        elif self._evict == "RR":
            return RRCache(maxsize=self._maxsize)
        else:
            raise ValueError(f"Unknown eviction strategy {self._evict}")

    # ======= Redefine methods =======
    def submit_multiple(self, func, *args):
        # One cached submit per tuple of per-task arguments
        return MPIFutureArray([self.submit(func, *domain) for domain in zip(*args)])

    def submit(self, func, *args):
        """Submit (func, args), serving repeats of numpy-argument calls from cache."""
        # Only functions whose arguments are all numpy arrays are cacheable
        if any(not isinstance(arg, np.ndarray) for arg in args):
            return self._sched.submit(func, *args)
        # Lazily create this function's per-function cache
        if func not in self._cache:
            self._cache[func] = self._get_new_cache()
        # Renamed from `hash` to avoid shadowing the builtin
        arg_hash = self._hash_args(args)
        # Attempt to fetch a cached future for these arguments
        future = self._cache[func].get(arg_hash)
        if future is None:
            # Cache miss: submit and remember the future
            future = self._sched.submit(func, *args)
            self._cache[func][arg_hash] = future
        return future
英文:
Given that I know that the arguments of the functions I have to cache are mainly numpy arrays, I decided to redesign the class to leverage the arr.tobytes()
function instead of serialisation via dill.
I am not sure what the major difference from the old implementation is, but this works correctly.
from .MPIFutures import MPIFuture, MPIFutureArray
import numpy as np
from cachetools import LRUCache, MRUCache, RRCache
import hashlib
from math import floor, log10
class CacheHandler():
    """Cache scheduler futures for functions taking numpy-array arguments.

    Instead of dill-serializing arbitrary args, arguments are rounded to a
    tolerance-derived precision and hashed via ndarray.tobytes(); futures are
    stored per function in a bounded eviction cache.
    """

    def __init__(self, scheduler, tol=1e-6, eviction_strategy="RR", max_cache_size=1000):
        self._sched = scheduler
        # Decimals kept when rounding args, derived from the tolerance
        self._dec = self._tol_to_dec(tol)
        self._cache = {}  # func -> bounded cache mapping args-hash -> future
        self._maxsize = max_cache_size
        self._evict = eviction_strategy
        if self._sched.is_master():
            print("Caching is enabled!")
            print(f"Tolerance set to {self._dec} decimals precision (tol = {tol})")
            print(f"Eviction strategy set to {self._evict}")

    # ======= Forward methods =======
    def compute(self):
        self._sched.compute()

    def init(self):
        self._sched.init()

    def finalize(self):
        self._sched.finalize()
    # ======= END =======

    # ======= Helper functions =======
    def _tol_to_dec(self, tol):
        """Convert a positive tolerance (e.g. 1e-6) to a decimal count (6).

        Raises ValueError for non-positive tolerances.
        """
        if tol > 0:
            return -int(floor(log10(tol)))
        else:
            raise ValueError("Tolerance must be a positive number.")

    def _hash_args(self, args):
        """Return the concatenated sha256 hex digests of the rounded args."""
        # Round first so nearly-equal arrays produce identical hashes
        serialized_args = [np.round(arg, decimals=self._dec).tobytes() for arg in args]
        hashes = [hashlib.sha256(arg).hexdigest() for arg in serialized_args]
        return "".join(hashes)

    def _get_new_cache(self):
        """Create a bounded cache matching the configured eviction strategy."""
        if self._evict == "LRU":
            return LRUCache(maxsize=self._maxsize)
        elif self._evict == "MRU":
            return MRUCache(maxsize=self._maxsize)
        elif self._evict == "RR":
            return RRCache(maxsize=self._maxsize)
        else:
            raise ValueError(f"Unknown eviction strategy {self._evict}")

    # ======= Redefine methods =======
    def submit_multiple(self, func, *args):
        # One cached submit per tuple of per-task arguments
        return MPIFutureArray([self.submit(func, *domain) for domain in zip(*args)])

    def submit(self, func, *args):
        """Submit (func, args), serving repeats of numpy-argument calls from cache."""
        # Can only cache functions with numpy arguments
        if any(not isinstance(arg, np.ndarray) for arg in args):
            return self._sched.submit(func, *args)
        # Function not cached yet: create its bounded cache lazily
        if func not in self._cache:
            self._cache[func] = self._get_new_cache()
        # Renamed from `hash` to avoid shadowing the builtin
        arg_hash = self._hash_args(args)
        # Use the key to attempt to get the result from the cache
        future = self._cache[func].get(arg_hash)
        if future is None:
            # Cache miss: submit and remember the future
            future = self._sched.submit(func, *args)
            self._cache[func][arg_hash] = future
        return future
通过集体智慧和协作来改善编程学习和解决问题的方式。致力于成为全球开发者共同参与的知识库,让每个人都能够通过互相帮助和分享经验来进步。
评论