Is there a difference between Starlette/FastAPI Background Tasks and simply using multiprocessing in Python?

huangapple go评论54阅读模式
英文:

Is there a difference between Starlette/FastAPI Background Tasks and simply using multiprocessing in Python?

问题

I am looking for different ways to queue up functions that will do things like copy files, scrape websites, and manipulate files (tasks that will take considerable time). I am using FastAPI as a backend API, and I came across FastAPI's background task documentation as well as Starlette's background task documentation and I fail to understand why I couldn't just use multiprocessing.

This is what I do currently using Multiprocessing and it works fine.

from multiprocessing import Process
from fastapi import FastAPI, File, UploadFile
app = FastAPI()

def handleFileUpload(file):
    print(file)
    #handle uploading file here

@app.post("/uploadFileToS3")
async def uploadToS3(bucket: str, file: UploadFile = File(...)):
    uploadProcess = Process(target=handleFileUpload, args(file))
    uploadProcess.start()
    return {
        "message": "Data has been queued for upload. You will be notified when it is ready.",
        "status": "OK"
    }

If this works why would FastAPI Background Tasks exist if I can do it just as simply as using Multiprocessing? My only guess is that it has to do with scaling? It may work for myself just testing, but I know that multiprocessing has to do with the number of cores a system has. I may be completely missing the point of multiprocessing. Please help me understand. Thanks.

英文:

I am looking for different ways to queue up functions that will do things like copy files, scrape websites, and manipulate files (tasks that will take considerable time). I am using FastAPI as a backend API, and I came across FastAPI's background task documentation as well as Starlette's background task documentation and I fail to understand why I couldn't just use multiprocessing.

This is what I do currently using Multiprocessing and it works fine.

from multiprocessing import Process
from fastapi import FastAPI, File, UploadFile
app = FastAPI()

def handleFileUpload(file):
    print(file)
    #handle uploading file here

@app.post("/uploadFileToS3")
async def uploadToS3(bucket: str, file: UploadFile = File(...)):
    uploadProcess = Process(target=handleFileUpload, args(file))
    uploadProcess.start()
    return {
        "message": "Data has been queued for upload. You will be notified when it is ready."
        "status": "OK"
    }

If this works why would FastAPI Background Tasks exist if I can do it just as simply as using Multiprocessing? My only guess is that it has to do with scaling? It may work for myself just testing, but I know that multiprocessing has to do with the number of cores a system has. I may be completely missing the point of multiprocessing. Please help me understand. Thanks.

答案1

得分: 2

TL;DR

这些后台任务将始终在与主应用程序相同的进程中执行。它们将在事件循环上异步运行,或者在一个单独的线程中运行。

对于非主要 I/O 操作,你可能应该避免使用它们,而改用 multiprocessing。


Details

如果需要,使用 multiprocessing(正确地)

我不明白为什么我不能简单地使用 multiprocessing。

文档不仅不阻止使用 multiprocessing,FastAPI 文档 明确建议 在需要进行计算密集型任务时使用它。

引用:(我强调)

如果你需要执行繁重的后台计算,而且 你不一定需要它在同一进程中运行(例如,你不需要共享内存、变量等),你可能会从使用其他更大的工具中受益[...]。

所以,你 可以。如果你想在后台执行与 CPU 绑定的工作,你几乎肯定 必须 使用自己的 multiprocessing 设置。

但在你在问题中展示的示例中,似乎你想在后台执行的操作是上传文件。这样的任务可能很适合基于 BackgroundTasks 的并发,因为它是 I/O 绑定的。启动另一个进程会引入额外的开销,可能会使它比 BackgroundTasks 所做的事情效率低。

此外,你在代码中没有展示何时以及如何 加入 那个新进程。这在 多进程的指南 中提到是重要的:

[...] 当进程结束但尚未加入时,它变成了僵尸。[...] 明智的做法可能是明确加入你启动的所有进程。

只是启动它然后忘记它可能是一个糟糕的主意,特别是在 每次请求该路由时

并且子进程不能只是 join 自己,因为那会导致死锁。


技术区别

正如你所知,FastAPI 的后台任务只是 Starlette 中 BackgroundTasks 类的重新导入(参见 文档)。FastAPI 只是以一种方式将它们整合到其路由处理设置中,用户无需在任何时候显式返回它们。

但是 Starlette 文档 明确指出该类是

用于进程内后台任务。

