Caching of arbitrary functions and values in Python

Question

I am working on a cache handler class that, given an arbitrary function and a set of arguments, caches its result. It will be used in a load-balancing MPI scheduler written in Python. Although the program runs on multiple processes, only the master process holds a working instance of this class.

The scheduler allows submitting a task (func, args) and returns an MPIFuture object that will eventually hold the result. I want to cache these (func, args) pairs so that a saved MPIFuture object is returned instead of recomputing the result.

I have multiple implementations of this class using cachetools and functools, but for the sake of argument here is a simple implementation using a plain dictionary.

from .MPIFutures import MPIFuture, MPIFutureArray
import dill
import hashlib
import numpy as np

class CacheHandler():
    def __init__(self, scheduler):
        self.sched = scheduler
        self.cache = {}

        if self.sched.is_master():
            print("Caching is enabled!")
    
    # ======= Forward methods =======

    def compute(self):
        self.sched.compute()

    def init(self):
        self.sched.init()
    
    def finalize(self):
        self.sched.finalize()

    # ======= END =======

    # ======= Redefine methods =======  

    def submit_multiple(self, func, *args):
        return MPIFutureArray([self.submit(func, *domain) for domain in list(zip(*args))])

    def _generate_key(self, func, args):
        # Serialize the function and arguments using dill
        serialized_data = dill.dumps(args)

        # Generate a hash of the serialized data
        data_hash = hashlib.sha256(serialized_data).hexdigest()

        return (func, data_hash)

    def submit(self, func, *args):
        # Get hash
        key = self._generate_key(func, args)

        # Check if result has been cached already
        if key in self.cache.keys():
            return self.cache[key]
            
        # Submit work to the scheduler and cache returned future
        future = self.sched.submit(func, *args)
        self.cache[key] = future
        
        return future

When testing this implementation with simple MPI benchmarks, it produces the correct results. However, when I integrate it into the main MPI application code, it never finds a cache hit (always misses).

To see what was going on, I modified the class to this:

from .MPIFutures import MPIFuture, MPIFutureArray
import dill
import hashlib
import numpy as np

class CacheHandler():
    def __init__(self, scheduler):
        self.sched = scheduler
        self.cache = {}
        self.f = []
        self.x = []
        self.h = []
        if self.sched.is_master():
            print("Caching is enabled!")
    
    # ======= Forward methods =======

    def compute(self):
        self.sched.compute()

    def init(self):
        self.sched.init()
    
    def finalize(self):
        self.sched.finalize()

    # ======= END =======

    # ======= Redefine methods =======  

    def submit_multiple(self, func, *args):
        return MPIFutureArray([self.submit(func, *domain) for domain in list(zip(*args))])

    def _generate_key(self, func, args):
        # Serialize the function and arguments using dill
        serialized_data = dill.dumps(args)

        # Generate a hash of the serialized data
        data_hash = hashlib.sha256(serialized_data).digest()

        return (func, data_hash)

    def submit(self, func, *args):
        # Get hash
        key = self._generate_key(func, args)

        if key in self.cache.keys():
            print("Found cached results!")
            return self.cache[key]
            
        print("Result not found, computing...")
        future = self.sched.submit(func, *args)
        self.cache[key] = future
        
        print(f"FUNCTION: {func in self.f} ARGS: {args in self.x} HASH: {hash(key) in self.h} ")
        self.f.append(func)
        self.x.append(args)
        self.h.append(hash(key))

        return future

Eventually, the program starts outputting the following:

...
Result not found, computing...
FUNCTION: True ARGS: True HASH: False 
Result not found, computing...
FUNCTION: True ARGS: True HASH: False 
Result not found, computing...
FUNCTION: True ARGS: True HASH: False 
...

This indicates that the functions and arguments are indeed the same, yet the hash of a tuple containing the two is not. For the sake of testing I even tried replacing func with func.__name__ and I got the same result. Most of the time the arguments are one or more numpy arrays. I also tried converting args to a tuple before serialization but that did not help.
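One plausible source of such misses: the serializer walks the full object graph of the arguments, so two arguments that compare equal can still produce different bytes (and therefore different SHA-256 keys) if they carry differing incidental state. A minimal sketch of the effect, using the stdlib `pickle` (dill behaves the same way for instance state); the `Point` class is purely illustrative:

```python
import hashlib
import pickle


class Point:
    """Toy argument type; `_scratch` is incidental state ignored by __eq__."""

    def __init__(self, x):
        self.x = x
        self._scratch = []

    def __eq__(self, other):
        return self.x == other.x


a = Point(1)
b = Point(1)
b._scratch.append("log entry")  # same value, different hidden state

assert a == b  # the equality check used for debugging says "same args"

key_a = hashlib.sha256(pickle.dumps(a)).hexdigest()
key_b = hashlib.sha256(pickle.dumps(b)).hexdigest()
assert key_a != key_b  # but the serialized bytes differ, so the cache misses
```

If the application's argument objects (or anything they reference) carry state like this, the hash-based key will differ on every call even though the arguments look identical in an equality check.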

At this point, I am not sure what is happening. I am not able to reproduce the issue in my minimal working examples.

Can any Python expert give me a hint on what is wrong here?

EDIT 1:

Here is an example benchmark I tested:

class T():
    def __init__(self):
        self.k = 1
    def f(self, x):
        print("COMPUTING F")
        return x

test = T()

x0 = np.array([1., 2., 3.])

# Submit a first time
sched.submit(test.f, x0)
sched.compute()

# Alter state of the object
test.k = 2

# Submit a second time
sched.submit(test.f, x0)
sched.compute()

This produces the following correct results:

mpirun -np 2 python test.py 
Using MPI scheduler.
Caching is enabled!
Result not found, computing...
FUNCTION: False ARGS: False HASH: False 
COMPUTING F
Found cached results!
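The cache hit after mutating `test.k` is expected here: `_generate_key` serializes only `args`, while `func` enters the key tuple directly, and bound methods compare and hash by their underlying function and instance, not by the instance's current attribute values. A quick check (the `T` class below mirrors the benchmark):

```python
class T:
    def __init__(self):
        self.k = 1

    def f(self, x):
        return x


t = T()
m1 = t.f  # a bound-method object created on attribute access
t.k = 2   # mutate the instance's state
m2 = t.f  # a *new* bound-method object...

assert m1 == m2              # ...that still compares equal (same __func__ and __self__)
assert hash(m1) == hash(m2)  # and hashes equal, so a dict keyed on it still hits
```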
Answer 1

Score: 0

Given that the arguments of the functions I have to cache are mainly numpy arrays, I decided to redesign the class to leverage the arr.tobytes() method instead of serialization via dill.

I am not sure what the major difference from the old implementation is, but this version works correctly.

from .MPIFutures import MPIFuture, MPIFutureArray
import numpy as np
from cachetools import LRUCache, MRUCache, RRCache
import hashlib
from math import floor, log10

class CacheHandler():
    def __init__(self, scheduler, tol=1e-6, eviction_strategy="RR", max_cache_size=1000):
        self._sched = scheduler
        self._dec = self._tol_to_dec(tol)
        self._cache = {}
        self._maxsize = max_cache_size
        self._evict = eviction_strategy

        if self._sched.is_master():
            print("Caching is enabled!")
            print(f"Tolerance set to {self._dec} decimals precision (tol = {tol})")
            print(f"Eviction strategy set to {self._evict}")

    # ======= Forward methods =======
    def compute(self):
        self._sched.compute()

    def init(self):
        self._sched.init()

    def finalize(self):
        self._sched.finalize()
    # ======= END =======

    # ======= Helper functions =======
    def _tol_to_dec(self, tol):
        if tol > 0:
            return -int(floor(log10(tol)))
        else:
            raise ValueError("Tolerance must be a positive number.")

    def _hash_args(self, args):
        # Serialize the arguments
        serialized_args = [np.round(arg, decimals=self._dec).tobytes() for arg in args]

        # Generate a hash of the serialized data
        hashes = [hashlib.sha256(arg).hexdigest() for arg in serialized_args]
        return "".join(hashes)

    def _get_new_cache(self):
        if self._evict == "LRU":
            return LRUCache(maxsize=self._maxsize)
        elif self._evict == "MRU":
            return MRUCache(maxsize=self._maxsize)
        elif self._evict == "RR":
            return RRCache(maxsize=self._maxsize)
        else:
            raise ValueError(f"Unknown eviction strategy {self._evict}")
    
    # ======= Redefine methods =======
    def submit_multiple(self, func, *args):
        return MPIFutureArray([self.submit(func, *domain) for domain in list(zip(*args))])

    def submit(self, func, *args):
        # Can only cache functions with numpy arguments
        if any(not isinstance(arg, np.ndarray) for arg in args):
            return self._sched.submit(func, *args)

        # Function not cached yet
        if func not in self._cache.keys():
            self._cache[func] = self._get_new_cache()
        
        # Generate a hash for the args
        hash = self._hash_args(args)
        
        # Try to fetch the result from the cache using the key
        future = self._cache[func].get(hash)

        # Cache miss
        if future is None:
            future = self._sched.submit(func, *args)
            self._cache[func][hash] = future

        return future
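The tolerance-based key can be exercised standalone. A minimal sketch of the same idea (the free functions below are illustrative rewrites of `_tol_to_dec` and `_hash_args`, not part of the class): arrays that agree to within the tolerance round to identical bytes and therefore share a cache key.

```python
import hashlib
from math import floor, log10

import numpy as np


def tol_to_dec(tol):
    # Convert a tolerance like 1e-6 into a number of decimal places (6)
    return -int(floor(log10(tol)))


def hash_args(args, dec):
    # Round before serializing so values within tolerance share a key
    parts = [np.round(arg, decimals=dec).tobytes() for arg in args]
    return "".join(hashlib.sha256(p).hexdigest() for p in parts)


dec = tol_to_dec(1e-6)
a = np.array([1.0, 2.0, 3.0])
b = a + 1e-9  # within tolerance: rounds to the same bytes as a
c = a + 1e-3  # outside tolerance: rounds to different bytes

assert hash_args([a], dec) == hash_args([b], dec)  # cache hit
assert hash_args([a], dec) != hash_args([c], dec)  # cache miss
```

Unlike dill-based serialization of arbitrary objects, `tobytes()` depends only on the array's contents (and dtype/layout), which is likely why this version produces stable keys.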

huangapple
  • Published 2023-06-04 23:01:09
  • Original link: https://go.coder-hub.com/76401014.html