如何等待队列或事件?

huangapple go评论67阅读模式
英文:

How should I wait for a queue or an event?

问题

在Python中,我想知道如何等待queue.get()或event.wait()中最先完成的那一个。

目前,我正在使用asyncio.wait()来实现这一点,但这会产生一个弃用警告。我不明白我应该如何修改我的代码,以便它与未来版本的Python兼容。

以下代码是可用的,但会产生警告DeprecationWarning: The explicit passing of coroutine objects to asyncio.wait() is deprecated since Python 3.8, and scheduled for removal in Python 3.11.

import random
import asyncio

# Shared, module-level synchronisation primitives: `queue` carries the items,
# `event` is the stop signal checked by producer() and consumer().
# NOTE(review): both are created at import time, before asyncio.run() starts
# its event loop -- confirm this is safe on the Python version in use.
event = asyncio.Event()
queue = asyncio.Queue()

async def producer():
    """Put the integers 0-4 on the shared ``queue`` with a random pause after each.

    The shared ``event`` is polled once after every pause; the loop ends
    early as soon as it is found set.
    """
    value = 0
    while value < 5:
        print(f"Putting {value}")
        await queue.put(value)
        pause = random.random()
        await asyncio.sleep(pause)
        # Check if we should terminate
        if event.is_set():
            break
        value += 1
    print("Producer done")

async def terminator():
    """Set the shared ``event`` after a random pause of less than five seconds."""
    pause = random.random() * 5
    await asyncio.sleep(pause)
    print("Terminating")
    event.set()
    print("Terminator done")

async def consumer():
    """Print items from the shared ``queue`` until the shared ``event`` is set.

    Each iteration waits for whichever completes first: ``queue.get()`` or
    ``event.wait()``.  If the event is set once the wait returns, the loop
    ends; otherwise the completed awaitable's result is printed as the item.
    """
    while True:
        print("Waiting on the result of either the queue or the event")
        # NOTE: bare coroutines are passed to asyncio.wait() on purpose -- this
        # is the call that emits the DeprecationWarning the question is about.
        done, _ = await asyncio.wait(
            [queue.get(), event.wait()],
            return_when=asyncio.FIRST_COMPLETED
        )
        # Check if we should terminate.  This has to call event.is_set(): an
        # identity test against a fresh set() is always False, so the loop
        # would never stop.
        if event.is_set():
            break
        # Otherwise, we got a queue item
        item = done.pop().result()
        print(f"got {item}")
    print("Consumer done")

async def main():
    """Run producer, terminator and consumer concurrently until all finish."""
    workers = (producer(), terminator(), consumer())
    await asyncio.gather(*workers)

# Script entry point: drive main() to completion with asyncio.run().
asyncio.run(main())

示例输出:

Putting 0
Waiting on the result of either the queue or the event
got 0
Waiting on the result of either the queue or the event
Putting 1
got 1
Waiting on the result of either the queue or the event
Terminating
Terminator done
Consumer done
Producer done
英文:

In Python, I would like to know how to wait for the first of either queue.get() or event.wait().

At the moment, I am using asyncio.wait() to achieve this, but this is producing a deprecation warning. I do not understand how I should alter my code so that it will be compatible with future versions of Python.

The following code is functional, however it gives the warning DeprecationWarning: The explicit passing of coroutine objects to asyncio.wait() is deprecated since Python 3.8, and scheduled for removal in Python 3.11.

import random
import asyncio

# Shared, module-level synchronisation primitives: `queue` carries the items,
# `event` is the stop signal checked by producer() and consumer().
# NOTE(review): both are created at import time, before asyncio.run() starts
# its event loop -- confirm this is safe on the Python version in use.
event = asyncio.Event()
queue = asyncio.Queue()

async def producer():
    """Put the integers 0-4 on the shared ``queue`` with a random pause after each.

    The shared ``event`` is polled once after every pause; the loop ends
    early as soon as it is found set.
    """
    value = 0
    while value < 5:
        print(f"Putting {value}")
        await queue.put(value)
        pause = random.random()
        await asyncio.sleep(pause)
        # Check if we should terminate
        if event.is_set():
            break
        value += 1
    print("Producer done")

