How to append additional tasks in asyncio.as_completed(task_list)

Question

I have a coroutine function to fetch results from a URL:

```python
async def create_task_from_url(aio_session, form_url: str) -> Task:
    loop = get_running_loop()
    task = loop.create_task(async_get_results(form_url, aio_session))
    return task
```

I try to create the initial task and use `as_completed` to fetch the result. Along the way, depending on some logic, I want to append to the `tasks` list so that `as_completed` can fetch those results as well.

This is where my problem comes in: the `for` loop exits before the second task completes. How can I do this correctly?

```python
tasks = [await create_task_from_url(self.aio_session, form_url)]

print(len(tasks))

for each_task in as_completed(tasks):
    print(await each_task)
    tasks.append(await create_task_from_url(self.aio_session, form_url))
    print(len(tasks))
```

Output:

```none
1
# Task 1's result is printed.
2  # So we know that tasks.append works. But the result was never retrieved.
Error msg: Task was destroyed but it is pending!
```
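The root cause can be demonstrated in isolation: `as_completed` captures the set of awaitables at the moment it is called, so appending to the list afterwards never registers the new task with the loop. A minimal, self-contained sketch (the `slow` coroutine is a stand-in for the real URL fetch):

```python
import asyncio


async def slow(n: int) -> int:
    # Stand-in for the real fetch: sleep briefly, return the input.
    await asyncio.sleep(0.01 * n)
    return n


async def main():
    tasks = [asyncio.create_task(slow(1))]
    results = []
    for fut in asyncio.as_completed(tasks):
        results.append(await fut)
        # This append has no effect on the loop: as_completed captured
        # the awaitables when it was called, so the new task is ignored.
        tasks.append(asyncio.create_task(slow(2)))
    # The loop ran only once; the second task is still pending here and
    # would trigger "Task was destroyed but it is pending!" at shutdown.
    await tasks[-1]  # awaited explicitly so the sketch exits cleanly
    return results, len(tasks)


results, n = asyncio.run(main())
print(results, n)  # [1] 2 — one result retrieved, two tasks created
```

So the `for` loop consumes exactly the one awaitable it was given, regardless of how the list grows afterwards.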


Answer 1

Score: 2

Use asyncio.wait instead - https://docs.python.org/3/library/asyncio-task.html#asyncio.wait

After each iteration (it can also accept a timeout), it returns two sets: one with the completed tasks, and another with those still pending. You can add new tasks to the "pending" ones and iterate again:

```python
async def main():
    tasks = {await create_task_from_url(self.aio_session, form_url), }

    print(len(tasks))

    while tasks:
        # asyncio.wait is a coroutine and must be awaited.
        done, pending = await asyncio.wait(tasks, return_when=asyncio.FIRST_COMPLETED)

        for each_task in done:
            # Tasks in `done` are finished; read the result directly.
            print(each_task.result())
        tasks = pending
        ...
        if <condition_to_add_new_task>:
            tasks.add(await create_task_from_url(self.aio_session, form_url))
        print(len(tasks))

asyncio.run(main())
```
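A self-contained, runnable version of this pattern (with a hypothetical `fetch` coroutine standing in for the URL fetch, and a counter as the condition for adding new tasks) shows the pending set being drained and refilled:

```python
import asyncio


async def fetch(n: int) -> int:
    # Hypothetical fetch: sleep briefly, echo the input.
    await asyncio.sleep(0.01)
    return n


async def main():
    results = []
    next_id = 0
    tasks = {asyncio.create_task(fetch(next_id))}

    while tasks:
        # Wake up as soon as at least one task finishes.
        done, pending = await asyncio.wait(tasks, return_when=asyncio.FIRST_COMPLETED)
        for task in done:
            results.append(task.result())
        tasks = pending
        # Keep scheduling follow-up tasks until five have been created.
        if next_id < 4:
            next_id += 1
            tasks.add(asyncio.create_task(fetch(next_id)))
    return results


print(sorted(asyncio.run(main())))  # [0, 1, 2, 3, 4]
```

Every task appended between iterations is picked up by the next `asyncio.wait` call, which is exactly what `as_completed` cannot do.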

Answer 2

Score: 0

I don't think you need as_completed for your use case. In your example program you already await each task as it is created (at the first line, inside the list literal), so as_completed won't do anything useful here.

If you do not need the results as they are completed, you can await some of them and then schedule other tasks after that. If you need concurrent execution, you may use asyncio.gather().
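For instance, a minimal gather sketch (the `fetch` coroutine here is hypothetical, standing in for the real requests):

```python
import asyncio


async def fetch(n: int) -> int:
    # Hypothetical fetch: sleep briefly, return a derived value.
    await asyncio.sleep(0.01)
    return n * 10


async def main():
    # gather runs the coroutines concurrently and returns the results
    # in the same order as its arguments.
    return await asyncio.gather(fetch(1), fetch(2), fetch(3))


print(asyncio.run(main()))  # [10, 20, 30]
```

Note that gather takes a fixed set of awaitables up front, so it fits best when you know all the work before starting.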

However, if you are using Python 3.11 or newer, you can use a TaskGroup to manage concurrent tasks and schedule new ones as they go.

Inside the task group's async-with context manager you can create as many concurrent requests as you want. Then you can wait for the completion of some of them using the task handles you get when submitting a task to the group. Finally, you can schedule other tasks to the task group depending on the output of the previous tasks.

Here is an example:

```python
import asyncio


async def get_hello() -> str:
    await asyncio.sleep(1.0)
    return "Hello"


async def get_comma() -> str:
    await asyncio.sleep(1.0)
    return ", "


async def get_world() -> str:
    await asyncio.sleep(1.0)
    return "world!"


async def main():
    async with asyncio.TaskGroup() as tg:
        # Launch two concurrent tasks.
        tasks = [
            tg.create_task(get_hello()),
            tg.create_task(get_comma()),
        ]

        # Wait for the completion of some of them and get the value.
        first_word = await tasks[0]

        # Launch other tasks depending on the value.
        if first_word == "Hello":
            task = tg.create_task(get_world())
        else:
            raise ValueError("first word was not 'Hello'")

    # All submitted tasks are guaranteed completed after the
    # task group closes, i.e. after the `async-with` statement.
    comma = await tasks[1]
    second_word = await task

    print(first_word, comma, second_word, sep="")


if __name__ == "__main__":
    asyncio.run(main())
```

Alternatively, you can use the solution given by @jsbuino, which uses wait(), or you could use queues (asyncio.Queue).
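The queue approach would look roughly like this (a sketch under assumptions: a single worker drains an asyncio.Queue, and processing an item may enqueue follow-up work, which mirrors "append more tasks as results come in"):

```python
import asyncio


async def worker(queue: asyncio.Queue, results: list) -> None:
    while True:
        n = await queue.get()
        results.append(n)
        # Depending on some logic, enqueue follow-up work.
        if n < 3:
            queue.put_nowait(n + 1)
        queue.task_done()


async def main():
    queue: asyncio.Queue = asyncio.Queue()
    results: list = []
    queue.put_nowait(0)  # seed the initial item
    w = asyncio.create_task(worker(queue, results))
    await queue.join()  # returns once every enqueued item is processed
    w.cancel()  # the worker loops forever; cancel it once the queue drains
    return results


print(asyncio.run(main()))  # [0, 1, 2, 3]
```

Because `queue.join()` tracks items rather than tasks, work added mid-flight is handled naturally before the program moves on.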


huangapple
  • Posted on 2023-05-26 00:29:40
  • Please keep this link when reposting: https://go.coder-hub.com/76334452.html