Python async library issue

Question

I have the following script, but it works incorrectly. I want the first function to repeat forever, and the second function to start whenever the first function returns result1 and result2. I also want to add a third function that starts 40 seconds after the second function has finished its job with result1 from the first function. The third function should do something with result2. I want all of this to run asynchronously with the asyncio library, because the first function must not be blocked for the 40 seconds that the third function sleeps. I tried ChatGPT, but it recommended the wrong things. How can I do this?

    import asyncio
    import logging

    async def first_function(queue1, queue2):
        while True:
            # do something
            result1 = "some result"
            result2 = {"key": "value"}
            await queue1.put(result1)
            await queue2.put(result2)
            await asyncio.sleep(1)  # wait for 1 second

    async def second_function(queue1, queue2, queue3):
        while True:
            result1 = await queue1.get()
            result2 = await queue2.get()
            # do something with the results
            logging.info(f"Second function received: {result1}, {result2}")
            await queue3.put(result1)
            await asyncio.sleep(1)  # wait for 1 second

    async def third_function(queue):
        while True:
            result = await queue.get()
            await asyncio.sleep(40)  # wait for 40 seconds
            # do something with the result
            logging.info(f"Third function received: {result}")

    async def main():
        queue1 = asyncio.Queue()
        queue2 = asyncio.Queue()
        queue3 = asyncio.Queue()
        task1 = asyncio.create_task(first_function(queue1, queue2))
        task2 = asyncio.create_task(second_function(queue1, queue2, queue3))
        task3 = asyncio.create_task(third_function(queue3))
        await asyncio.gather(task1, task2, task3)

I tried everything I could find on Google, and my code always blocks for 40 seconds when it reaches the third function.

Answer 1

Score: 0

There are multiple issues and anti-patterns:

  • there is no need to call asyncio.create_task(), since asyncio.gather() automatically wraps the coroutines it receives in tasks,
  • you use unbounded queues, which means that the memory consumption of your program could grow without limit,
  • third_function takes 40 seconds per element while first_function produces one roughly every second (see the asyncio.sleep(1) in the question), which means the former cannot keep up,
  • if result1 and result2 are produced together and consumed together, put both of them in a single queue.
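
To illustrate the first point, here is a minimal sketch showing that bare coroutine objects passed to asyncio.gather() still run concurrently. The names worker and run_demo are made up for this example and do not come from the question.

```python
import asyncio
import time


async def worker(name: str, delay: float) -> str:
    # Hypothetical stand-in for any coroutine that awaits something slow.
    await asyncio.sleep(delay)
    return name


async def run_demo():
    start = time.monotonic()
    # Bare coroutine objects are passed in; gather() wraps each one in a Task,
    # so both sleeps overlap instead of running back to back.
    results = await asyncio.gather(worker("a", 0.3), worker("b", 0.3))
    return results, time.monotonic() - start


if __name__ == "__main__":
    results, elapsed = asyncio.run(run_demo())
    print(results, f"{elapsed:.2f}s")
```

If the two workers ran one after the other, the elapsed time would be about 0.6 seconds; run concurrently it should stay close to 0.3 seconds. asyncio.create_task() is still useful when you need a task to start before you reach the gather() call, but that is not the case in the question.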

You could use something like this:

    import asyncio

    async def first_function(queue_in: asyncio.Queue):
        while True:
            result1 = "some result"
            result2 = {"key": "value"}
            # Both results travel together as one tuple in a single queue.
            await queue_in.put((result1, result2))

    async def second_function(queue_in: asyncio.Queue, queue_out: asyncio.Queue):
        while True:
            result1, result2 = await queue_in.get()
            print(f"Second function received: {result1}, {result2}")
            await queue_out.put(result1)

    async def third_function(queue_out: asyncio.Queue):
        while True:
            result = await queue_out.get()
            await asyncio.sleep(4)  # the question sleeps for 40 seconds here
            print(f"Third function received: {result}")

    async def main():
        # Bounded queues: put() suspends the producer while the queue is full.
        queue_in = asyncio.Queue(maxsize=1)
        queue_out = asyncio.Queue(maxsize=1)
        task1 = first_function(queue_in)
        task2 = second_function(queue_in, queue_out)
        task3 = third_function(queue_out)
        await asyncio.gather(task1, task2, task3)

    if __name__ == "__main__":
        asyncio.run(main())

By specifying maxsize=1 you throttle first_function, because third_function now defines the consumption rate: first_function (and second_function too) can only produce a new element once third_function has finished processing the previous one.
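
The throttling effect of a bounded queue can be observed with a small, finite sketch. The names producer, slow_consumer, and run_pipeline, and the 0.05 second delay, are assumptions made for this illustration only.

```python
import asyncio


async def producer(queue: asyncio.Queue, log: list, count: int) -> None:
    for i in range(count):
        await queue.put(i)  # suspends here while the queue is full
        log.append(f"put {i}")


async def slow_consumer(queue: asyncio.Queue, log: list, count: int) -> None:
    for _ in range(count):
        item = await queue.get()
        await asyncio.sleep(0.05)  # stand-in for the slow processing step
        log.append(f"done {item}")


async def run_pipeline(count: int = 5) -> list:
    queue: asyncio.Queue = asyncio.Queue(maxsize=1)
    log: list = []
    await asyncio.gather(
        producer(queue, log, count),
        slow_consumer(queue, log, count),
    )
    return log


if __name__ == "__main__":
    for line in asyncio.run(run_pipeline()):
        print(line)
```

In the printed log the producer should never get more than two elements ahead of the consumer: one element waiting in the queue and one currently being processed. With an unbounded queue the producer would instead finish all of its puts immediately.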

If you cannot control the rate at which first_function produces elements, you should implement a back-pressure strategy, for instance dropping elements when you cannot process them fast enough. You can use a higher value for maxsize if you want to allow some buffering before elements are dropped.

huangapple
  • Posted on February 27, 2023, 17:22:48
  • When reposting, please keep the link to this article: https://go.coder-hub.com/75578655.html