Intelligent distribution of tasks among workers in Celery
Question
在一周的尝试和搜索之后,我没有得到任何结果,我将非常感谢您的帮助。
摘要:
我有4名工作者,每个工作者都有一个app.task。
每天,这4名工作者必须完成近千个任务。
问题是如何智能地在这4名工作者之间分配任务。
更多细节:
我的当前代码将1,000个任务分成4份,然后每个工作者被分配250个任务。为什么要分割?因为我必须在工作开始时apply_async任务(因此每个工作者都有一个单独的队列)。
工作者执行任务没有任何问题,但当一些工作者执行任务更快并且最终没有任务可执行时,一些工作者可能会执行他们的任务更长时间。
我在寻找什么?
我们正在寻找一种将所有1,000个任务放在一个队列中,而不分割它们,并自动从这1,000个队列中移除这4个工作者的任务并执行它们的方法,在这种情况下,其他工作者将几乎同时完成他们的任务。
我的代码分为4个文件:
celery.py:
from __future__ import absolute_import, unicode_literals
from celery import Celery
app = Celery('celery_app')
app.config_from_object('celery_app.celeryconfig')
if __name__ == '__main__':
app.start()
celeryconfig.py:
broker_url = 'amqp://guest@localhost//'
result_backend = 'rpc://'
include = ['celery_app.tasks']
worker_prefetch_multiplier = 1
task_routes = {
'celery_app.tasks.app_1000': {'queue': 'q_1000'},
'celery_app.tasks.app_1000': {'queue': 'q_1002'},
'celery_app.tasks.app_1000': {'queue': 'q_1004'},
'celery_app.tasks.app_1000': {'queue': 'q_1006'},
'celery_app.tasks.app_timeout': {'queue': 'q_timeout'},
}
tasks.py:
tasks.py文件中有许多代码,请查看此链接GitHub链接。
api.py:
这是一个模拟的API,例如,如果我发送数字1000,就好像它必须执行1000个任务,并将这1000个任务分配给4个应用程序。
from fastapi import FastAPI, HTTPException, Request
from celery_app.tasks import app_1000, app_1002, app_1004, app_1006, get_active_queue
import random
app = FastAPI()
@app.get("/run_tasks")
async def run_tasks(num_of_tasks: int):
try:
app_list = [app_1000, app_1002, app_1004, app_1006]
for i in range(0, num_of_tasks, 4):
app_list[0].apply_async()
app_list[1].apply_async()
app_list[2].apply_async()
app_list[3].apply_async()
return 'ok'
except Exception as e:
raise HTTPException(status_code=500, detail=str(e))
@app.get("/get_active_queue")
async def get_active_q():
res = get_active_queue()
print(res)
return 'ok'
请指导我如何实现这一点?
如果有什么愚蠢的地方,请评论,这样我就可以进一步解释。
我的代码在以下链接中:https://github.com/arezooebrahimi/celery_distribution_tasks
英文:
After a week of trying and searching I haven't gotten anywhere, and I would appreciate your help.

Summary:
I have 4 workers, and there is an app.task inside each worker.
Every day, these 4 workers have to complete nearly a thousand tasks.
The problem is how to distribute the tasks intelligently among these 4 workers.

More details:
My current code divides the 1,000 tasks by 4, so each worker is given 250 tasks. Why do I split them? Because I have to apply_async all the tasks at the start of the run, and each worker has its own separate queue.
The workers execute their tasks without any problems, but a challenge arises when some workers finish faster and end up with nothing to do, while others keep executing their tasks for hours longer.
What am I looking for?
I'm looking for a way to keep all 1,000 tasks in a single queue, without dividing them up front, and have the 4 workers automatically pull tasks from that one queue and execute them; that way all the workers finish at almost the same time. A rough sketch of what I have in mind is below.
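To make the idea concrete, something like the following is what I imagine (do_work and the queue name q_all are hypothetical placeholders for illustration, not code from my repo):

# Hypothetical sketch: every task goes into one shared queue,
# and all four workers consume from that same queue.
from celery_app.tasks import do_work  # hypothetical single task shared by all workers

def dispatch_all(num_of_tasks: int) -> None:
    for _ in range(num_of_tasks):
        # No per-worker routing: whichever worker is free takes the next task.
        do_work.apply_async(queue='q_all')

dispatch_all(1000)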
My code is split across 4 files:

celery.py:
from __future__ import absolute_import, unicode_literals
from celery import Celery

app = Celery('celery_app')
app.config_from_object('celery_app.celeryconfig')

if __name__ == '__main__':
    app.start()
celeryconfig.py:
broker_url = 'amqp://guest@localhost//'
result_backend = 'rpc://'
include = ['celery_app.tasks']
worker_prefetch_multiplier = 1

# Route each worker's task to its own dedicated queue.
task_routes = {
    'celery_app.tasks.app_1000': {'queue': 'q_1000'},
    'celery_app.tasks.app_1002': {'queue': 'q_1002'},
    'celery_app.tasks.app_1004': {'queue': 'q_1004'},
    'celery_app.tasks.app_1006': {'queue': 'q_1006'},
    'celery_app.tasks.app_timeout': {'queue': 'q_timeout'},
}
tasks.py:
The tasks.py file contains a lot of code; see the GitHub repository linked at the end of this post.
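For context, each of the four tasks presumably looks roughly like the sketch below; this is an assumption for illustration only, the real bodies live in the repository, and the import path is guessed from the config above.

# tasks.py -- simplified, assumed sketch of one of the four per-worker tasks
import time
from celery_app.celery import app  # import path assumed from 'celery_app.celeryconfig'

@app.task
def app_1000():
    # Stand-in for the real work; the actual implementation is in the repo.
    time.sleep(1)
    return 'app_1000 done'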
api.py:
This is a simulated API: for example, if I send the number 1000, it acts as if it has to run 1,000 tasks, dividing them among the 4 apps.
from fastapi import FastAPI, HTTPException
from celery_app.tasks import app_1000, app_1002, app_1004, app_1006, get_active_queue

app = FastAPI()

@app.get("/run_tasks")
async def run_tasks(num_of_tasks: int):
    try:
        app_list = [app_1000, app_1002, app_1004, app_1006]
        # Round-robin the tasks across the four apps, one task per iteration,
        # so counts that are not a multiple of 4 are handled correctly.
        for i in range(num_of_tasks):
            app_list[i % 4].apply_async()
        return 'ok'
    except Exception as e:
        raise HTTPException(status_code=500, detail=str(e))

@app.get("/get_active_queue")
async def get_active_q():
    res = get_active_queue()
    print(res)
    return 'ok'
Please guide me on how to do this.
If anything I've said is unclear, leave a comment and I'll explain further.

My code is at the following link: https://github.com/arezooebrahimi/celery_distribution_tasks
Answer 1
Score: 2
If I understand it correctly, you want your workers to pick up tasks as soon as they are free, and all the workers run the same kind of task. Using a single queue with all the workers listening on it should work, and this is how Celery is used by default. If you have long-running tasks, you might want to set worker_prefetch_multiplier = 1 in the Celery configuration. I don't understand what made you start with a separate queue per worker instead of a single queue for all the workers.
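A minimal sketch of that single-queue setup, assuming the four per-worker tasks are collapsed into one shared task definition (the settings below are real Celery options, but the overall layout is illustrative rather than the asker's actual code):

# celeryconfig.py -- single shared queue for all workers (sketch)
broker_url = 'amqp://guest@localhost//'
result_backend = 'rpc://'
include = ['celery_app.tasks']

# Reserve only one task per worker process at a time, so a fast worker is
# never starved while a slow worker sits on a backlog of prefetched tasks.
worker_prefetch_multiplier = 1
# Optionally acknowledge tasks only after they finish; combined with the
# setting above, this gives the fairest distribution for long-running tasks.
task_acks_late = True

# No task_routes at all: every task lands in the default 'celery' queue.
# Start four identical workers that all consume that default queue:
#   celery -A celery_app worker -n worker1@%h
#   celery -A celery_app worker -n worker2@%h
#   celery -A celery_app worker -n worker3@%h
#   celery -A celery_app worker -n worker4@%h

With this setup the producer simply calls the same task's apply_async() a thousand times; whichever worker is idle takes the next message from the broker, so all four workers finish at nearly the same time.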