python async library issue

Question
I have the following script, but it works incorrectly. I want the first function to repeat forever, and the second function to start whenever the first function returns result1 and result2. I also want to add one more function that should start 40 seconds after the second function has finished its job with result1 from the first function. The third function should do something with result2. I want this to run asynchronously using the asyncio library, because the first function must not be blocked for 40 seconds while the third function sleeps. I tried to use ChatGPT, but that tool recommended the wrong things. How do I do that?
import asyncio
import logging

async def first_function(queue1, queue2):
    while True:
        # do something
        result1 = "some result"
        result2 = {"key": "value"}
        await queue1.put(result1)
        await queue2.put(result2)
        await asyncio.sleep(1)  # wait for 1 second

async def second_function(queue1, queue2, queue3):
    while True:
        result1 = await queue1.get()
        result2 = await queue2.get()
        # do something with the results
        logging.info(f"Second function received: {result1}, {result2}")
        await queue3.put(result1)
        await asyncio.sleep(1)  # wait for 1 second

async def third_function(queue):
    while True:
        result = await queue.get()
        await asyncio.sleep(40)  # wait for 40 seconds
        # do something with the result
        logging.info(f"Third function received: {result}")

async def main():
    queue1 = asyncio.Queue()
    queue2 = asyncio.Queue()
    queue3 = asyncio.Queue()
    task1 = asyncio.create_task(first_function(queue1, queue2))
    task2 = asyncio.create_task(second_function(queue1, queue2, queue3))
    task3 = asyncio.create_task(third_function(queue3))
    await asyncio.gather(task1, task2, task3)
I tried everything I googled, and it always blocks my code for 40 seconds when it reaches the third function.
Answer 1
Score: 0
There are multiple issues and anti-patterns:

- there is no need to use asyncio.create_task(), since asyncio.gather() will do that automatically for you when the objects are coroutines,
- you use unbounded queues, which means that the memory consumption of your program could grow without limit,
- third_function takes 40 seconds while first_function produces elements every 1-2 seconds, which means that the former won't have time to process the elements fast enough,
- if result1 and result2 should be produced at the same time and consumed at the same time, use a single queue and put both elements on it together.
You could use something like this:
import asyncio

async def first_function(queue_in: asyncio.Queue):
    while True:
        result1 = "some result"
        result2 = {"key": "value"}
        await queue_in.put((result1, result2))

async def second_function(queue_in: asyncio.Queue, queue_out: asyncio.Queue):
    while True:
        result1, result2 = await queue_in.get()
        print(f"Second function received: {result1}, {result2}")
        await queue_out.put(result1)

async def third_function(queue_out: asyncio.Queue):
    while True:
        result = await queue_out.get()
        await asyncio.sleep(4)
        print(f"Third function received: {result}")

async def main():
    queue_in = asyncio.Queue(maxsize=1)
    queue_out = asyncio.Queue(maxsize=1)
    task1 = first_function(queue_in)
    task2 = second_function(queue_in, queue_out)
    task3 = third_function(queue_out)
    await asyncio.gather(task1, task2, task3)

if __name__ == "__main__":
    asyncio.run(main())
By specifying maxsize=1 you limit the production rate of first_function, because now third_function defines the consumption rate: first_function (and second_function too) is only allowed to produce a new element once third_function has finished processing the previous one.
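This throttling comes from asyncio.Queue itself: await queue.put() suspends the producer whenever a bounded queue is full. A minimal standalone sketch (separate from the answer's code; the names and the 0.1-second timeout are just for illustration) makes that suspension visible with asyncio.wait_for:

```python
import asyncio

async def demo():
    q = asyncio.Queue(maxsize=1)
    await q.put("first")  # succeeds immediately: the queue has room
    try:
        # put() on a full bounded queue suspends the producer; wait_for
        # turns that suspension into a TimeoutError so we can observe it
        await asyncio.wait_for(q.put("second"), timeout=0.1)
        print("second put succeeded immediately")
    except asyncio.TimeoutError:
        print("producer suspended: queue is full")
    q.get_nowait()          # a consumer frees the slot...
    await q.put("second")   # ...and the producer can proceed
    print("second put succeeded after get")

asyncio.run(demo())
```

This is exactly why the pipeline above no longer races ahead of third_function: each producer simply parks on its put() until the slot is free.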
If you cannot control the rate at which first_function produces elements, you should implement a back-pressure strategy where, for instance, you drop elements if you cannot process them fast enough. You can use a higher maxsize value if you want to allow some amount of waiting before elements are dropped.
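One way to sketch such a drop-on-full strategy is with queue.put_nowait(), which raises asyncio.QueueFull instead of suspending. Everything below (the function names, timings, element count, and the maxsize=2 buffer) is illustrative rather than taken from the answer:

```python
import asyncio

async def producer(queue: asyncio.Queue) -> int:
    """Emit 10 elements at a fixed rate, dropping any that don't fit."""
    dropped = 0
    for i in range(10):
        try:
            # put_nowait() never suspends: when the consumer lags behind,
            # the element is discarded instead of piling up in memory
            queue.put_nowait(i)
        except asyncio.QueueFull:
            dropped += 1
        await asyncio.sleep(0.01)
    return dropped

async def consumer(queue: asyncio.Queue):
    while True:
        await queue.get()
        await asyncio.sleep(0.05)  # slow consumer, like third_function

async def main():
    queue = asyncio.Queue(maxsize=2)  # small buffer tolerates brief bursts
    consume = asyncio.create_task(consumer(queue))
    dropped = await producer(queue)
    consume.cancel()
    print(f"dropped {dropped} of 10 elements")

asyncio.run(main())
```

Since this consumer needs 0.05 s per element while the producer emits one every 0.01 s, most elements are dropped; raising maxsize lets more of them wait in the buffer before the drop policy kicks in.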