Python asyncio collect info from multiple tasks at timeout, then continue in parallel to main until all tasks finished
Question
My use case:
1. I want to run N tasks in parallel.
2. After a specific period (timeout), I need the results of the tasks that have already finished by then. This is my "main" thread, and it already works in the snippet below.
3. In a task that runs in parallel with, and independently of, main, I want to keep waiting for the remaining tasks to finish. Once all tasks have finished successfully, I need to do some calculations based on all N results and store the computation in a persistent cache. "main" does not need to wait for it.
Any hints on how to achieve no. 3?
tasks = [
    asyncio.create_task(self.call_111(harmonized_address)),
    asyncio.create_task(self.call_222(harmonized_address)),
    asyncio.create_task(self.call_333(harmonized_address)),
    asyncio.create_task(self.call_444(harmonized_address)),
]
# Wait up to the timeout; tasks that have not finished keep running.
await asyncio.wait(tasks, timeout=timeout.total_seconds())
res = []
for t in tasks:
    try:
        r = t.result()
    except asyncio.InvalidStateError:
        # Task has not finished yet
        res.append(None)
    except Exception as e:
        # Task finished with an exception
        res.append(e)
    else:
        res.append(r)
Update: (3) probably needs a bit more context. We are using Python to develop AWS Lambda functions. Our Lambda function must respond within a given time box, and a partial response is acceptable. But we also want to wait for the remaining tasks to finish so that we can update a cache with the full response.
Python-based AWS Lambdas are not async. Maybe I need to start a separate thread and execute the loop and tasks there (similar to how larks outlined it in his answer below), and combine it with what dmfigol posted in a gist about "Python asyncio event loop in a separate thread".
Obviously, the new thread will finish later than the "lambda". I hope that is supported by AWS.
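The separate-thread idea from the update could look roughly like the following. This is a minimal standard-library sketch, not the contents of dmfigol's gist: an event loop runs forever in a daemon thread, `asyncio.run_coroutine_threadsafe` submits coroutines to it and returns `concurrent.futures.Future` objects, and the synchronous caller collects partial results with `concurrent.futures.wait(..., timeout=...)`. The names `fetch`, `compute_summary`, `finish_and_cache`, `handler` and the dict-based `cache` are hypothetical placeholders. Whether a thread that outlives the handler actually gets to finish depends on the runtime: AWS Lambda freezes the execution environment after the handler returns, so background work may only resume if that environment is reused for a later invocation.

```python
import asyncio
import concurrent.futures
import threading

# One event loop running forever in a daemon thread, shared by all calls.
loop = asyncio.new_event_loop()
threading.Thread(target=loop.run_forever, daemon=True).start()

cache: dict = {}  # hypothetical stand-in for the persistent cache


async def fetch(i: int, delay: float) -> int:
    # Hypothetical stand-in for call_111, call_222, ...
    await asyncio.sleep(delay)
    return i * 10


def compute_summary(results: list) -> int:
    # Hypothetical calculation over all N results.
    return sum(results)


async def finish_and_cache(futures: list, key: str) -> None:
    # Runs on the background loop: wait for all N results, then update the cache.
    results = await asyncio.gather(*(asyncio.wrap_future(f) for f in futures))
    cache[key] = compute_summary(results)


def handler(key: str, delays: list, timeout: float):
    # Synchronous entry point (hypothetically, the body of a Lambda handler).
    futures = [asyncio.run_coroutine_threadsafe(fetch(i, d), loop)
               for i, d in enumerate(delays)]
    done, _ = concurrent.futures.wait(futures, timeout=timeout)
    partial = [f.result() if f in done else None for f in futures]
    # Schedule the follow-up on the loop; the caller may ignore this future.
    follow_up = asyncio.run_coroutine_threadsafe(finish_and_cache(futures, key), loop)
    return partial, follow_up


if __name__ == "__main__":
    partial, follow_up = handler("addr-1", [0.01, 0.3, 0.02], timeout=0.1)
    print(partial)               # [0, None, 20]
    follow_up.result(timeout=5)  # demo only: wait for the cache write before exiting
    print(cache)                 # {'addr-1': 30}
```

The handler returns as soon as the timeout expires; the slow task and the cache update continue on the loop thread for as long as the process is allowed to run.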
Answer 1
Score: 1
I'm not 100% sure I understood your questions correctly, so please let me know if this isn't what you were looking for. The following code runs a series of concurrent tasks. It waits up to a specific timeout for some tasks to complete, at which time it returns any results that have been collected so far. At some point later, a second call is made that collects the results for any tasks that did not finish before the timeout.
We take advantage of the fact that `asyncio.wait` returns the tuple `(done, pending)`, where `done` is the set of tasks that completed before the timeout and `pending` is the set of tasks that are still running.
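Because `done` and `pending` are sets, iterating over them yields tasks in no particular order (which is why the early results in the output below are not sorted by task id). If the results should line up with the order in which the tasks were created, as in the question's snippet, one option is to walk the original list and test membership in `done`. A minimal sketch; `work` and `partial_results` are hypothetical names standing in for the real calls:

```python
import asyncio


async def work(i: int, delay: float) -> int:
    # Hypothetical stand-in for call_111, call_222, ...
    await asyncio.sleep(delay)
    return i


async def partial_results(delays: list, timeout: float) -> tuple:
    tasks = [asyncio.create_task(work(i, d)) for i, d in enumerate(delays)]
    done, pending = await asyncio.wait(tasks, timeout=timeout)
    results = []
    for t in tasks:  # walk the original list, not the unordered `done` set
        if t not in done:
            results.append(None)            # still running at the timeout
        elif t.exception() is not None:
            results.append(t.exception())   # finished with an exception
        else:
            results.append(t.result())      # finished normally
    return results, pending


async def main() -> list:
    results, pending = await partial_results([0.01, 1.0, 0.02], timeout=0.2)
    for t in pending:
        t.cancel()  # this demo does not need the late results
    return results


if __name__ == "__main__":
    print(asyncio.run(main()))  # [0, None, 2]
```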
import asyncio
import random

class AsyncExample:
    def __init__(self):
        # Tasks that are still pending after the timeout in start_tasks()
        self.tasks = set()

    async def example_task(self, id: int):
        sleeptime = random.randint(1, 20)
        print(f"start task {id}, sleeptime={sleeptime}")
        await asyncio.sleep(sleeptime)  # sleep for the duration that is reported
        print(f"end task {id}")
        return (id, sleeptime)

    async def start_tasks(self):
        tasks = []
        for x in range(10):
            tasks.append(asyncio.create_task(self.example_task(x)))
        # Wait at most 10 seconds; keep the unfinished tasks for later.
        done, self.tasks = await asyncio.wait(tasks, timeout=10)
        res = []
        for t in done:
            res.append(t.result())
        return res

    async def collect_tasks(self):
        res = []
        if not self.tasks:
            # Everything finished before the timeout; asyncio.wait() rejects an empty set.
            return res
        done, _ = await asyncio.wait(self.tasks)
        for t in done:
            res.append(t.result())
        return res

async def main():
    app = AsyncExample()
    res = await app.start_tasks()
    print("processing early results")
    for i in res:
        print("got early result:", i)
    # ... do something else...
    await asyncio.sleep(5)
    print("processing additional results")
    for i in await app.collect_tasks():
        print("got late result:", i)

asyncio.run(main())
Running this will produce output something like:
start task 0, sleeptime=3
start task 1, sleeptime=17
start task 2, sleeptime=10
start task 3, sleeptime=12
start task 4, sleeptime=20
start task 5, sleeptime=15
start task 6, sleeptime=19
start task 7, sleeptime=15
start task 8, sleeptime=4
start task 9, sleeptime=6
end task 5
end task 8
end task 6
processing early results
got early result: (8, 4)
got early result: (6, 19)
got early result: (5, 15)
end task 3
end task 1
end task 0
end task 2
end task 9
processing additional results
end task 4
end task 7
got late result: (3, 12)
got late result: (9, 6)
got late result: (1, 17)
got late result: (0, 3)
got late result: (4, 20)
got late result: (7, 15)
got late result: (2, 10)
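The `main()` above still awaits `collect_tasks()`, so it does wait for the slow tasks. For point 3 of the question, where main should not wait, the follow-up work could instead be scheduled as its own task. A minimal sketch, assuming everything runs in one event loop that stays alive after the partial response has been produced; `fetch`, `compute_summary`, `finish_and_cache`, `handle` and the dict-based `cache` are hypothetical placeholders for the real calls and the persistent cache:

```python
import asyncio

cache: dict = {}          # hypothetical stand-in for the persistent cache
background: set = set()   # strong references, so pending follow-up tasks are not garbage-collected


async def fetch(i: int, delay: float) -> int:
    # Hypothetical stand-in for call_111, call_222, ...
    await asyncio.sleep(delay)
    return i * 10


def compute_summary(results: list) -> int:
    # Hypothetical calculation over all N results.
    return sum(results)


async def finish_and_cache(tasks: list, key: str) -> None:
    # Wait for every task (finished or not), then store the computation.
    results = await asyncio.gather(*tasks)
    cache[key] = compute_summary(results)


async def handle(key: str, delays: list, timeout: float) -> list:
    tasks = [asyncio.create_task(fetch(i, d)) for i, d in enumerate(delays)]
    done, _ = await asyncio.wait(tasks, timeout=timeout)
    partial = [t.result() if t in done else None for t in tasks]

    # Schedule the follow-up and return without awaiting it.
    follow_up = asyncio.create_task(finish_and_cache(tasks, key))
    background.add(follow_up)
    follow_up.add_done_callback(background.discard)
    return partial


async def main() -> tuple:
    cache.clear()
    partial = await handle("addr-1", [0.01, 0.3, 0.02], timeout=0.1)
    cached_immediately = "addr-1" in cache  # the slow task is still running here
    await asyncio.gather(*background)       # demo only: let the follow-up finish before exiting
    return partial, cached_immediately, cache["addr-1"]


if __name__ == "__main__":
    print(asyncio.run(main()))  # ([0, None, 20], False, 30)
```

One caveat on the design: `asyncio.run()` cancels any tasks still pending when the main coroutine returns, so this pattern only helps if the loop keeps running after the response is produced, for example in a long-lived service or in a loop running in a background thread as described in the question's update.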