How to run blocking operation loops concurrently using Python async?

Question

I have two unrelated blocking operations that listen for different events. When either of them returns, I need to handle the underlying event it raised.

No matter how I schedule them with asyncio, I never get them to run concurrently. receive_json() seems to block indefinitely whenever the other loop is running, so I suspect a concurrency problem with the websocket or the event loop, but I cannot pinpoint what it is or how to solve it.

My current code is shown below as a simplified example. I have also tried other asyncio interfaces, such as running both in a single loop, using timeouts, or using asyncio.wait(), without any more success.

The stack is uvicorn as the ASGI server, FastAPI for the web interface, Redis pubsub (redis-py connector) as one awaitable, and the Starlette WebSocket as the other. Everything runs in a Docker container hosted on a Windows machine, if that is of any interest.


async def await_redis(p):
    return str(p.get_message(timeout=None))

@router.websocket('/')
async def ws_endpoint(websocket: WebSocket):
    async def ws_loop():
        while True:
            data = await websocket.receive_json() # Blocks here whenever rd_loop runs
            messages = await handler(data)
            r.publish('some-channel', messages)

    async def rd_loop():
        r = Redis('host')
        p = r.pubsub('some-channel')
        while True:
            mess = await await_redis(p)
            if mess:
                await websocket.send_json([mess])
    # The strange thing is if rd_loop exits because of exception,
    # ws_loop starts to receive and handle messages.
    await asyncio.gather(ws_loop(), rd_loop()) 

Answer 1

Score: 2

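The await_redis function blocks the event loop. The get_message method of the redis-py client used in the question is synchronous, so calling it inside a coroutine never yields control, and every other task on the loop (including ws_loop) stalls until it returns. Try the aioredis library instead of redis-py.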
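The effect can be reproduced without Redis or a websocket. The sketch below is illustrative only: `ticker`, `blocking_listener`, and `async_listener` are hypothetical names, and `time.sleep` stands in for a synchronous call such as a blocking `get_message`.

```python
import asyncio
import time


async def ticker(log, n=3):
    # Stands in for ws_loop: it needs the event loop to make progress.
    for i in range(n):
        log.append(f"tick {i}")
        await asyncio.sleep(0.01)


async def blocking_listener(log):
    # A synchronous call inside a coroutine never yields to the loop.
    time.sleep(0.1)
    log.append("blocking done")


async def async_listener(log):
    # An awaited call suspends this task so other tasks can run meanwhile.
    await asyncio.sleep(0.1)
    log.append("async done")


async def run(listener):
    log = []
    await asyncio.gather(listener(log), ticker(log))
    return log


blocked = asyncio.run(run(blocking_listener))
cooperative = asyncio.run(run(async_listener))
```

With the blocking listener, no tick is logged until the synchronous call has returned; with the awaited one, the ticks are logged while the listener is still waiting. In the question the blocking call uses timeout=None, so it can wait forever and the other loop never gets a turn.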

First install it with pip install aioredis (the project has since been merged into redis-py 4.2+ as the redis.asyncio module). Here is your modified code:

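import asyncio

import aioredis
from fastapi import WebSocket

async def await_redis(p):
    # Awaiting get_message() yields to the event loop while waiting (up to 1 s).
    # It returns None when no message arrived within the timeout.
    return await p.get_message(ignore_subscribe_messages=True, timeout=1.0)

@router.websocket('/')
async def ws_endpoint(websocket: WebSocket):
    # aioredis 2.x API; the client is created here so both loops can use it.
    r = aioredis.from_url('redis://host', decode_responses=True)

    async def ws_loop():
        while True:
            data = await websocket.receive_json()
            messages = await handler(data)
            await r.publish('some-channel', messages)

    async def rd_loop():
        p = r.pubsub()
        await p.subscribe('some-channel')
        while True:
            mess = await await_redis(p)
            # Check for None explicitly: str(None) would be the truthy string 'None'.
            if mess is not None:
                await websocket.send_json([str(mess)])

    # Both loops now yield to the event loop, so they run concurrently.
    await asyncio.gather(ws_loop(), rd_loop())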
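
If switching clients is not an option, another commonly used approach is to keep the synchronous call but run it in a worker thread with `asyncio.to_thread` (Python 3.9+), so the event loop stays free. The sketch below is not part of the original answer and rests on assumptions: `FakePubSub` is a hypothetical stand-in for a synchronous pubsub object, and `producer` is a hypothetical second task, so the example runs without a Redis server.

```python
import asyncio
import queue


class FakePubSub:
    """Hypothetical stand-in for a synchronous pubsub client."""

    def __init__(self):
        self._q = queue.Queue()

    def push(self, message):
        self._q.put(message)

    def get_message(self, timeout=None):
        # Blocks the calling thread, like a synchronous network read.
        try:
            return self._q.get(timeout=timeout)
        except queue.Empty:
            return None


async def rd_loop(p, received, count):
    while len(received) < count:
        # The blocking call runs in a worker thread; the event loop keeps running.
        mess = await asyncio.to_thread(p.get_message, 1.0)
        if mess is not None:
            received.append(mess)


async def producer(p, count):
    # Stands in for the other coroutine that must not be starved.
    for i in range(count):
        await asyncio.sleep(0.01)
        p.push(f"message {i}")


async def main():
    p, received = FakePubSub(), []
    await asyncio.gather(rd_loop(p, received, 3), producer(p, 3))
    return received


received = asyncio.run(main())
```

A finite timeout is used on purpose: a worker thread stuck in a call that never returns cannot be cancelled from the event loop, so a bounded wait lets the loop check for shutdown between calls.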

Posted by huangapple on April 17, 2023, 03:40:43
Source: https://go.coder-hub.com/76029974.html