如何在Python中从装饰器本身中调用函数的装饰器。

huangapple go评论69阅读模式
英文:

How to recall function's decorators from the decorator itself in python

问题

我有一个装饰器,用来限制API请求的频率,但我想在请求被限制时,不是忽略它们,而是在一段时间后重试它们。

装饰器的逻辑如下:

def rate_limiter(self, func: Callable):
    @wraps(func)
    def wrapper(*args, **kwargs):
        if is_request_allowed():
            LOGGER.info('Request allowed')
            return func(*args, **kwargs)
        else:
            LOGGER.info('Request not allowed')
            # sleep(reasonable_amount_of_time)
            # retry_function_with_decorator()
    return wrapper

装饰器中的“允许”部分运行良好,但装饰器本身中的“retry_function_with_decorator()”是我遇到问题的地方。

我尝试将函数对象传递给另一个函数,并从那里调用它,但它直接执行了被装饰的函数,而不是包装函数。

这是否可能?我漏掉了什么吗?还是有更好的方法?

提前谢谢。

英文:

I have a decorator that I use to rate limit api requests, but instead of ignore the requests, I want to retry them after some time wait.

The decorator logic is the following:

def rate_limiter(self, func: Callable):
    @wraps(func)
    def wrapper(*args, **kwargs):
        if is_request_allowed():
            LOGGER.info('Request allowed')
            return func(*args, **kwargs)
        else:
            LOGGER.info('Request not allowed')
            # sleep(reasonable_amount_of_time)
            # retry_function_with_decorator()
    return wrapper

The "allowed" part of the decorator works fine, but the "retry_function_with_decorator()" from the decorator itself is where I'm stuck with.

I have tried to pass the function object to another function and call it from there but it executes the decorated function directly, not the wrapper.

Is this something possible? I'm missing something? or there is a better approach?

Thank you in advance.

答案1

得分: 0

根据您当前的定义,您可以只是递归调用wrapper

def rate_limiter(self, func: Callable):
    @wraps(func)
    def wrapper(*args, **kwargs):
        if is_request_allowed():
            LOGGER.info('请求允许')
            return func(*args, **kwargs)
        else:
            LOGGER.info('请求不允许')
            # sleep(合理的等待时间)
            return wrapper(*args, **wargs)
    return wrapper

然而,一个简单的while循环会更好。

def rate_limiter(self, func: Callable):
    @wraps(func)
    def wrapper(*args, **kwargs):
        while not is_request_allowed():
            LOGGER.info('请求不允许')
            # sleep(合理的等待时间)
        LOGGER.info('请求允许')
        return func(*args, **kwargs)
    return wrapper
英文:

Given your current definition, you could just call wrapper recursively.

def rate_limiter(self, func: Callable):
    @wraps(func)
    def wrapper(*args, **kwargs):
        if is_request_allowed():
            LOGGER.info('Request allowed')
            return func(*args, **kwargs)
        else:
            LOGGER.info('Request not allowed')
            # sleep(reasonable_amount_of_time)
            return wrapper(*args, **wargs)
    return wrapper

However, a simple while loop would be better.

def rate_limiter(self, func: Callable):
    @wraps(func)
    def wrapper(*args, **kwargs):
        while not is_request_allowed():
            LOGGER.info('Request not allowed')
            # sleep(reasonable_amount_of_time)
        LOGGER.info('Request allowed')
        return func(*args, **kwargs)
    return wrapper

答案2

得分: 0

只需要一个简单的while循环。

然而,在那里的装饰函数通过func来进行处理,你可以将它传递或再次调用。

重新排列你的代码以正确重试:

def rate_limiter(self, func: Callable):
    @wraps(func)
    def wrapper(*args, _limit_retries=2, **kwargs):
        counter = 0
        while counter < _limit_retries:
            if is_request_allowed():
                LOGGER.info(f'在第 {counter} 次尝试时允许请求')
                return func(*args, **kwargs)
        
            LOGGER.info(f'在第 {counter} 次尝试时不允许请求')
            # sleep(reasonable_amount_of_time)
            counter += 1
            # retry_function_with_decorator()
        LOGGER.warn(f'在 {_limit_retries} 次尝试后拒绝请求。')

    return wrapper

这将适用于单个工作线程,但请记住,对于一个将水平扩展并具有多个工作线程的系统,不论是多线程、多进程、异步还是分布在多台主机上(使用Celery或类似工具),速率限制都需要考虑到这一点。

英文:

you just need a plain old while loop instead.

However, the decorated function in there attends by func, and you can just pass it around or call it again.

Rearranging your code for correctly retrying stuff:

def rate_limiter(self, func: Callable):
    @wraps(func)
    def wrapper(*args, _limit_retries=2, **kwargs):
        counter = 0
        while counter &lt; _limit_retries
            if is_request_allowed():
                LOGGER.info(f&#39;Request allowed at attempt {counter}&#39;)
                return func(*args, **kwargs)
        
            LOGGER.info(f&#39;Request not allowed at attempt {counter}&#39;)
            # sleep(reasonable_amount_of_time)
            counter += 1
            # retry_function_with_decorator()
        LOGGER.warn(f&#39;Request denied after {_limit_retries} attempts.&#39;)

    return wrapper

This will work for a single worker - keep in mind that for a system that will scale horizontally and have several workers - be them multi-threaded, multi-processed, async, or distributed across hosts (with celery or similar), limit rating have to take that into account.

huangapple
  • 本文由 发表于 2023年2月8日 22:25:15
  • 转载请务必保留本文链接:https://go.coder-hub.com/75387180.html
匿名

发表评论

匿名网友

:?: :razz: :sad: :evil: :!: :smile: :oops: :grin: :eek: :shock: :???: :cool: :lol: :mad: :twisted: :roll: :wink: :idea: :arrow: :neutral: :cry: :mrgreen:

确定