Python asyncio collect info from multiple tasks at timeout, then continue in parallel to main until all tasks finished

Question

My use case:

  1. I want to run N tasks in parallel
  2. After a specific period (timeout) I need to know the results from the tasks already finished by then. This is my "main" thread. This is already working in the snippet below.
  3. In a task that runs in parallel to and independently of main, I want to continue waiting for the remaining tasks to finish. Once all tasks have finished successfully, I need to do some calculations based on all N results and store the computation in a persistent cache. "main" does not need to wait for it.

Any hints on how to achieve no. 3?

        tasks = [
            asyncio.create_task(self.call_111(harmonized_address)),
            asyncio.create_task(self.call_222(harmonized_address)),
            asyncio.create_task(self.call_333(harmonized_address)),
            asyncio.create_task(self.call_444(harmonized_address))
        ]
        # Wait until the timeout expires; unfinished tasks are not cancelled.
        await asyncio.wait(tasks, timeout=timeout.total_seconds())

        res = []
        for t in tasks:
            try:
                r = t.result()
            except asyncio.InvalidStateError:
                # The task has not finished yet.
                res.append(None)
            except Exception as e:
                # The task finished by raising an exception.
                res.append(e)
            else:
                res.append(r)

Update: (3) probably needs a bit more context. We are using Python to develop AWS Lambda functions. Our Lambda function must respond within a given timebox, and it is acceptable for the response to be a partial one. But we also want to wait for the remaining tasks to finish so that we can update a cache with the full response.

Python-based AWS Lambdas are not async. Maybe I need to start a separate thread and run the loop and tasks in there (similar to how larks outlined it in his answer below), combined with what dmfigol posted in a gist about "Python asyncio event loop in a separate thread".

Obviously the new thread will finish later than the "lambda". I hope that is supported by AWS.
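The "event loop in a separate thread" idea might look roughly like the sketch below. It is not taken from the gist mentioned above; `start_background_loop`, `slow_call`, and `partial_results` are hypothetical names, and `slow_call` only stands in for `call_111` to `call_444`. The synchronous caller waits with `concurrent.futures.wait` because `asyncio.run_coroutine_threadsafe` returns a `concurrent.futures.Future`.

```python
import asyncio
import concurrent.futures
import threading


def start_background_loop() -> asyncio.AbstractEventLoop:
    """Run a fresh event loop forever in a daemon thread and return it."""
    loop = asyncio.new_event_loop()
    threading.Thread(target=loop.run_forever, daemon=True).start()
    return loop


async def slow_call(delay: float, value: int) -> int:
    # Hypothetical stand-in for one of the real calls.
    await asyncio.sleep(delay)
    return value


def partial_results(loop: asyncio.AbstractEventLoop, timeout: float):
    """Submit the calls to the background loop and wait up to `timeout` seconds."""
    futures = [
        asyncio.run_coroutine_threadsafe(slow_call(0.01, 1), loop),
        asyncio.run_coroutine_threadsafe(slow_call(0.02, 2), loop),
        asyncio.run_coroutine_threadsafe(slow_call(0.5, 3), loop),
    ]
    # Blocks the calling (synchronous) thread, not the background loop.
    done, _ = concurrent.futures.wait(futures, timeout=timeout)
    # Unfinished calls are reported as None; they keep running in the loop.
    early = [f.result() if f in done else None for f in futures]
    return early, futures


if __name__ == "__main__":
    loop = start_background_loop()
    early, futures = partial_results(loop, timeout=0.2)
    print("early:", early)
    # Later, from any thread, the remaining results can still be collected.
    print("all:", [f.result(timeout=5) for f in futures])
```

The unfinished coroutines only complete while the process and the background thread stay alive, so whether this helps inside a Lambda depends on what happens to the process after the handler returns.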

Answer 1

Score: 1

I'm not 100% sure I understood your question correctly, so please let me know if this isn't what you were looking for. The following code runs a series of concurrent tasks. It waits up to a specific timeout for some tasks to complete, at which point it returns any results that have been collected so far. At some point later, a second call is made that collects the results of any tasks that did not finish before the timeout.

We take advantage of the fact that asyncio.wait returns the tuple (done, pending), where done is the set of tasks that completed before the timeout and pending is the set of tasks that are still running.
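As a small illustration of that return value, here is a sketch with two made-up tasks (`work` and `split_by_timeout` are hypothetical names, and the delays are arbitrary). It also shows that the timeout does not cancel the pending tasks, which can simply be awaited later.

```python
import asyncio


async def work(delay: float, label: str) -> str:
    # Hypothetical task: sleep, then return its label.
    await asyncio.sleep(delay)
    return label


async def split_by_timeout():
    tasks = [
        asyncio.create_task(work(0.01, "fast")),
        asyncio.create_task(work(0.5, "slow")),
    ]
    # done and pending are sets; together they contain every task passed in.
    done, pending = await asyncio.wait(tasks, timeout=0.1)
    early = sorted(t.result() for t in done)
    still_running = len(pending)
    # The pending tasks were not cancelled, so they can be awaited now.
    late = sorted(await asyncio.gather(*pending))
    return early, still_running, late


if __name__ == "__main__":
    print(asyncio.run(split_by_timeout()))
```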

import asyncio
import random


class AsyncExample:
    def __init__(self):
        # Tasks that were still pending when start_tasks() hit its timeout.
        self.tasks = set()

    async def example_task(self, id: int):
        sleeptime = random.randint(1, 20)
        print(f"start task {id}, sleeptime={sleeptime}")
        # Sleep for the same duration that is printed and returned.
        await asyncio.sleep(sleeptime)
        print(f"end task {id}")
        return (id, sleeptime)

    async def start_tasks(self):
        tasks = [asyncio.create_task(self.example_task(x)) for x in range(10)]

        # The timeout does not cancel anything; unfinished tasks keep running.
        done, self.tasks = await asyncio.wait(tasks, timeout=10)

        return [t.result() for t in done]

    async def collect_tasks(self):
        # asyncio.wait() does not accept an empty collection.
        if not self.tasks:
            return []

        done, _ = await asyncio.wait(self.tasks)
        return [t.result() for t in done]


async def main():
    app = AsyncExample()
    res = await app.start_tasks()
    print("processing early results")
    for i in res:
        print("got early result:", i)

    # ... do something else...
    await asyncio.sleep(5)

    print("processing additional results")
    for i in await app.collect_tasks():
        print("got late result:", i)


asyncio.run(main())

Running this will produce output something like the following. The sample run shown here was recorded with a version of example_task that slept for a separately drawn random duration, so the sleeptime values it prints do not predict the order in which the tasks end:

start task 0, sleeptime=3
start task 1, sleeptime=17
start task 2, sleeptime=10
start task 3, sleeptime=12
start task 4, sleeptime=20
start task 5, sleeptime=15
start task 6, sleeptime=19
start task 7, sleeptime=15
start task 8, sleeptime=4
start task 9, sleeptime=6
end task 5
end task 8
end task 6
processing early results
got early result: (8, 4)
got early result: (6, 19)
got early result: (5, 15)
end task 3
end task 1
end task 0
end task 2
end task 9
processing additional results
end task 4
end task 7
got late result: (3, 12)
got late result: (9, 6)
got late result: (1, 17)
got late result: (0, 3)
got late result: (4, 20)
got late result: (7, 15)
got late result: (2, 10)
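For point 3 of the question (updating a cache without making "main" wait), one possible variation is to hand the tasks to a separate finalizer task instead of collecting them in a second call. This is only a sketch under assumptions: `cache` is a plain dict standing in for the persistent cache, `call` stands in for the real calls, and `finalize` and `respond` are hypothetical names.

```python
import asyncio

cache = {}  # hypothetical stand-in for the persistent cache


async def call(delay: float, value: int) -> int:
    # Hypothetical stand-in for call_111 ... call_444.
    await asyncio.sleep(delay)
    return value


async def finalize(tasks, key):
    # Waits for every task; raises (and skips the cache) if any task failed.
    results = await asyncio.gather(*tasks)
    cache[key] = sum(results)


async def respond(key: str, timeout: float):
    tasks = [
        asyncio.create_task(call(d, v))
        for d, v in ((0.01, 1), (0.02, 2), (0.4, 3))
    ]
    await asyncio.wait(tasks, timeout=timeout)
    # Assumes finished tasks did not raise; unfinished ones become None.
    partial = [t.result() if t.done() else None for t in tasks]
    # Keep a reference to the finalizer: the loop only holds weak references.
    finalizer = asyncio.create_task(finalize(tasks, key))
    return partial, finalizer


async def main():
    partial, finalizer = await respond("addr", timeout=0.1)
    cache_at_response = dict(cache)  # still empty: the finalizer is running
    await finalizer  # only so the demo can show the final cache
    return partial, cache_at_response, dict(cache)


if __name__ == "__main__":
    print(asyncio.run(main()))
```

The finalizer only completes if the event loop keeps running after the partial response has been produced. asyncio.run cancels any tasks that are still pending when main returns, which is why the demo awaits the finalizer before returning.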

huangapple
  • Posted on June 8, 2023, 04:46:51
  • When reposting, please keep the link to this article: https://go.coder-hub.com/76427014.html