How should I wait for a queue or an event?

Question

In Python, I would like to know how to wait for the first of either queue.get() or event.wait().

At the moment, I am using asyncio.wait() to achieve this, but this is producing a deprecation warning. I do not understand how I should alter my code so that it will be compatible with future versions of Python.

The following code works, but it emits this warning: DeprecationWarning: The explicit passing of coroutine objects to asyncio.wait() is deprecated since Python 3.8, and scheduled for removal in Python 3.11.

    import random
    import asyncio

    event = asyncio.Event()
    queue = asyncio.Queue()

    async def producer():
        for i in range(5):
            print(f"Putting {i}")
            await queue.put(i)
            await asyncio.sleep(random.random())
            # Check if we should terminate
            if event.is_set():
                break
        print("Producer done")

    async def terminator():
        await asyncio.sleep(random.random() * 5)
        print("Terminating")
        event.set()
        print("Terminator done")

    async def consumer():
        while True:
            print(f"Waiting on the result of either the queue or the event")
            done, _ = await asyncio.wait(
                [queue.get(), event.wait()],
                return_when=asyncio.FIRST_COMPLETED
            )
            # Check if we should terminate
            if event.is_set():
                break
            # Otherwise, we got a queue item
            item = done.pop().result()
            print(f"got {item}")
        print("Consumer done")

    async def main():
        await asyncio.gather(producer(), terminator(), consumer())

    asyncio.run(main())

Example output:

    Putting 0
    Waiting on the result of either the queue or the event
    got 0
    Waiting on the result of either the queue or the event
    Putting 1
    got 1
    Waiting on the result of either the queue or the event
    Terminating
    Terminator done
    Consumer done
    Producer done

Answer 1

Score: 3

Here is my solution; please check it and let me know what you think. I left comments explaining what I changed and the potential problems I found in your code:

    import asyncio

    async def producer(queue: asyncio.Queue, event: asyncio.Event) -> None:
        for i in range(5):
            print(f"Putting {i}")
            await queue.put(i)
            await asyncio.sleep(1)
            if event.is_set():  # Check if we should terminate
                break
        print("Producer done")

    async def terminator(event: asyncio.Event) -> None:
        await asyncio.sleep(3)
        print("Terminating")
        event.set()
        print("Terminator done")

    async def consumer(queue: asyncio.Queue, event: asyncio.Event) -> None:
        while True:
            print("Waiting on the result of either the queue or the event")
            # Wrap the coroutines in Tasks instead of passing them directly to
            # asyncio.wait(); passing bare coroutines caused the warning.
            task_queue_get = asyncio.create_task(queue.get())
            task_event_get = asyncio.create_task(event.wait())
            done_tasks, pending_tasks = await asyncio.wait(
                [task_queue_get, task_event_get],
                return_when=asyncio.FIRST_COMPLETED,
            )
            # Cancel whichever task is still pending, then await it.
            # return_exceptions=True keeps asyncio.CancelledError from being
            # raised here.
            for task in pending_tasks:
                task.cancel()
            await asyncio.gather(*pending_tasks, return_exceptions=True)
            # Check if we should terminate
            if event.is_set():
                break
            # Otherwise, we got a queue item
            try:
                # Attention: only the queue task is expected to be done here,
                # but result() re-raises any exception the task ended with.
                item = task_queue_get.result()
                print(f"got {item}")
            except Exception:
                print("Unexpected Exception!!!")
                raise
        print("Consumer done")

    async def main():
        # Create the event and queue inside main(); creating them at module
        # level could cause a "different loops" conflict.
        event = asyncio.Event()
        queue = asyncio.Queue()
        await asyncio.gather(
            producer(queue=queue, event=event),
            terminator(event=event),
            consumer(queue=queue, event=event),
        )

    if __name__ == "__main__":
        asyncio.run(main())
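
If you need this pattern in more than one place, the task creation and cleanup could be folded into a small helper. The following is only a sketch under my own assumptions: the names get_or_stop, STOP and demo are hypothetical and not part of asyncio, and unlike the code above it prefers to return an item when the item and the event are ready at the same time, so the consumer drains the queue before stopping.

```python
import asyncio
from typing import Any

# Hypothetical sentinel (not part of asyncio): returned when the event wins.
STOP = object()


async def get_or_stop(queue: asyncio.Queue, stop: asyncio.Event) -> Any:
    """Return the next queue item, or STOP if only the event is ready."""
    get_task = asyncio.create_task(queue.get())
    stop_task = asyncio.create_task(stop.wait())
    try:
        await asyncio.wait(
            {get_task, stop_task}, return_when=asyncio.FIRST_COMPLETED
        )
        # Assumption: an item that is already available is delivered even if
        # the event is set as well.
        if get_task.done():
            return get_task.result()
        return STOP
    finally:
        # Cancel whatever is still pending and wait for it to finish, so no
        # task is left running in the background.
        for task in (get_task, stop_task):
            if not task.done():
                task.cancel()
        await asyncio.gather(get_task, stop_task, return_exceptions=True)


async def demo() -> list:
    """Small usage example: consume three items, then stop on the event."""
    queue: asyncio.Queue = asyncio.Queue()
    stop = asyncio.Event()
    received = []

    async def consumer() -> None:
        while True:
            item = await get_or_stop(queue, stop)
            if item is STOP:
                break
            received.append(item)

    consumer_task = asyncio.create_task(consumer())
    for i in range(3):
        await queue.put(i)
    await asyncio.sleep(0.01)  # give the consumer a moment to run
    stop.set()
    await consumer_task
    return received


if __name__ == "__main__":
    print(asyncio.run(demo()))
```

Whether the item or the event should win when both are ready depends on your application; if the event must take priority, check stop.is_set() first, as the consumer above does.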

Posted by huangapple on March 1, 2023 at 14:28:04
Source: https://go.coder-hub.com/75600223.html