How to schedule awaitables for sequential execution without awaiting, without prior knowing the number of awaitables?

huangapple go评论67阅读模式
英文:

How to schedule awaitables for sequential execution without awaiting, without prior knowing the number of awaitables?

问题

以下是翻译好的部分:

What I would like to do is essentially described here.

我想做的事情基本上在这里有描述。

However, I do not know how many awaitables I will execute.

但是,我不知道我会执行多少个可等待对象。

If I were to use threading my code would be like this:

如果我使用线程,我的代码将如下所示:

def foo():
   # some blocking call 

result_list = []
threads = []
for i in events:  # events is a list different every time
     newthread = threading.Thread(target=foo, args=())          
     threads.append(newthread)
     newthread.start()
print(result_list)

How can I turn this into async code?

我该如何将这个转换成异步代码?

I suppose foo() should look like this:

我想foo()应该是这样的:

async def foo():
   global result_list
   result = await blocking_call()
   result_list.append(result)

I have tried to create tasks in another thread, but this approach doesn't seem to function properly.

我尝试在另一个线程中创建任务,但这种方法似乎不能正常运行。

EDIT

I would like to do something like this:

我想做类似这样的事情:

def foo():
   # some blocking call 

result_list = []
threads = []
for i in events:  # events is a list different every time
     time.sleep(i)
     newthread = threading.Thread(target=foo, args=())          
     threads.append(newthread)
     newthread.start()
print(result_list)
英文:

What I would like to do is essentially described here.

However, I do not know how many awaitables I will execute.

If I were to use threading my code would be like this:

def foo():
   # some blocking call 

result_list = []
threads = []
for i in events:  # events is a list different every time
     newthread = threading.Thread(target=foo, args=())          
     threads.append(newthread)
     newthread.start()
print(result_list)

How can I turn this into async code?

I suppose foo() should look like this:

async def foo():
   global result_list
   result = await blocking_call()
   result_list.append(result)

I have tried to create tasks in another thread, but this approach doesn't seem to function properly.

EDIT

I would like to do something like this:

def foo():
   # some blocking call 

result_list = []
threads = []
for i in events:  # events is a list different every time
     time.sleep(i)
     newthread = threading.Thread(target=foo, args=())          
     threads.append(newthread)
     newthread.start()
print(result_list)

答案1

得分: 1

你链接的另一个问题已经有答案。唯一需要不同的是,如果你不知道协程的数量,就需要将协程的调用和await放在一个循环中。

但是,基于辅助任务的方法仍然是我认为最简单的解决方案。

这里有一个非常简单的演示:

from asyncio import create_task, run, sleep
from collections.abc import Iterable


async def foo(x: float) -> None:
    print(f"执行 foo({x=})")
    await sleep(x)
    print(f"完成 foo({x=})")


async def foo_sequentially(xs: Iterable[float]) -> None:
    for x in xs:
        await foo(x)


async def main() -> None:
    foo_inputs = [0.25, 0.5, 1., 1.5]
    foo_seq_task = create_task(foo_sequentially(foo_inputs))
    ...  # 做其他事情,而 `foo_seq_task` 被安排/执行
    print("  做其他事情...")
    await sleep(1)
    print("  做更多的事情...")
    await sleep(1)
    print("  完成其他事情!")
    await foo_seq_task


if __name__ == "__main__":
    run(main())

输出:

  做其他事情...
执行 foo(x=0.25)
完成 foo(x=0.25)
执行 foo(x=0.5)
完成 foo(x=0.5)
执行 foo(x=1.0)
  做更多的事情...
完成 foo(x=1.0)
执行 foo(x=1.5)
  完成其他事情!
完成 foo(x=1.5)

正如你所看到的,foo 协程都是按顺序执行的,相对于彼此,而作为一个整体(封装在 foo_sequentially 中),它们与 main 所做的其他事情并行执行。

这是因为按照定义,await 语句会阻塞包围它的协程(在这种情况下是 foo_sequentially),直到该语句中的可等待对象返回(在这种情况下是 foo 协程之一);但是,await 语句也会让出控制权,使事件循环能够继续执行其他协程(在这种情况下是 main)。

英文:

The other question you linked has the answer. The only thing you need to do differently, if you don't know the number of coroutines ahead of time, is put the calls and awaits of your coroutines in a loop.

But the helper task based approach is still the simplest solution in my opinion.

Here is a very simple demo:

from asyncio import create_task, run, sleep
from collections.abc import Iterable


async def foo(x: float) -> None:
    print(f"Executing foo({x=})")
    await sleep(x)
    print(f"Finished foo({x=})")


async def foo_sequentially(xs: Iterable[float]) -> None:
    for x in xs:
        await foo(x)


async def main() -> None:
    foo_inputs = [0.25, 0.5, 1., 1.5]
    foo_seq_task = create_task(foo_sequentially(foo_inputs))
    ...  # do other stuff, while `foo_seq_task` is scheduled/executes
    print("  Doing other stuff...")
    await sleep(1)
    print("  Doing more stuff...")
    await sleep(1)
    print("  Done with other stuff!")
    await foo_seq_task


if __name__ == "__main__":
    run(main())

Output:

  Doing other stuff...
Executing foo(x=0.25)
Finished foo(x=0.25)
Executing foo(x=0.5)
Finished foo(x=0.5)
Executing foo(x=1.0)
  Doing more stuff...
Finished foo(x=1.0)
Executing foo(x=1.5)
  Done with other stuff!
Finished foo(x=1.5)

As you can see the foo coroutines all executed sequentially with respect to one another, and viewed as a whole (wrapped in foo_sequentially) they executed concurrently to the rest of the things done by main.

That is because by definition an await statement will block the encompassing coroutine (in this case foo_sequentially), until the awaitable in that statement returns (in this case one of the foo coroutines); but the await statement will also yield control to the event loop allowing it to resume execution of some other coroutine (in this case main).

huangapple
  • 本文由 发表于 2023年6月13日 00:23:35
  • 转载请务必保留本文链接:https://go.coder-hub.com/76458569.html
匿名

发表评论

匿名网友

:?: :razz: :sad: :evil: :!: :smile: :oops: :grin: :eek: :shock: :???: :cool: :lol: :mad: :twisted: :roll: :wink: :idea: :arrow: :neutral: :cry: :mrgreen:

确定