Is there a way to run a single job instance across multiple workers in fastapi
Question
I am using apscheduler to create background jobs that run in my FastAPI application. However, when I start the application with more than one worker, the job is duplicated across the workers, resulting in all kinds of errors and potential security flaws.
Here is a simplified example:
```python
from apscheduler.schedulers.asyncio import AsyncIOScheduler
from fastapi import BackgroundTasks, FastAPI

def some_job():
    print("hello world")

app = FastAPI()

shed = AsyncIOScheduler()
shed.add_job(some_job, "interval", seconds=5)

@app.get("/test")
async def test():
    return {"Hello": "world"}

shed.start()
```
To run it, use `uvicorn main:app --workers 4`. This results in `hello world` being printed 4 times every time the job is triggered.
Is there a way to run a single instance of the job across all the workers (at the parent-process level)?
I researched some solutions online, but most of them use celery and similar modules, or lock files and shared memory locations. Both options are too complicated, and I prefer to use the minimum number of modules.
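The duplication happens because each uvicorn worker is a separate process that imports the app module, so the module-level `shed.start()` runs once per worker. A minimal sketch of that effect, with no FastAPI or APScheduler involved (the function name is illustrative only):

```python
# Sketch: simulates what "--workers 4" does. Every worker process
# imports the app module, so module-level setup (like shed.start())
# runs once per worker -- one scheduler per process, four in total.
import multiprocessing

def worker_startup(name):
    # Stands in for the module-level scheduler setup in main.py.
    print(f"scheduler started in {name}")

if __name__ == "__main__":
    procs = [
        multiprocessing.Process(target=worker_startup, args=(f"worker-{i}",))
        for i in range(4)
    ]
    for p in procs:
        p.start()
    for p in procs:
        p.join()
```

Each process prints its own startup line, which is exactly why the job fires four times per interval.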
Answer 1
Score: 1
The simplest solution would be to not run your scheduler in the same process as FastAPI, but to run the two as separate processes. Something like:
```python
# main.py
from fastapi import FastAPI

app = FastAPI()

@app.get("/test")
async def test():
    return {"Hello": "world"}
```
```python
# scheduler.py
import asyncio

from apscheduler.schedulers.asyncio import AsyncIOScheduler

def some_job():
    print("hello world")

shed = AsyncIOScheduler()
shed.add_job(some_job, "interval", seconds=5)

if __name__ == "__main__":
    shed.start()  # without this, no jobs ever fire
    try:
        asyncio.get_event_loop().run_forever()
    except (KeyboardInterrupt, SystemExit):
        pass
```
Then run `uvicorn main:app --workers 4` in one terminal, and `python scheduler.py` in another.
Answer 2
Score: 1
You could run the scheduler in the parent process and the FastAPI app in a separate child process using Python's multiprocessing:
```python
import multiprocessing

import uvicorn
from apscheduler.schedulers.background import BackgroundScheduler
from fastapi import FastAPI

app = FastAPI()

@app.get("/")
async def home():
    return {"Hello": "world"}

def regular_function():
    print("Hello world")

def run_app():
    uvicorn.run("main:app", host="0.0.0.0", port=8000, workers=4)

def main():
    app_process = multiprocessing.Process(target=run_app)
    app_process.start()

    scheduler = BackgroundScheduler()
    scheduler.add_job(regular_function, "interval", seconds=5, max_instances=1)
    scheduler.start()

    app_process.join()

if __name__ == "__main__":
    main()