Celery会在工作进程异常终止并重新启动后继续执行历史任务。

huangapple go评论66阅读模式
英文:

Celery will continue to execute historical tasks after the worker terminates abnormally and restarts

问题

I added the celery crontab task. When an exception occurs in the worker, I hope that the worker will no longer execute unfinished historical tasks.
我添加了Celery的定时任务。当工作进程出现异常时,我希望工作进程不再执行未完成的历史任务。

I used the following task base class and wanted to preprocess the task by judging the status of the worker, but it didn't achieve the result I expected.
我使用了以下任务基类,并希望通过判断工作进程的状态来预处理任务,但它没有达到我期望的结果。

When I stop the worker, after the time exceeds the task execution time, I restart the worker, it still executes the expired task.
当我停止工作进程,超过任务执行时间后重新启动工作进程,它仍然执行已过期的任务。

class MyTask(Task):

    def before_start(self, task_id, args, kwargs):
        inspect = app.control.inspect()
        active_workers = inspect.active()
        if not active_workers:
            print('worker not running')
            return False
        print('worker running')
        return super().before_start(task_id, args, kwargs)


@app.task(base=MyTask)
def test():
    return 'test'

I hope there is any method or parameter setting that can make the worker no longer execute expired tasks.
我希望有一种方法或参数设置,可以使工作进程不再执行已过期的任务。

英文:

I added the celery crontab task. When an exception occurs in the worker, I hope that the worker will no longer execute unfinished historical tasks.
I used the following task base class and wanted to preprocess the task by judging the status of the worker, but it didn't achieve the result I expected.
When I stop the worker, after the time exceeds the task execution time, I restart the worker, it still executes the expired task.

class MyTask(Task):

    def before_start(self, task_id, args, kwargs):
        inspect = app.control.inspect()
        active_workers = inspect.active()
        if not active_workers:
            print('worker not runing')
            return False
        print('worker runing')
        return super().before_start(task_id, args, kwargs)


@app.task(base=MyTask)
def test():
    return 'test'

I hope there is any method or parameter setting that can make the worker no longer execute expired tasks

答案1

得分: 0

我创建PeriodicTask任务时,设置了expire_seconds=5。当预定任务达到执行时间点时,如果超过5秒未执行,任务将被标记为已取消,从而实现所期望的结果。

英文:

When I created the PeriodicTask task, I set expire_seconds=5. When the scheduled task reaches the execution time point, if it is not executed for more than 5 seconds, the task will be marked as revoked, which can achieve the desired result.

huangapple
  • 本文由 发表于 2023年2月10日 14:18:37
  • 转载请务必保留本文链接:https://go.coder-hub.com/75407550.html
匿名

发表评论

匿名网友

:?: :razz: :sad: :evil: :!: :smile: :oops: :grin: :eek: :shock: :???: :cool: :lol: :mad: :twisted: :roll: :wink: :idea: :arrow: :neutral: :cry: :mrgreen:

确定