async def terminator():
    """Set the shared ``event`` after a random pause of less than five seconds."""
    pause = random.random() * 5
    await asyncio.sleep(pause)
    print("Terminating")
    event.set()
    print("Terminator done")

async def consumer():
    """Print items from the shared ``queue`` until the shared ``event`` is set.

    Each iteration waits for whichever completes first: ``queue.get()`` or
    ``event.wait()``.  The awaitables are handed to asyncio.wait() as bare
    coroutines, exactly as in the question being asked.
    """
    while True:
        print("Waiting on the result of either the queue or the event")
        waitables = [queue.get(), event.wait()]
        finished, _unfinished = await asyncio.wait(
            waitables, return_when=asyncio.FIRST_COMPLETED
        )
        if not event.is_set():
            # No stop request yet, so treat the finished awaitable as the
            # queue read and report its value.
            received = finished.pop().result()
            print(f"got {received}")
            continue
        # Stop was requested: leave the loop.
        break
    print("Consumer done")

async def main():
    """Run producer, terminator and consumer concurrently until all finish."""
    workers = (producer(), terminator(), consumer())
    await asyncio.gather(*workers)

# Script entry point: drive main() to completion with asyncio.run().
asyncio.run(main())

Example output:

Putting 0
Waiting on the result of either the queue or the event
got 0
Waiting on the result of either the queue or the event
Putting 1
got 1
Waiting on the result of either the queue or the event
Terminating
Terminator done
Consumer done
Producer done

答案1

得分: 3

我给出了下面的解决方案,请检查并提出意见。我留下了许多注释,用来解释我做了什么,以及我在你的代码中发现了哪些潜在问题:

import asyncio

async def producer(queue: asyncio.Queue, event: asyncio.Event) -> None:
    """Put the integers 0-4 on ``queue``, pausing one second after each.

    ``event`` is polled once after every pause; the loop ends early as soon
    as it is found set.
    """
    number = 0
    while number < 5:
        print(f"Putting {number}")
        await queue.put(number)
        await asyncio.sleep(1)

        # Check if we should terminate
        if event.is_set():
            break
        number += 1

    print("Producer done")

async def terminator(event: asyncio.Event) -> None:
    """Set ``event`` after a fixed three-second delay, printing before and after."""
    delay_seconds = 3
    await asyncio.sleep(delay_seconds)

    print("Terminating")
    event.set()
    print("Terminator done")

async def consumer(queue: asyncio.Queue, event: asyncio.Event) -> None:
    """Print items taken from ``queue`` until ``event`` is set.

    Each iteration races ``queue.get()`` against ``event.wait()`` and acts on
    whichever finishes first.  Termination has priority: if ``event`` is set
    when the wait returns, the loop ends even if a queue item was also
    fetched in that round (that item is not printed).

    Args:
        queue: source of items to print.
        event: stop signal; once set, the consumer finishes.

    Raises:
        Exception: whatever ``queue.get()`` raised, re-raised unchanged after
            printing a marker line.
    """
    while True:
        print("Waiting on the result of either the queue or the event")

        # asyncio.wait() has to be given Tasks, not bare coroutines; passing
        # coroutines is what triggers the DeprecationWarning.
        get_task = asyncio.create_task(queue.get())
        stop_task = asyncio.create_task(event.wait())
        try:
            await asyncio.wait(
                [get_task, stop_task],
                return_when=asyncio.FIRST_COMPLETED,
            )

            # Check if we should terminate
            if event.is_set():
                break

            # Otherwise, we got a queue item.  Read it from the known task
            # rather than from the unordered "done" set.
            try:
                item = get_task.result()
            except Exception:
                print("Unexpected Exception!!!")
                raise
            print(f"got {item}")
        finally:
            # Single cleanup point for every exit path (next iteration, break,
            # error, or cancellation of this coroutine): cancel whichever
            # task is still pending so nothing is left running.
            leftovers = [t for t in (get_task, stop_task) if not t.done()]
            for task in leftovers:
                task.cancel()
            if leftovers:
                # return_exceptions=True - prevent raise of asyncio.CancelledError
                await asyncio.gather(*leftovers, return_exceptions=True)

    print("Consumer done")

