如何强制按接收顺序处理异步消息处理程序?

huangapple go评论74阅读模式
英文:

How to enforce in-order-of-receival processing of async message handlers?

问题

以下是您要翻译的内容:

假设我们有一个消息处理程序。消息可能会从TCP套接字接收并分派给处理程序:

async def on_recv(msg):
    await handle(msg)

while True:
    msg = await socket.recv()
    await on_recv(msg)

这是有问题的,因为handle可能会花费一些时间或发送另一条消息并等待套接字的响应,从而导致死锁(on_recv没有返回!)。为了防止这种情况发生,我们可以将处理程序设置为任务,以便它不会阻塞我们接收下一条消息:

async def on_recv(msg):
    asyncio.create_task(handle(msg))

然而,在这里,我相信我们失去了有关处理顺序的保证(具体来说是任务启动的顺序)。假设我想确保每个处理程序在消息接收的顺序中开始执行。我应该如何做到这一点?我的目标是引入更确定的行为(当处理程序遇到IO边界时,这一点没有被保证是可以的)。

在寻找解决方案时,这似乎像一个难题。我尝试的每一种方法都可能导致消息潜在地不会开始,或者涉及到一个调度程序,该调度程序本身具有与原始调度程序相同的问题(将顺序委托给asyncio.Task)。

英文:

Suppose we have a message handler. Messages might be received from a TCP socket and dispatched to a handler:

async def on_recv(msg):
    await handle(msg)

while True:
    msg = await socket.recv()
    await on_recv(msg)

This is problematic because handle may take some time or send another message and wait on the socket for a response, and become deadlocked (on_recv did not return!). To prevent this, we may make the handler a task so that it does not block us from receiving the next message:

async def on_recv(msg):
    asynico.Task(handle(msg))

However, here I believe its true that we lose guarantees about the order of processing (specifically the order in which the tasks start).
Suppose I wanted to make sure that each handler starts is execution in the order of message receival. How might I do that?
My goal here to introduce more deterministic behavior (it's OK where this isn't guaranteed when the handler hits IO boundaries).

In trying to find a solution, it looks like finger puzzle. Everything I try here either leaves messages potentially not starting, involved an scheduler which itself has the same issue of the orginal dispatcher (cedeing order to an asyncio.Task).

答案1

得分: 1

以下是您要翻译的内容:

"常见模式是与任意数量的生产者(将内容放入队列的)和消费者(从队列中取内容的)一起使用队列。"

"在这种情况下,显而易见的选择是asyncio.Queue。由于队列保持顺序,您可以(例如)让您的主协程收集消息并将它们放入队列中(从而设置顺序),然后一个消费者任务异步消费队列。"

"这里有一个小的演示给您:"

from asyncio import CancelledError, Queue, create_task, run, sleep
from random import randint

async def receive_message(number: int) -> str:
    seconds = randint(1, 5)
    await sleep(seconds)
    message = f"message {number}"
    print(f"{message} took {seconds} seconds to receive")
    return message

async def handle(message: str) -> None:
    seconds = randint(1, 5)
    await sleep(seconds)
    print(f"processed: {message} (took {seconds} seconds)")

async def consumer(q: Queue) -> None:
    while True:
        try:
            next_message = await q.get()
        except CancelledError:
            print("stopping consumer...")
            return
        await handle(next_message)

async def main() -> None:
    q = Queue()
    consumer_task = create_task(consumer(q))
    for i in range(5):
        message = await receive_message(i)
        q.put_nowait(message)
    await consumer_task  # will block forever

if __name__ == "__main__":
    run(main())

示例输出:(在最后一个processed消息之后使用 SIGINT 停止)

message 0 took 1 seconds to receive
processed: message 0 (took 1 seconds)
message 1 took 2 seconds to receive
message 2 took 1 seconds to receive
processed: message 1 (took 5 seconds)
message 3 took 4 seconds to receive
message 4 took 2 seconds to receive
processed: message 2 (took 3 seconds)
processed: message 3 (took 2 seconds)
processed: message 4 (took 2 seconds)
stopping consumer...

如您可以通过一些实验轻松验证的那样(并在理论上证明),处理将始终按照接收消息的顺序进行(从04)。

英文:

A common pattern is to use a queue together with any number of producers (that put stuff into the queue) and consumers (that take stuff out of the queue).

In this case the obvious choice would be the asyncio.Queue. Since a queue preserves order, you can have (for example) your main coroutine collect messages and put them in the queue (thus setting the order) and one consumer task asynchronously consuming the queue.

Here is a little demo for you:

from asyncio import CancelledError, Queue, create_task, run, sleep
from random import randint

async def receive_message(number: int) -> str:
    seconds = randint(1, 5)
    await sleep(seconds)
    message = f"message {number}"
    print(f"{message} took {seconds} seconds to receive")
    return message

async def handle(message: str) -> None:
    seconds = randint(1, 5)
    await sleep(seconds)
    print(f"processed: {message} (took {seconds} seconds)")

async def consumer(q: Queue) -> None:
    while True:
        try:
            next_message = await q.get()
        except CancelledError:
            print("stopping consumer...")
            return
        await handle(next_message)

async def main() -> None:
    q = Queue()
    consumer_task = create_task(consumer(q))
    for i in range(5):
        message = await receive_message(i)
        q.put_nowait(message)
    await consumer_task  # will block forever

if __name__ == "__main__":
    run(main())

Sample output: (stopped with SIGINT after the last processed message)

message 0 took 1 seconds to receive
processed: message 0 (took 1 seconds)
message 1 took 2 seconds to receive
message 2 took 1 seconds to receive
processed: message 1 (took 5 seconds)
message 3 took 4 seconds to receive
message 4 took 2 seconds to receive
processed: message 2 (took 3 seconds)
processed: message 3 (took 2 seconds)
processed: message 4 (took 2 seconds)
stopping consumer...

As you can easily verify with a few experiments (and prove theoretically), the processing will always be in the order the messages are received (0 to 4).

huangapple
  • 本文由 发表于 2023年5月8日 00:56:55
  • 转载请务必保留本文链接:https://go.coder-hub.com/76195211.html
匿名

发表评论

匿名网友

:?: :razz: :sad: :evil: :!: :smile: :oops: :grin: :eek: :shock: :???: :cool: :lol: :mad: :twisted: :roll: :wink: :idea: :arrow: :neutral: :cry: :mrgreen:

确定