call_soon_threadsafe不会在async函数内部调用该函数。

huangapple go评论129阅读模式
英文:

call_soon_threadsafe never call the function if it is inside an async function

问题

我正在使用一个第三方库,该库将在另一个随机时间从另一个线程调用我提供的函数。它可以被建模为在另一个线程中延迟调用函数。

我想要它调用的函数是一个异步函数。该库没有专门用于异步函数的接口,而且我不希望像asyncio.run那样阻塞代码。总结一下:

我需要一种从非异步函数在后台运行异步函数的方法,该非异步函数将从另一个线程调用。

我在这个帖子中提出了这个问题,并提出了一个几乎完美的解决方案,直到遇到以下问题。

如果异步函数尝试以相同的方式运行另一个异步函数,它将无法工作。原因是call_soon_threadsafe永远不会在异步函数内部调用回调函数。

这是可以重现问题的代码:

import asyncio
import threading
import time
from typing import Coroutine, Any
import logging

logging.basicConfig(format='%(asctime)s %(message)s', level=logging.INFO, datefmt='%H:%M:%S')

coro_queue: asyncio.Queue[Coroutine[Any, Any, Any]] = asyncio.Queue()
task: asyncio.Task[Any] | None = None
task_ready = asyncio.Event()
loop: asyncio.AbstractEventLoop | None = None

async def start_coro_queue() -> None:
    global task, loop
    loop = asyncio.get_event_loop()
    while True:
        coro = await coro_queue.get()
        task = asyncio.create_task(coro)
        task_ready.set()
        
def put_coro_in_queue(coro: Coroutine[Any, Any, Any]) -> None:
    logging.info("put_coro_in_queue called.")
    coro_queue.put_nowait(coro)
    logging.info("put_coro_in_queue finished.")
        
def run_coro_in_background(coro: Coroutine[Any, Any, Any]) -> asyncio.Task[Any]:
    logging.info("run_coro_in_background called")
    global task
    assert loop is not None
    task_ready.clear()
    future = asyncio.run_coroutine_threadsafe(task_ready.wait(), loop)
    loop.call_soon_threadsafe(put_coro_in_queue, coro)
    future.result()
    assert task is not None
    output = task
    task = None
    logging.info("run_coro_in_background finished")
    return output

async def async_func() -> None:
    logging.info("async_func called.")
    await asyncio.sleep(2)
    logging.info("async_func finished.")
    
def delayed_async_func() -> None:
    logging.info("delayed_async_func called")
    time.sleep(5)
    run_coro_in_background(async_func())
    logging.info("delayed_async_func finished.")
    
async def nested_async_func() -> None:
    logging.info("nested_async_func called")
    await asyncio.sleep(3)
    run_coro_in_background(async_func())
    logging.info("nested_async_func finished")
    
def delayed_nested_async_func() -> None:
    logging.info("delayed_nested_async_func called")
    time.sleep(4)
    run_coro_in_background(nested_async_func())
    logging.info("delayed_nested_async_func finished")

async def main() -> None:
    t = threading.Thread(target=delayed_nested_async_func)
    t.start()
    await start_coro_queue()

asyncio.run(main())

结果是:

19:25:51 delayed_nested_async_func called
19:25:55 run_coro_in_background called
19:25:55 put_coro_in_queue called.
19:25:55 put_coro_in_queue finished.
19:25:55 nested_async_func called
19:25:55 run_coro_in_background finished
19:25:55 delayed_nested_async_func finished
19:25:58 run_coro_in_background called

然后它永远停在那里。

预期的输出应包括:

19:25:58 put_coro_in_queue called
19:25:58 put_coro_in_queue finished
19:25:58 async_func called
19:26:00 async_func finished

通过将main更改为非嵌套情况来测试:

async def main() -> None:
    t = threading.Thread(target=delayed_async_func)
    t.start()
    await start_coro_queue()

一切都按预期运行。

19:34:00 delayed_async_func called
19:34:05 run_coro_in_background called
19:34:05 put_coro_in_queue called.
19:34:05 put_coro_in_queue finished.
19:34:05 async_func called.
19:34:05 run_coro_in_background finished
19:34:05 delayed_async_func finished.
19:34:07 async_func finished.
英文:

I am working with a third party library that will call a function I gave it from another thread at some random time. It can be modelled as a delayed function call in another thread.

The function I want it to call is an async function. The library does not have a special interface for async function, also I do not want it to block the code like asyncio.run. To summarize it:

I need a way to run an async function in the background from a non-async function that will be called from another thread.

I asked the question in this post, and came up with a nearly perfect solution, until I run into the following problem.

