Is there a way to run a single job instance across multiple workers in FastAPI?


Question

I am using APScheduler to create background jobs in my FastAPI application, but when I start the application with more than one worker the job is duplicated across the workers, resulting in all kinds of errors and potential security flaws.

Here is a simplified example:

from apscheduler.schedulers.asyncio import AsyncIOScheduler
from fastapi import BackgroundTasks, FastAPI

def some_job():
    print("hello world")

app = FastAPI()
shed = AsyncIOScheduler()

shed.add_job(some_job, "interval", seconds=5)

@app.get("/test")
async def test():
    return {"Hello": "world"}

shed.start()  # runs at import time in every worker process, so each worker schedules its own copy of the job

To run it, use uvicorn main:app --workers 4.

This results in hello world being printed 4 times every time the job is triggered.

Is there a way to run just one instance of the job across all the workers (at the parent-process level)?

I researched some solutions online, but most of them use Celery and similar modules, or lock files and shared memory locations. Both options are too complicated, and I would prefer to use the minimum number of extra modules.
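
For context, the lock-file approach mentioned above usually boils down to something like the sketch below: every worker tries to take an exclusive lock on the same file, and only the worker that wins the lock starts the scheduler. This is only an illustration of the idea, not a recommendation; the lock path and the holds_scheduler_lock helper are made up for the sketch, and fcntl.flock makes it POSIX-only.

import fcntl

from apscheduler.schedulers.asyncio import AsyncIOScheduler
from fastapi import FastAPI

LOCK_PATH = "/tmp/scheduler.lock"  # illustrative path, not from the question
_lock_file = open(LOCK_PATH, "w")  # kept open for the lifetime of the process

def holds_scheduler_lock() -> bool:
    """Return True in the one worker that wins the lock; the OS releases it when that process exits."""
    try:
        fcntl.flock(_lock_file, fcntl.LOCK_EX | fcntl.LOCK_NB)
        return True
    except BlockingIOError:
        return False

def some_job():
    print("hello world")

app = FastAPI()
shed = AsyncIOScheduler()
shed.add_job(some_job, "interval", seconds=5)

@app.get("/test")
async def test():
    return {"Hello": "world"}

# only the worker holding the lock starts the scheduler;
# the other workers just serve requests
if holds_scheduler_lock():
    shed.start()

Because the lock is tied to the open file descriptor rather than to the file's existence, a crashed worker does not leave a stale lock behind.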


Answer 1

Score: 1

The simplest solution is to not run your scheduler in the same process as FastAPI, but as two separate processes. Something like this:

# main.py
from fastapi import BackgroundTasks, FastAPI

app = FastAPI()

@app.get("/test")
async def test():
    return {"Hello": "world"}
# scheduler.py
import asyncio

from apscheduler.schedulers.asyncio import AsyncIOScheduler

def some_job():
    print("hello world")

shed = AsyncIOScheduler()
shed.add_job(some_job, "interval", seconds=5)

if __name__ == "__main__":
    shed.start()  # start the scheduler before blocking on the event loop
    try:
        asyncio.get_event_loop().run_forever()
    except (KeyboardInterrupt, SystemExit):
        pass

Then run uvicorn main:app --workers 4 in one terminal and python scheduler.py in another.


Answer 2

Score: 1

You can run the scheduler in the parent process and run FastAPI in a separate child process using Python's multiprocessing module. That way only the parent ever schedules the job, no matter how many uvicorn workers the child process spawns:

import multiprocessing
from apscheduler.schedulers.background import BackgroundScheduler
import uvicorn
from fastapi import FastAPI

app = FastAPI()

@app.get("/")
async def home():
    return {"Hello": "world"}

def regular_function():
    print("Hello world")

def run_app():
    # start uvicorn in the child process; it spawns its own worker processes
    uvicorn.run("main:app", host="0.0.0.0", port=8000, workers=4)

def main():
    # run the FastAPI app in a child process
    app_process = multiprocessing.Process(target=run_app)
    app_process.start()

    # keep the scheduler in the parent process so the job is scheduled only once
    scheduler = BackgroundScheduler()
    scheduler.add_job(regular_function, 'interval', seconds=5, max_instances=1)
    scheduler.start()

    app_process.join()

if __name__ == "__main__":
    main()
