How to append additional tasks in asyncio.as_completed(task_list)
Question
I have a coroutine function to fetch results from a URL:

```python
from asyncio import Future, get_running_loop

async def create_task_form_url(aio_session, form_url: str) -> Future:
    loop = get_running_loop()
    # Schedule the fetch on the running loop and hand the task back to the caller.
    task = loop.create_task(async_get_results(form_url, aio_session))
    return task
```

I'm trying to create the initial task and use `as_completed` to get the result. Along the way, *depending on some logic*, I want to append to the `tasks` list so that `as_completed` can fetch those results as well.

This is where my problem comes in: the `for` loop closes out before the 2nd task completes. How can I do this correctly?

```python
tasks = [await create_task_form_url(self.aio_session, form_url)]
print(len(tasks))
for each_task in as_completed(tasks):
    print(await each_task)
    tasks.append(await create_task_form_url(self.aio_session, form_url))
    print(len(tasks))
```

Output:

```none
1
# Task 1 result is printed.
2  # So we know that tasks.append works. But the result was never retrieved.
Error msg: Task was destroyed but it is pending!
```
Answer 1
Score: 2
Use `asyncio.wait` instead - https://docs.python.org/3/library/asyncio-task.html#asyncio.wait

After each iteration (it can also accept a timeout), it returns two sets: one with the completed tasks and another with those still pending. You can add new tasks to the "pending" set and iterate again:

```python
async def main():
    tasks = {await create_task_form_url(self.aio_session, form_url)}
    print(len(tasks))
    while tasks:
        # asyncio.wait is a coroutine, so it must be awaited.
        done, pending = await asyncio.wait(tasks, return_when=asyncio.FIRST_COMPLETED)
        for each_task in done:
            # A completed task exposes its value via .result() (no extra await needed).
            print(each_task.result())
        tasks = pending
        ...
        if <condition_to_add_new_task>:
            tasks.add(await create_task_form_url(self.aio_session, form_url))
        print(len(tasks))

asyncio.run(main())
```
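For reference, here is a minimal, self-contained sketch of the same `wait()`/pending pattern that can be run as-is; `fetch_number` is a made-up stand-in for the asker's `async_get_results`, and the `added` flag plays the role of `<condition_to_add_new_task>`:

```python
# Runnable sketch of the wait()/pending loop with a dynamically added task.
import asyncio
import random


async def fetch_number(n: int) -> int:
    # Made-up stand-in for async_get_results(): pretend to do I/O, then return a value.
    await asyncio.sleep(random.random())
    return n


async def main():
    tasks = {asyncio.create_task(fetch_number(1))}
    added = False
    while tasks:
        done, pending = await asyncio.wait(tasks, return_when=asyncio.FIRST_COMPLETED)
        for each_task in done:
            print(each_task.result())
        tasks = pending
        if not added:  # stand-in for <condition_to_add_new_task>
            tasks.add(asyncio.create_task(fetch_number(2)))
            added = True


asyncio.run(main())
```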
Answer 2
Score: 0
I don't think you need `as_completed` for your use case. In your example program you already `await` all the tasks (at the first line, inside the list), so `as_completed` won't do anything useful here.

If you do not need the results as soon as they are completed, you can `await` some of them and then schedule other tasks after that. If you need concurrent execution, you may use `asyncio.gather()`.
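For instance, a minimal sketch of the `gather()` approach, with a made-up `fetch` coroutine standing in for the real request:

```python
# Minimal sketch: run several coroutines concurrently and collect all results at once.
import asyncio


async def fetch(n: int) -> int:
    # Made-up placeholder for the real URL fetch.
    await asyncio.sleep(0.1)
    return n * 10


async def main():
    results = await asyncio.gather(fetch(1), fetch(2), fetch(3))
    print(results)  # [10, 20, 30] - results come back in submission order


asyncio.run(main())
```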
However, if you are using Python 3.11 you can use a `TaskGroup` to manage concurrent tasks and schedule new ones as you go.

Inside the task group's `async with` context manager you can create as many concurrent requests as you want. Then you can wait for the completion of some of them using the task handles you get when submitting a task to the group. Finally, you can schedule further tasks in the task group depending on the output of the previous ones.
Here is an example:
```python
import asyncio


async def get_hello() -> str:
    await asyncio.sleep(1.0)
    return "Hello"


async def get_comma() -> str:
    await asyncio.sleep(1.0)
    return ", "


async def get_world() -> str:
    await asyncio.sleep(1.0)
    return "world!"


async def main():
    async with asyncio.TaskGroup() as tg:
        # Launch two concurrent tasks.
        tasks = [
            tg.create_task(get_hello()),
            tg.create_task(get_comma()),
        ]
        # Wait for the completion of some of them and get the value.
        first_word = await tasks[0]
        # Launch other tasks depending on the value.
        if first_word == "Hello":
            task = tg.create_task(get_world())
        else:
            raise ValueError("first word was not 'Hello'")
    # All submitted tasks are guaranteed completed after the
    # task group closes, i.e. after the `async with` statement.
    comma = await tasks[1]
    second_word = await task
    print(first_word, comma, second_word, sep="")


if __name__ == "__main__":
    asyncio.run(main())
```
Alternatively, you can use the solution given by @jsbuino, which uses `wait()`, or you could use queues (`asyncio.Queue`).
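A rough sketch of the queue-based alternative (the `handle` coroutine and the "first"/"second" items are made up for illustration): a worker consumes items from an `asyncio.Queue`, and the result of one item can decide whether more work gets put back on the queue:

```python
# Rough sketch of the queue-based approach: new work can be enqueued at any time,
# including from inside the consumer itself, based on previous results.
import asyncio


async def handle(item: str) -> str:
    # Made-up placeholder for the real URL fetch.
    await asyncio.sleep(0.1)
    return f"result for {item}"


async def worker(queue: asyncio.Queue) -> None:
    while True:
        item = await queue.get()
        print(await handle(item))
        if item == "first":          # stand-in for "depending on some logic"
            await queue.put("second")
        queue.task_done()


async def main():
    queue: asyncio.Queue = asyncio.Queue()
    await queue.put("first")
    consumer = asyncio.create_task(worker(queue))
    await queue.join()               # returns once every enqueued item is done
    consumer.cancel()


asyncio.run(main())
```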