Using loop.run_in_executor to call sync functions from async ones

Question

I have 3 functions: func_1, func_2, and func_3. I would like to run these asynchronously, so that I do not have to wait for func_1 to finish before func_2 starts executing.

The problem is that the definition of func_1, for example, looks something like this:

    async def func_1(a, b):
        x = some_sync_func(a)
        y = some_other_sync_func(b)
        z = another_sync_func(x, y)
        return yet_another_sync_func(z)

The functions that I am calling within func_1 are all synchronous functions which are non-awaitable. Thus, they will block the execution of func_2 and func_3.

I read here that loop.run_in_executor() can be used to call synchronous functions from asynchronous functions without blocking the execution.
Thus, I modified the definition of func_1 as follows:

    async def func_1(a, b):
        loop = asyncio.get_event_loop()
        x = await loop.run_in_executor(None, some_sync_func, a)
        y = await loop.run_in_executor(None, some_other_sync_func, b)
        z = await loop.run_in_executor(None, lambda: another_sync_func(a, b))
        w = await loop.run_in_executor(None, yet_another_sync_func, z)
        return w

Is this the right way to deal with this problem? Am I using loop.run_in_executor() correctly?
Here, the docs provide an example which seems to support this. I don't know what threads are, or what a "process pool" is, and haven't really been able to make much sense of the docs.
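
For readers unfamiliar with the terms above, a rough sketch may help. The first argument of run_in_executor is the executor, that is, the pool of workers that runs the function. Passing None selects the event loop's default executor, which is a thread pool; an explicit concurrent.futures.ThreadPoolExecutor can be passed instead. The function square below is a hypothetical stand-in, not something from the question.

```python
import asyncio
from concurrent.futures import ThreadPoolExecutor


def square(n):
    """Hypothetical sync function standing in for real work."""
    return n * n


async def main():
    loop = asyncio.get_running_loop()
    # None: use the event loop's default thread pool.
    a = await loop.run_in_executor(None, square, 3)
    # Explicit pool: the same kind of call, run on a thread pool we create ourselves.
    with ThreadPoolExecutor(max_workers=2) as pool:
        b = await loop.run_in_executor(pool, square, 4)
    return a, b


values = asyncio.run(main())
print(values)
```

A concurrent.futures.ProcessPoolExecutor can be passed in the same position to run the function in a separate process; in that case the function and its arguments must be picklable, so a lambda like the one in the question would not work there.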

Answer 1

Score: 1

Almost right. Because you await each call immediately, the line after each await runs only once the awaited call has finished, so the four calls inside func_1 still run one after another.

However, if you call func_1 concurrently from some other place, the two instances of func_1 will run in parallel (I am almost sure that is not what you want).
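
To make the behaviour described above concrete, here is a minimal sketch. The names slow_step, ticker and events are hypothetical and not from the question, and time.sleep stands in for any blocking synchronous call. It records that the awaited executor calls inside func_1 finish in order, while a second coroutine keeps running in the meantime.

```python
import asyncio
import time

events = []  # hypothetical log of what happened, in order


def slow_step(name):
    """Stand-in for a blocking sync function (assumption: 0.2 s of work)."""
    time.sleep(0.2)
    events.append(f"done {name}")
    return name


async def func_1():
    loop = asyncio.get_running_loop()
    # Each await suspends func_1 until that call finishes,
    # so step "y" cannot start before step "x" is done.
    x = await loop.run_in_executor(None, slow_step, "x")
    y = await loop.run_in_executor(None, slow_step, "y")
    return x, y


async def ticker():
    # A second coroutine: it keeps running because func_1 does not block the loop.
    for _ in range(3):
        await asyncio.sleep(0.05)
        events.append("tick")


async def main():
    return await asyncio.gather(func_1(), ticker())


result, _ = asyncio.run(main())
print(events)
```

The printed log should show the ticks appearing while the slow steps are still in progress, and "done x" always before "done y".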

So, for these functions to actually run in parallel (in other threads), create the task for each of them without awaiting it immediately. Instead, collect all the tasks you want to run in parallel and await them at once, usually with the aptly named function gather:

    ...
    async def func_1(a, b):
        loop = asyncio.get_event_loop()
        task_x = loop.run_in_executor(None, some_sync_func, a)
        task_y = loop.run_in_executor(None, some_other_sync_func, b)
        task_z = loop.run_in_executor(None, lambda: another_sync_func(a, b))
        x, y, z = await asyncio.gather(task_x, task_y, task_z)
        # This depends on `z`, so it is not included in the gather.
        # If its return value is not important, you can omit the
        # await, return the task, and await it sometime later.
        w = await loop.run_in_executor(None, yet_another_sync_func, z)
        return w
    ...
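
A self-contained variant of the pattern above can be run to see the effect. This is only a sketch: fetch is a hypothetical blocking function, and the 0.3 second sleeps are assumptions standing in for real work. Run one after another the three calls would take about 0.9 seconds; started together and gathered, they should take roughly as long as a single call.

```python
import asyncio
import time


def fetch(label, seconds):
    """Hypothetical blocking function; time.sleep stands in for real work."""
    time.sleep(seconds)
    return label


async def func_1():
    loop = asyncio.get_running_loop()
    # Start all three calls first; run_in_executor returns a future right away.
    fut_x = loop.run_in_executor(None, fetch, "x", 0.3)
    fut_y = loop.run_in_executor(None, fetch, "y", 0.3)
    fut_z = loop.run_in_executor(None, fetch, "z", 0.3)
    # Wait for all of them together; results come back in argument order.
    return await asyncio.gather(fut_x, fut_y, fut_z)


start = time.perf_counter()
results = asyncio.run(func_1())
elapsed = time.perf_counter() - start
print(results, f"{elapsed:.2f}s")
```

On Python 3.9 and later, asyncio.to_thread(fetch, "x", 0.3) is a shorter way to run a sync function in a worker thread; it returns a coroutine that can be passed to asyncio.gather in the same way.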

huangapple
  • Posted on June 11, 2023, 21:07:11
  • When reposting, please keep the link to this article: https://go.coder-hub.com/76450603.html