If the async function tries to run another async function in the same way, it will not work. The cause is call_soon_threadsafe will never call the callback function if it is inside an async function.

This is the code that can recreate the problem:

import asyncio
import threading
import time
from typing import Coroutine, Any
import logging
logging.basicConfig(format='%(asctime)s %(message)s', level=logging.INFO, datefmt='%H:%M:%S')
coro_queue: asyncio.Queue[Coroutine[Any, Any, Any]] = asyncio.Queue()
task: asyncio.Task[Any] | None = None
task_ready = asyncio.Event()
loop: asyncio.AbstractEventLoop | None = None
async def start_coro_queue() -> None:
global task, loop
loop = asyncio.get_event_loop()
while True:
coro = await coro_queue.get()
task = asyncio.create_task(coro)
task_ready.set()
def put_coro_in_queue(coro: Coroutine[Any, Any, Any]) -> None:
logging.info("put_coro_in_queue called.")
coro_queue.put_nowait(coro)
logging.info("put_coro_in_queue finished.")
def run_coro_in_background(coro: Coroutine[Any, Any, Any]) -> asyncio.Task[Any]:
logging.info("run_coro_in_background called")
global task
assert loop is not None
task_ready.clear()
future = asyncio.run_coroutine_threadsafe(task_ready.wait(), loop)
loop.call_soon_threadsafe(put_coro_in_queue, coro)
future.result()
assert task is not None
output = task
task = None
logging.info("run_coro_in_background finished")
return output
async def async_func() -> None:
logging.info("async_func called.")
await asyncio.sleep(2)
logging.info("async_func finished.")
def delayed_async_func() -> None:
logging.info("delayed_async_func called")
time.sleep(5)
run_coro_in_background(async_func())
logging.info("delayed_async_func finished.")
async def nested_async_func() -> None:
logging.info("nested_async_func called")
await asyncio.sleep(3)
run_coro_in_background(async_func())
logging.info("nested_async_func finished")
def delayed_nested_async_func() -> None:
logging.info("delayed_nested_async_func called")
time.sleep(4)
run_coro_in_background(nested_async_func())
logging.info("delayed_nested_async_func finished")
async def main() -> None:
t = threading.Thread(target=delayed_nested_async_func)
t.start()
await start_coro_queue()
asyncio.run(main())

The result is:

19:25:51 delayed_nested_async_func called
19:25:55 run_coro_in_background called
19:25:55 put_coro_in_queue called.
19:25:55 put_coro_in_queue finished.
19:25:55 nested_async_func called
19:25:55 run_coro_in_background finished
19:25:55 delayed_nested_async_func finished
19:25:58 run_coro_in_background called

Then it stuck forever.

The expected output should contain:

19:25:58 put_coro_in_queue called
19:25:58 put_coro_in_queue finished
19:25:58 async_func called
19:26:00 async_func finished

When test the non-nested case by changing main into:

async def main() -> None:
t = threading.Thread(target=delayed_async_func)
t.start()
await start_coro_queue()

Everything runs as expected.

19:34:00 delayed_async_func called
19:34:05 run_coro_in_background called
19:34:05 put_coro_in_queue called.
19:34:05 put_coro_in_queue finished.
19:34:05 async_func called.
19:34:05 run_coro_in_background finished
19:34:05 delayed_async_func finished.
19:34:07 async_func finished.

答案1

得分: 1

Your thread t runs run_coro_in_background, and that's fine, because it's running in a different thread than the event loop.

Then nested_async_func tries to run run_coro_in_background in the event loop. run_coro_in_background was not written to be safely run from inside the event loop. It tries to do

future.result()

which deadlocks, because it's pausing the entire thread the event loop is running in, but it's waiting for another task to finish, and that task will never finish if the event loop is hung.

英文:

Your thread t runs run_coro_in_background, and that's fine, because it's running in a different thread than the event loop.

Then nested_async_func tries to run run_coro_in_background in the event loop. run_coro_in_background was not written to be safely run from inside the event loop. It tries to do

future.result()

which deadlocks, because it's pausing the entire thread the event loop is running in, but it's waiting for another task to finish, and that task will never finish if the event loop is hung.

huangapple
  • 本文由 发表于 2023年3月9日 19:37:35
  • 转载请务必保留本文链接:https://go.coder-hub.com/75684073.html
匿名

发表评论

匿名网友

:?: :razz: :sad: :evil: :!: :smile: :oops: :grin: :eek: :shock: :???: :cool: :lol: :mad: :twisted: :roll: :wink: :idea: :arrow: :neutral: :cry: :mrgreen:

确定