Get list of all keys in Redis created by Celery for tasks in a Flask app

Question

I have two containers running Python Flask apps, let's say 1 and 2. Container 1 communicates with Redis and produces the messages below in `redis-cli monitor`:

1677744862.693857 [0 192.168.80.13:44076] "SUBSCRIBE" "celery-task-meta-11902e64-94cc-4e02-89ce-3cea365294bb"
1677744862.694585 [0 192.168.80.13:44044] "PING"
1677744862.694949 [0 192.168.80.13:44044] "LPUSH" "celery" "{"body": "siDxNiIICwDfDwzDwiINcyWt5ZZd0WiCiwIEXg30Is2sGhtIons", "content-encoding": "utf-8", "content-type": "application/json", "headers": {"lang": "py", "task": "task.send_text", "id": "11902e64-94cc-4e02-89ce-3cea365294bb", "shadow": null, "eta": null, "expires": null, "group": null, "group_index": null, "retries": 0, "timelimit": [null, null], "root_id": "11902e64-94cc-4e02-89ce-3cea365294bb", "parent_id": null, "argsrepr": "('123456', '000_000_000_000', '000000', 'DefaultId', '', '1')", "kwargsrepr": "{}", "origin": "gen74@f0eb8cc30754", "ignore_result": false}, "properties": {"correlation_id": "11902e64-94cc-4e02-89ce-3cea365294bb", "reply_to": "7fdc8466-0de3-3be1-8a0a-a48713e2769a", "delivery_mode": 2, "delivery_info": {"exchange": "", "routing_key": "celery"}, "priority": 0, "body_encoding": "base64", "delivery_tag": "b5cc708a-8f28-4fb1-8714-1de4433c8621"}}"
1677744862.696078 [0 192.168.80.13:44076] "UNSUBSCRIBE" "celery-task-meta-11902e64-94cc-4e02-89ce-3cea365294bb

and then container 2 produces these messages:

1677744993.925599 [0 192.168.80.12:43034] "GET" "celery-task-meta-11902e64-94cc-4e02-89ce-3cea365294bb"
1677744993.927298 [0 192.168.80.12:43034] "MULTI"
1677744993.927340 [0 192.168.80.12:43034] "SETEX" "celery-task-meta-11902e64-94cc-4e02-89ce-3cea365294bb" "86400" "{"status": "SUCCESS", "result": ["True", "", "123456"], "traceback": null, "children": [], "date_done": "2023-03-02T08:16:33.922182", "task_id": "11902e64-94cc-4e02-89ce-3cea365294bb"}"
1677744993.927480 [0 192.168.80.12:43034] "PUBLISH" "celery-task-meta-11902e64-94cc-4e02-89ce-3cea365294bb" "{"status": "SUCCESS", "result": ["True", "", "123456"], "traceback": null, "children": [], "date_done": "2023-03-02T08:16:33.922182", "task_id": "11902e64-94cc-4e02-89ce-3cea365294bb"}"
1677744993.927602 [0 192.168.80.12:43034] "EXEC"
1677744994.868230 [0 192.168.80.12:42992] "BRPOP" "celery" "celery\x06\x163" "celery\x06\x166" "celery\x06\x169" "1"

Now let's say 2nd container is down and above messages(starting from "GET") did not happen. How can I get list of all tasks e.g. celery-task-meta-11902e64-94cc-4e02-89ce-3cea365294bb that were now finished?
Even when I try redis-cli info in # Keyspace part the number of keys do not increase for every task that is created by 1st container.
Can you please help? I already tried Python code below but these kind of tasks still do not appear.

import redis
import json

# Connect to the Redis instance that Celery uses as broker/result backend
redis_client = redis.Redis(host='localhost', port=6379, db=0)

# Fetch all result keys, i.e. keys that start with 'celery-task-meta-'
task_ids = redis_client.keys('celery-task-meta-*')

unfinished_tasks = []
for task_id in task_ids:
    # Read the stored result and parse the JSON payload
    task_result = redis_client.get(task_id)
    string = task_result.decode('utf-8')
    dictionary = json.loads(string)
    # Treat the task as unfinished if the result has no 'status' field
    if "status" not in dictionary:
        unfinished_tasks.append(task_id)

print('Unfinished tasks:', unfinished_tasks)
print(len(unfinished_tasks))
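
A note on the snippet above: it does not change the outcome described here, but `KEYS` blocks the Redis server while it scans the entire keyspace, so a `SCAN`-based variant is usually preferable on a busy result backend. A minimal sketch, assuming redis-py 3.x (where `scan_iter` is available):

```python
import json
import redis

redis_client = redis.Redis(host='localhost', port=6379, db=0)

unfinished_tasks = []

# scan_iter walks the keyspace in small batches instead of blocking like KEYS
for task_id in redis_client.scan_iter(match='celery-task-meta-*'):
    raw = redis_client.get(task_id)
    if raw is None:
        continue  # key expired between SCAN and GET
    result = json.loads(raw)
    # Same heuristic as above: no 'status' field means the task is unfinished
    if "status" not in result:
        unfinished_tasks.append(task_id)

print('Unfinished tasks:', unfinished_tasks)
print(len(unfinished_tasks))
```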

Answer 1

Score: 0

<details>
<summary>英文:</summary>

I ended up using the `bash` script below:
```sh
#!/bin/bash

# Poll the length of the 'celery' list (the default Celery task queue)
# and warn once the backlog of unprocessed tasks grows too large.
while true; do
    count=$(docker exec redis redis-cli llen celery)
    if [ "$count" -gt 100 ]; then
        echo "Length of Redis list exceeded 100!"
    fi
    sleep 1
done
```

Thanks Tim Richardson for helping me out.
`docker exec redis redis-cli llen celery` only tells you the number of queued tasks, but you can view the list itself with `docker exec redis redis-cli lrange celery 0 -1`.
And finally, if you are using Celery, the name of the task queue is `celery` by default.
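
For completeness, the same check can be done from Python rather than the shell. The sketch below is illustrative only; it assumes redis-py is installed and that the messages follow Celery's default JSON protocol, as seen in the `LPUSH` payload captured above. It reads the messages still waiting on the `celery` list and extracts each task's id and name from the message headers:

```python
import json
import redis

redis_client = redis.Redis(host='localhost', port=6379, db=0)

# Messages queued by container 1 that no worker has picked up yet are
# still sitting on the 'celery' list, so they have no result key.
pending_messages = redis_client.lrange('celery', 0, -1)

pending_tasks = []
for raw in pending_messages:
    message = json.loads(raw)
    # 'id' and 'task' come from the message headers of Celery's default
    # protocol, as shown in the redis-cli monitor output in the question.
    headers = message.get('headers', {})
    pending_tasks.append((headers.get('id'), headers.get('task')))

print('Pending (unprocessed) tasks:', pending_tasks)
print('Number of pending tasks:', len(pending_tasks))
```

Note that this only covers messages still waiting in the queue; a message that a worker has already reserved with `BRPOP` is no longer on the list, even if the task has not finished yet.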