如果我们看一下源代码,我们会发现它的底层 __call__ 实现实际上只做了两件事:

  1. 如果你传递的函数是异步的,它简单地 await 它。
  2. 如果你传递的函数是一个“常规”函数(不是 async),它在一个线程池中运行。 (如果深入挖掘,你会发现它利用了 anyio.to_thread.run_sync 协程。)

这意味着在任何时候都没有另一个进程参与。在情况 1)中,它甚至在与应用程序的其余部分相同的事件循环上安排,这意味着所有操作都在 一个线程 中进行。而在情况 2)中,附加的线程执行操作。

如果你在 Python 中有处理并发的经验,这些影响非常明显:如果你想在那里执行 CPU 绑定的操作,不要 使用 BackgroundTasks。它们会完全阻塞你的应用程序,因为它们要么 1)阻塞了唯一可用的线程中的事件循环,要么 2)导致全局解释器锁(GIL)锁定主线程。


合法用例

另一方面,如果你的任务执行一些 I/O 绑定的操作(文档中给出的一个例子是连接到电子邮件服务器发送邮件,在请求处理之后),BackgroundTasks 机制非常方便。

BackgroundTasks 相对于自定义设置的主要优势在于,你无需担心协程将在何时以及如何被等待,或者线程何时加入。所有这些都在路由处理程序的后面抽象掉了。你只需指定你希望在响应之后的 某个时间 执行的函数是什么。

可能 只需在路由处理程序函数结束前调用 asyncio.create_task。这 可能 会在请求处理后立即安排任务,并使其在后台运行。但有三个问题:

  1. 没有 保证 它会立即安排。如果有很多请求正在处理,可能需要一些时间。
  2. 除非你在路由处理程序之外开发某种机制来跟踪它,否则你无法实际 await 该任务并确保它实际完成(如预期或出现错误)。
  3. 由于事件循环仅保留对任务的弱引用,这样的任务可能会在完成之前被垃圾回收。 (这意味着它将直接消失。)
英文:

TL;DR

Those background tasks will always execute in the same process as your main application. They will either just run asynchronously on the event loop or in a separate thread.

For operations that are not primarily I/O, you should probably avoid using them and use multiprocessing instead.


Details

Use multiprocessing (correctly), if you want

> I fail to understand why I couldn't just use multiprocessing.

Not only does the documentation not discourage using multiprocessing, the FastAPI docs explicitly suggest it for computation intensive tasks.

Quote: (emphasis mine)

> If you need to perform heavy background computation and you don't necessarily need it to be run by the same process (for example, you don't need to share memory, variables, etc), you might benefit from using other bigger tools [...].

So you can. And if you want to do CPU-bound work in the background, you almost certainly have to use your own multiprocessing setup.

But in the example you showed in your question, it seems that the operation you want to perform in the background is to upload a file somewhere. Such a task will probably lend itself well to BackgroundTasks-based concurrency because it is I/O-bound. Spawning another process introduces additional overhead that might make it less efficient than what the BackgroundTasks do.

Also, you did not show in your code, when and how you are joining that new process. This is important and mentioned in the guidelines for multiprocessing:

> [...] when a process finishes but has not been joined it becomes a zombie. [...] it is probably good practice to explicitly join all the processes that you start.

Just spawning it and forgetting about it is probably a terrible idea, especially when that happens every time that route is requested.

And a child process can not just join itself because that would cause a deadlock.


Technical distinctions

As you know, the FastAPI background tasks are just a re-import of the BackgroundTasks class from Starlette (see docs). FastAPI just integrates them into its route handling setup in such a way that the user does not need to explicitly return them at any point.

But the Starlette docs clearly state that the class is

> for in-process background tasks.

And if we take a look at the source, we can see that under the hood it's __call__ implementation really just does one of two things:

  1. If the function you passed is asynchronous, it simply awaits it.
  2. If the function you passed is a "regular" function (not async), it runs it in a thread-pool. (If you go deeper, you'll see that it utilizes the anyio.to_thread.run_sync coroutine.)

This means that at no point is there another process in play. In case 1) it is even scheduled on the same exact event loop as the rest of the application, which means it is all happening in one thread. And in case 2), an additional thread performs the operation.

The implications are very obvious, if you have some experience dealing with concurrency in Python: Do not use BackgroundTasks, if you want to perform CPU-bound operations there. Those would completely block your application because they will either 1) block the event loop in the only available thread or 2) cause the GIL to lock up the main thread.