async def main():
    """Create the shared event and queue, then run all three coroutines together."""
    # event and queue should be created inside main,
    # otherwise could cause "different loops" conflict
    stop_event = asyncio.Event()
    items = asyncio.Queue()
    workers = [
        producer(queue=items, event=stop_event),
        terminator(event=stop_event),
        consumer(queue=items, event=stop_event),
    ]
    await asyncio.gather(*workers)

# Run the demo only when this file is executed as a script, not when imported.
if __name__ == '__main__':
    asyncio.run(main())

请注意:上面的代码部分(包括其中的注释)保持原样,未作翻译。

英文:

I came up with the following solution; please check it and let me know what you think. I left many comments to explain what I do and which potential problems I found in your code:

import asyncio


async def producer(queue: asyncio.Queue, event: asyncio.Event) -> None:
    """Put the integers 0-4 on ``queue``, pausing one second after each.

    ``event`` is polled once after every pause; the loop ends early as soon
    as it is found set.
    """
    number = 0
    while number < 5:
        print(f"Putting {number}")
        await queue.put(number)
        await asyncio.sleep(1)

        # Check if we should terminate
        if event.is_set():
            break
        number += 1

    print("Producer done")


async def terminator(event: asyncio.Event) -> None:
    """Set ``event`` after a fixed three-second delay, printing before and after."""
    delay_seconds = 3
    await asyncio.sleep(delay_seconds)

    print("Terminating")
    event.set()
    print("Terminator done")


async def consumer(queue: asyncio.Queue, event: asyncio.Event) -> None:
    """Print items taken from ``queue`` until ``event`` is set.

    Each iteration races ``queue.get()`` against ``event.wait()`` and acts on
    whichever finishes first.  Termination has priority: if ``event`` is set
    when the wait returns, the loop ends even if a queue item was also
    fetched in that round (that item is not printed).

    Args:
        queue: source of items to print.
        event: stop signal; once set, the consumer finishes.

    Raises:
        Exception: whatever ``queue.get()`` raised, re-raised unchanged after
            printing a marker line.
    """
    while True:
        print("Waiting on the result of either the queue or the event")

        # asyncio.wait() has to be given Tasks, not bare coroutines; passing
        # coroutines is what triggers the DeprecationWarning.
        get_task = asyncio.create_task(queue.get())
        stop_task = asyncio.create_task(event.wait())
        try:
            await asyncio.wait(
                [get_task, stop_task],
                return_when=asyncio.FIRST_COMPLETED,
            )

            # Check if we should terminate
            if event.is_set():
                break

            # Otherwise, we got a queue item.  Read it from the known task
            # rather than from the unordered "done" set.
            try:
                item = get_task.result()
            except Exception:
                print("Unexpected Exception!!!")
                raise
            print(f"got {item}")
        finally:
            # Single cleanup point for every exit path (next iteration, break,
            # error, or cancellation of this coroutine): cancel whichever
            # task is still pending so nothing is left running.
            leftovers = [t for t in (get_task, stop_task) if not t.done()]
            for task in leftovers:
                task.cancel()
            if leftovers:
                # return_exceptions=True - prevent raise of asyncio.CancelledError
                await asyncio.gather(*leftovers, return_exceptions=True)

    print("Consumer done")


async def main():
    """Create the shared event and queue, then run all three coroutines together."""
    # event and queue should be created inside main,
    # otherwise could cause "different loops" conflict
    stop_event = asyncio.Event()
    items = asyncio.Queue()
    workers = [
        producer(queue=items, event=stop_event),
        terminator(event=stop_event),
        consumer(queue=items, event=stop_event),
    ]
    await asyncio.gather(*workers)


# Run the demo only when this file is executed as a script, not when imported.
if __name__ == '__main__':
    asyncio.run(main())

huangapple
  • 本文由 发表于 2023年3月1日 14:28:04
  • 转载请务必保留本文链接:https://go.coder-hub.com/75600223.html
匿名

发表评论

匿名网友

:?: :razz: :sad: :evil: :!: :smile: :oops: :grin: :eek: :shock: :???: :cool: :lol: :mad: :twisted: :roll: :wink: :idea: :arrow: :neutral: :cry: :mrgreen:

确定