Python异步库问题

huangapple go评论66阅读模式
英文:

python async library issue

问题

以下是翻译好的代码部分:

import asyncio
import logging

async def first_function(queue1, queue2):
    while True:
        # do something
        result1 = "some result"
        result2 = {"key": "value"}
        await queue1.put(result1)
        await queue2.put(result2)
        await asyncio.sleep(1)  # 等待1秒

async def second_function(queue1, queue2, queue3):
    while True:
        result1 = await queue1.get()
        result2 = await queue2.get()
        # 对结果执行操作
        logging.info(f"Second function received: {result1}, {result2}")
        await queue3.put(result1)
        await asyncio.sleep(1)  # 等待1秒

async def third_function(queue):
    while True:
        result = await queue.get()
        await asyncio.sleep(40) # 等待40秒
        # 对结果执行操作
        logging.info(f"Third function received: {result}")

async def main():
    queue1 = asyncio.Queue()
    queue2 = asyncio.Queue()
    queue3 = asyncio.Queue()
    task1 = asyncio.create_task(first_function(queue1, queue2))
    task2 = asyncio.create_task(second_function(queue1, queue2, queue3))
    task3 = asyncio.create_task(third_function(queue3))
    await asyncio.gather(task1, task2, task3)

希望这能帮助您解决问题。如果您有其他问题,请随时提出。

英文:

I have the following script, but it work incorecctly. I want to first function always repeat and second function should start if first function return result1, result2. I want to add one more function which should start 40 seconds after second function finished job with result1 from first function. Third function should do something with result2. I want to work it asynchronously using asyncio library because i need the first function to not block for 40 seconds while third function sleep. I tried to use chatgpt, but this tool recommend me wrong things. How to do that?

import asyncio
import logging



async def first_function(queue1, queue2):
    while True:
        # do something
        result1 = "some result"
        result2 = {"key": "value"}
        await queue1.put(result1)
        await queue2.put(result2)
        await asyncio.sleep(1)  # wait for 1 second

async def second_function(queue1, queue2, queue3):
    while True:

           result1 = await queue1.get()
           result2 = await queue2.get()
           # do something with the results
           logging.info(f"Second function received: {result1}, {result2}")
           await queue3.put(result1)
           await asyncio.sleep(1)  # wait for 1 second


async def third_function(queue):
    while True:

           result = await queue.get()
           await asyncio.sleep(40) # wait for 40 seconds
           # do something with the result
           logging.info(f"Third function received: {result}")


async def main():
    queue1 = asyncio.Queue()
    queue2 = asyncio.Queue()
    queue3 = asyncio.Queue()
    task1 = asyncio.create_task(first_function(queue1, queue2))
    task2 = asyncio.create_task(second_function(queue1, queue2, queue3))
    task3 = asyncio.create_task(third_function(queue3))
    await asyncio.gather(task1, task2, task3)

I tried everything i googled and it always block my code for 40 seconds when reach third function

答案1

得分: 0

存在多个问题和反模式:

  • 无需使用asyncio.create_task(),因为当对象是协程时,AsyncIO会自动完成这个操作,
  • 你使用了无界队列,这意味着程序的内存消耗可能会无限增长,
  • third_function 需要40秒,而 first_function 每12秒产生一次元素,这意味着前者无法快速处理元素,
  • 如果 result1result_2 应该同时产生并同时消耗,使用一个队列来存放这两个元素。

你可以使用类似以下的代码:

import asyncio

async def first_function(queue_in: asyncio.Queue):
    while True:
        result1 = "some result"
        result2 = {"key": "value"}
        await queue_in.put((result1, result2))

async def second_function(queue_in: asyncio.Queue, queue_out: asyncio.Queue):
    while True:
        result1, result2 = await queue_in.get()
        print(f"Second function received: {result1}, {result2}")
        await queue_out.put(result1)

async def third_function(queue_out: asyncio.Queue):
    while True:
        result = await queue_out.get()
        await asyncio.sleep(4)
        print(f"Third function received: {result}")

async def main():
    queue_in = asyncio.Queue(maxsize=1)
    queue_out = asyncio.Queue(maxsize=1)
    
    task1 = first_function(queue_in)
    task2 = second_function(queue_in, queue_out)
    task3 = third_function(queue_out)
    
    await asyncio.gather(task1, task2, task3)
    
if __name__ == "__main__":
    asyncio.run(main())

通过指定 maxsize=1,你限制了 first_function 的生产速率,因为现在 third_function 定义了消费速率:只有在 third_function 完成对上一个元素的处理后,first_function(和 second_function)才被允许产生新的元素。

如果你无法控制 first_function 产生元素的速率,你应该实现一种反压策略,例如,如果你无法以足够快的速度处理它们,可以丢弃元素。如果想要在放弃元素之前允许一些等待,可以使用较高的 max size 值。

英文:

There is multiple issues and anti-patterns:

  • there is no need to use asyncio.create_task() since AsyncIO will do that automatically for you when the objects are coroutines,
  • you use unbounded queues, which means that the memory consumption or your program could grow unlimited,
  • third_function takes 40 seconds while first_function produces elements every 12 seconds which means that the former won't have time to process elements fast enough,
  • if result1 and result_2 should be produces at the same time and consumed at the same time, use a single queue to put both elements.

You could use something like this:

import asyncio


async def first_function(queue_in: asyncio.Queue):
    while True:
        result1 = "some result"
        result2 = {"key": "value"}
        await queue_in.put((result1, result2))


async def second_function(queue_in: asyncio.Queue, queue_out: asyncio.Queue):
    while True:
        result1, result2 = await queue_in.get()
        print(f"Second function received: {result1}, {result2}")
        await queue_out.put(result1)


async def third_function(queue_out: asyncio.Queue):
    while True:
        result = await queue_out.get()
        await asyncio.sleep(4)
        print(f"Third function received: {result}")


async def main():
    queue_in = asyncio.Queue(maxsize=1)
    queue_out = asyncio.Queue(maxsize=1)
    
    task1 = first_function(queue_in)
    task2 = second_function(queue_in, queue_out)
    task3 = third_function(queue_out)
    
    await asyncio.gather(task1, task2, task3)
    
    
if __name__ == "__main__":
    asyncio.run(main())

By specifying maxsize=1 you limit the production of first_function because now third_function defines the consumption rate: first_function (and second_function to) is only allowed to produce a new element when third_function finished processing the last one.

If you cannot control the rate at which first_function produces elements, you should implement a back-pressure strategy where for instance you drop elements if you cannot process them fast enough. You can use a higher value for max size if you want to allow some sort of waiting before elements are dropped.

huangapple
  • 本文由 发表于 2023年2月27日 17:22:48
  • 转载请务必保留本文链接:https://go.coder-hub.com/75578655.html
匿名

发表评论

匿名网友

:?: :razz: :sad: :evil: :!: :smile: :oops: :grin: :eek: :shock: :???: :cool: :lol: :mad: :twisted: :roll: :wink: :idea: :arrow: :neutral: :cry: :mrgreen:

确定