Legitimate use cases

On the flip side, if your tasks perform some I/O-bound operations (an example given in the docs is connecting to an email server to send something, after the request was processed), the BackgroundTasks machinery is very convenient.

The main benefit of BackgroundTasks to a custom setup in my opinion is that you do not need to worry about how and when exactly the coroutines will be awaited or the threads joined. That is all abstracted away behind the route handler. You just need to specify what function you want executed some time after the response.

You could just e.g. call asyncio.create_task just before the end of your route handler function. That would probably schedule the task right after the request is processed and effectively make it run in the background. But there are three problems with that:

  1. There is no guarantee it will be scheduled immediately after. It may take a while, if there are a lot of requests being processed.
  2. You have no chance to actually await that task and ensure it actually finishes (as expected or with an error), unless you develop some mechanism yourself to keep track of it outside the route handler.
  3. Since the event loop only keeps weak references to tasks, such a task might get garbage collected before it is finished. (That means it will just straight up disappear.)

答案2

得分: 1

Multiprocessing Process 可以充分利用可用的硬件资源,比如多个 CPU 内核。通过将工作负载分布到不同进程中,可以利用并行性,实现更快的执行时间。

FastAPI 中的 BackgroundTask 功能在处理 HTTP 请求时,当您想要在后台异步执行某些函数或方法时非常有用。它允许您安排并执行可能需要更长时间完成或涉及 I/O 操作的任务,而不会阻塞 API 响应。它在处理 I/O 绑定任务或定期/计划任务时非常有用。

然而,您可以将它们结合使用,在 FastAPI 应用程序中实现任务的并行性和异步执行。

def handleFileUpload(file: UploadFile) -> None:
    print(file)

def check_worker_status(p: Process) -> None:
    while p.is_alive():
        print('Worker is still running...')
        time.sleep(5)
    p.terminate()
    print('Worker terminated')

@router.post("/uploadFileToS3")
async def uploadToS3(background_task: BackgroundTasks, bucket: str, file: UploadFile = File(...)) -> dict:
    uploadProcess = Process(target=handleFileUpload, args=(file,))
    uploadProcess.start()
    background_task.add_task(check_worker_status, uploadProcess)
    return {"message": "File uploaded successfully"}

multiprocessing.Process 类的 join() 方法用于阻塞调用它的进程,直到调用 join() 方法的进程终止。如果要避免阻塞调用进程,可以使用 multiprocessing.Process 类的 is_alive() 方法来检查进程是否仍在运行,然后使用同一类的 terminate() 方法来终止它。

英文:

Multiprocessing Process enables you to make full use of available hardware resources, such as multiple CPU cores. By distributing workload across processes, you can take advantage of parallelism and achieve faster execution times.

The BackgroundTask feature in FastAPI is useful when you want to execute certain functions or methods asynchronously in the background while handling HTTP requests. It allows you to schedule and perform tasks that might take longer to complete or involve I/O operations without blocking the API response. It is useful to use on I/O bound tasks or Periodic/Scheduled Tasks.

However, you can use both together to achieve parallelism and asynchronous execution of tasks in a FastAPI application.

def handleFileUpload(file: UploadFile) -> None:
    print(file)

def check_worker_status(p: Process) -> None:
    while p.is_alive():
        print('Worker is still running...')
        time.sleep(5)
    p.terminate()
    print('Worker terminated')

@router.post("/uploadFileToS3")
async def uploadToS3(background_task: BackgroundTasks, bucket: str, file: UploadFile = File(...)) -> dict:
    uploadProcess = Process(target=handleFileUpload, args=(file,))
    uploadProcess.start()
    background_task.add_task(check_worker_status, uploadProcess)
    return {"message": "File uploaded successfully"}

The join() method of the multiprocessing.Process class is used to block the calling process until the process whose join() method is called terminates. If you want to avoid blocking the calling process, you can use the is_alive() method of the multiprocessing.Process class to check if the process is still running and then terminate it using the terminate() method of the same class.

huangapple
  • 本文由 发表于 2023年5月25日 12:17:53
  • 转载请务必保留本文链接:https://go.coder-hub.com/76328904.html
匿名

发表评论

匿名网友

:?: :razz: :sad: :evil: :!: :smile: :oops: :grin: :eek: :shock: :???: :cool: :lol: :mad: :twisted: :roll: :wink: :idea: :arrow: :neutral: :cry: :mrgreen:

确定