How to get task status in Celery

Question

I have a Celery task and I want to look up its status by task ID. I have read the previous answers but couldn't get them to work. I used this command:

celery -A proj inspect query_task e9f6c8f0-fec9-4ae8-a8c6-cf8c8451d4f8pe

command result

celery.py

import os
from celery import Celery

os.environ.setdefault("DJANGO_SETTINGS_MODULE", "coutoEditor.settings")
app = Celery("coutoEditor")
app.config_from_object("django.conf:settings", namespace="CELERY")
app.autodiscover_tasks()

settings.py

CELERY_BROKER_URL = "redis://localhost:6379"
CELERY_RESULT_BACKEND = "redis://localhost:6379"

tasks.py

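import os
import uuid
from datetime import datetime

import requests
from celery import shared_task
from django.core.files import File

# BASE_DIR, BASE_URL, convert_to_sec, TemporaryFiles and the editor_speedUp_*
# paths are project-specific; their imports were not shown in the question.


@shared_task()
def speed_up_vid_task(input_path, speed_factor, start, end):
    '''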
    Method Params:
    input_path: The video file url or directory path file name
    speed_factor: The factor for the speed up process
    start: start of the video part that needs to be sped up (in secs)
    end: end of the video part that needs to be sped up (in secs)
    '''

    start = convert_to_sec(start)
    end = convert_to_sec(end)

    filename = str(uuid.uuid4())
    print(filename, "new")
    temporary_dir = BASE_DIR + '/' + editor_speedUp_temp_dir  # editor_temp_dir = media/editor/speed_vid/temp/"
    output_dir = BASE_DIR + '/' + editor_speedUp_output_dir  # editor_speedUp_output_dir = media/editor/speed_vid/

    # Check for a broken URL. A Celery task result has to be serializable,
    # so return a plain dict here rather than a DRF Response object.
    r = requests.get(input_path, stream=True)
    if r.status_code != 200:
        return {
            'message': "media file is corrupted",
            'data': "broken url process could not be completed",
            'status': False
        }

    if not os.path.exists(output_dir):
        os.mkdir(output_dir)

    if not os.path.exists(temporary_dir):
        os.mkdir(temporary_dir)

    stream = os.popen(
        "ffmpeg.ffprobe -loglevel error -select_streams a -show_entries stream=codec_type -of csv=p=0 '{}'".format(
            input_path))
    output = stream.read()
    if len(output) == 0:
        input_path_vid = os.path.join(BASE_DIR, temporary_dir) + filename + "_temp_video.mp4"
        cmd = "ffmpeg -f lavfi -i anullsrc=channel_layout=stereo:sample_rate=44100 -i '{}' -c:v copy -c:a aac -shortest {}".format(
            input_path, input_path_vid)
        os.system(cmd)
    else:
        # check if it's a directory or a url
        if os.path.isfile(input_path):
            input_path_vid = BASE_DIR + input_path
        else:
            ext_name = filename + '_temp_video.mp4'
            ext_path = temporary_dir + ext_name
            r = requests.get(input_path)
            # the with-block closes the file, so no explicit close() is needed
            with open(ext_path, 'wb') as outfile:
                outfile.write(r.content)
            input_path_vid = ext_path

    output_path = os.path.join(BASE_DIR, editor_speedUp_output_dir + filename + ".mp4")

    cmd = 'ffmpeg -i ' + input_path_vid + ' \
            -filter_complex \
            "[0:v]trim=0:' + str(start) + ',setpts=PTS-STARTPTS[v1]; \
            [0:v]trim=' + str(start) + ':' + str(end) + ',setpts=1/' + str(speed_factor) + '*(PTS-STARTPTS)[v2]; \
            [0:v]trim=' + str(end) + ',setpts=PTS-STARTPTS[v3]; \
            [0:a]atrim=0:' + str(start) + ',asetpts=PTS-STARTPTS[a1]; \
            [0:a]atrim=' + str(start) + ':' + str(end) + ',asetpts=PTS-STARTPTS,atempo=' + str(speed_factor) + '[a2]; \
            [0:a]atrim=' + str(end) + ',asetpts=PTS-STARTPTS[a3]; \
            [v1][a1][v2][a2][v3][a3]concat=n=3:v=1:a=1" \
            -preset superfast -profile:v baseline ' + output_path

    os.system(cmd)

    with open(output_path, "rb") as generated_video:
        generated_video_file = TemporaryFiles.objects.create(
            temp_file=File(generated_video, name=filename + ".mp4"),
            created_at=datetime.utcnow())

    if os.path.exists(input_path_vid):
        os.remove(input_path_vid)

    if os.path.exists(output_path):
        os.remove(output_path)

    res_dict = {}
    res_dict["video_url"] = os.path.join(BASE_URL, generated_video_file.temp_file.url[1:])

    return res_dict

views.py

class speed_up_video(APIView):
    def post(self, request):
        video_url = request.data["video_url"]
        speed_factor = request.data["speed_factor"]
        start = request.data["start"]
        end = request.data["end"]
        result_vid = speed_up_vid_task.delay(video_url, speed_factor, start, end)
        # .get() blocks this request until the task has finished
        return Response(result_vid.get())



I was trying to set up Celery with Django, and after some trial and error it worked and showed the output in the Celery terminal. I want to get the status of a task from the terminal using the task ID. The Django, Redis, and Celery servers are all running fine. I am learning Celery and I am stuck on this part.

I am using Redis as both the broker and the result backend. I have seen a blog post about getting task status from Redis but couldn't make it work. If there is any other way to make the query, please answer.

When I set bind=True in the task decorator, my Django app throws an argument error.

Thanks.

Answer 1

Score: 0

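Django has the django-celery-results package, which provides a TaskResult model for storing task results. You can query it to check a task's status. Note that this table is filled only when the result backend is set to django-db, not Redis.

But the task runs in Celery, so you have to wait for it to be queued, run, and finish before you can get the data. One way to do that is to loop and wait, like this: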

import time

from django_celery_results.models import TaskResult

all_tasks = []
task = speed_up_vid_task.delay(video_url, speed_factor, start, end)
all_tasks.append(task.task_id)
while len(all_tasks) > 0:
    time.sleep(1)
    # use a separate loop name so the AsyncResult in `task` is not shadowed
    for task_result in TaskResult.objects.filter(task_id__in=all_tasks):
        if task_result.status in ['SUCCESS', 'FAILURE']:
            # do anything you want in this
            all_tasks.remove(task_result.task_id)
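
As an alternative that should also work with the Redis result backend from the question, the state can be read through celery.result.AsyncResult instead of the TaskResult table. The sketch below pairs that lookup with a polling loop that gives up after a timeout, so it cannot spin forever. The names celery_state and wait_for_state, and the default timeout and interval, are hypothetical. It also assumes the project's Celery app is already loaded as the current app, for example inside `python manage.py shell`.

```python
import time

# State names Celery treats as finished (celery.states.READY_STATES).
READY_STATES = {"SUCCESS", "FAILURE", "REVOKED"}


def celery_state(task_id):
    """Read a task's state from the configured result backend."""
    # Imported lazily so the polling helper below has no hard Celery dependency.
    from celery.result import AsyncResult
    return AsyncResult(task_id).state


def wait_for_state(task_id, get_state=celery_state, timeout=30.0, interval=1.0):
    """Poll get_state(task_id) until it is finished or timeout seconds pass.

    Returns the last state seen, which may still be unfinished on timeout.
    """
    deadline = time.monotonic() + timeout
    while True:
        state = get_state(task_id)
        if state in READY_STATES or time.monotonic() >= deadline:
            return state
        time.sleep(interval)


if __name__ == "__main__":
    # Stand-in for a real backend: reports STARTED twice, then SUCCESS.
    fake_states = iter(["STARTED", "STARTED", "SUCCESS"])
    print(wait_for_state("demo-id", get_state=lambda _id: next(fake_states), interval=0.01))
```

Keep in mind that Celery reports PENDING for any ID it has no record of, so a mistyped task ID looks the same as a task that is still queued.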

huangapple
  • Posted on February 10, 2023, 16:16:38
  • When reposting, please keep the link to this article: https://go.coder-hub.com/75408466.html