获取异步任务中第一个非空结果的方法

huangapple go评论74阅读模式
英文:

How to get the first result from async tasks which is not None

问题

Here is the translated code portion:

我有一个情况需要在异步函数中停止任务条件是匹配 `task_1`、`task_2``task_3`,同时还要检查结果不是 `None`,如果结果是 `None`,那么我希望继续执行直到函数得到第一个不是 `None` 的结果以下是我目前拥有的最小可重现代码

import asyncio

async def task_1(_id):
    _ids = ["230327-12717", "230221-28276", "230214-06090"]

    for i in _ids:
        if i == _id:
            return f"在task_1中找到了 {_id}"

async def task_2(_id):
    _ids = ["230502-14191", "230425-17005", "230327-14434"]

    for i in _ids:
        if i == _id:
            return f"在task_2中找到了 {_id}"

async def task_3(_id):
    _ids = ["230404-23786", "230221-25729", "230221-28276"]

    for i in _ids:
        if i == _id:
            return f"在task_3中找到了 {_id}"

async def main(_id):
    tasks = [task_1(_id), task_2(_id), task_3(_id)]

    finished, unfinished = await asyncio.wait(
        tasks, return_when=asyncio.FIRST_COMPLETED
    )

    print(finished)

if __name__ == "__main__":
    loop = asyncio.get_event_loop()
    loop.run_until_complete(main("230502-14191"))

请检查代码是否正确,并查看输出是否符合您的预期。

英文:

I have a situation where I need to stop the tasks in the async function where it matches the condition whether in task_1 or task_2 or task_3 and also check that result is not None if the results is None then I'd like to proceed till the function gets the first result which is not None. Here is the minimal reproducible code I have so far:

import asyncio


async def task_1(_id):
    _ids = ["230327-12717", "230221-28276", "230214-06090"]

    for i in _ids:
        if i == _id:
            return f"Found {_id} in task_1"


async def task_2(_id):
    _ids = ["230502-14191", "230425-17005", "230327-14434"]

    for i in _ids:
        if i == _id:
            return f"Found {_id} in task_2"


async def task_3(_id):
    _ids = ["230404-23786", "230221-25729", "230221-28276"]

    for i in _ids:
        if i == _id:
            return f"Found {_id} in task_3"


async def main(_id):
    tasks = [task_1(_id), task_2(_id), task_3(_id)]

    finished, unfinished = await asyncio.wait(
        tasks, return_when=asyncio.FIRST_COMPLETED
    )

    print(finished)


if __name__ == "__main__":
    loop = asyncio.get_event_loop()
    loop.run_until_complete(main("230502-14191"))

which gives output:

{<Task finished name='Task-2' coro=<task_2() done, defined at /home/zerox/pipedrive_CRM/minimal.py:13> result='Found 230502-14191 in task_2'>, <Task finished name='Task-3' coro=<task_3() done, defined at /home/zerox/pipedrive_CRM/minimal.py:21> result=None>, <Task finished name='Task-4' coro=<task_1() done, defined at /home/zerox/pipedrive_CRM/minimal.py:5> result=None>}

And my expected output should be something like this:

{<Task finished name='Task-2' coro=<task_2() done, defined at /home/zerox/pipedrive_CRM/minimal.py:13> result='Found 230502-14191 in task_2'>}

How can I achieve that?

答案1

得分: 1

这似乎是一个 XY 问题,所以我会试着解释你实际想要达到的目的。

每当你需要协调多个任务的时候,最好从可用的同步原语开始。

在这种情况下,你希望对你特定的 ID 进行搜索,在其中一个任务中找到后,停止所有任务,这时候一个 Event 对象会很有用。主协程安排了“搜索”任务,然后简单地等待事件被设置。所有任务都携带对该事件对象的引用,一旦有任务找到了 ID,它就会设置该事件。主协程然后取消所有任务并清理。

演示代码:

from asyncio import Event, create_task, gather, run, sleep
from collections.abc import Iterable


async def _search(id_: str, ids: Iterable[str], found: Event, name: str) -> None:
    for i in ids:
        await sleep(0)
        print(f"{name} checking {i}")
        if i == id_:
            print(f"Found {id_} in {name}")
            found.set()


async def task_1(id_: str, found: Event) -> None:
    ids = ["230327-12717", "230221-28276", "230214-06090"]
    await _search(id_, ids, found, "task_1")


async def task_2(id_: str, found: Event) -> None:
    ids = ["230502-14191", "230425-17005", "230327-14434"]
    await _search(id_, ids, found, "task_2")


async def task_3(id_: str, found: Event) -> None:
    ids = ["230404-23786", "230221-25729", "230221-28276"]
    await _search(id_, ids, found, "task_3")


async def main(id_: str) -> None:
    found = Event()
    tasks = [
        create_task(task_1(id_, found)),
        create_task(task_2(id_, found)),
        create_task(task_3(id_, found)),
    ]
    await found.wait()
    for task in tasks:
        task.cancel()
    await gather(*tasks, return_exceptions=True)


if __name__ == "__main__":
    run(main("230502-14191"))

输出:

task_1 checking 230327-12717
task_2 checking 230502-14191
Found 230502-14191 in task_2
task_3 checking 230404-23786
task_1 checking 230221-28276

需要注意的一些事项:

  • for循环内部必须有一些await以确保发生上下文切换。否则任务将会按顺序执行。这就是为什么我加了await asyncio.sleep(0)的原因。
  • 如你所见,即使 ID 是由任务 2 找到的,任务 1 和任务 3 仍然各自进行了一次检查。没有办法阻止这个,因为事件循环自己决定了在任何给定时刻要切换到哪个协程。在我们的 main 协程中的 await found.wait() 只保证它将阻塞直到事件被设置,而不保证它会立即接管控制。但你会注意到,没有一个任务实际上完成,也就是说,在它们被取消之前,它们都没有经历完它们所有的 ID。
  • 我在 asyncio.gather 中使用了 return_exceptions=True,因为我不想让任何 CancelledError 传播到 main 协程。
  • 严格来说,你根本不需要 gather,因为协程将被取消,但让悬空的未等待的任务存在是不好的做法。你应该在某个时候await你的任务。gather 在这里似乎是最简单的解决方案。
英文:

This seems like an XY Problem, so I'll try and interpret what you actually want to accomplish.

Whenever you are dealing with multiple tasks that need to be coordinated in some way, it is a good idea to start with the available synchronization primitives.

In this case, you want the search for your specific ID to stop in all tasks, as soon as possible, after it has been found in one of them.

An Event object can be very useful for such a situation. The main coroutine schedules the "searching" tasks and then simply waits for the event to be set. The tasks all carry a reference to that event object and once the ID is found by one of the tasks, it sets the event. The main coroutine then simple cancels all the tasks and cleans up.

Demo:

from asyncio import Event, create_task, gather, run, sleep
from collections.abc import Iterable


async def _search(id_: str, ids: Iterable[str], found: Event, name: str) -> None:
    for i in ids:
        await sleep(0)
        print(f"{name} checking {i}")
        if i == id_:
            print(f"Found {id_} in {name}")
            found.set()


async def task_1(id_: str, found: Event) -> None:
    ids = ["230327-12717", "230221-28276", "230214-06090"]
    await _search(id_, ids, found, "task_1")


async def task_2(id_: str, found: Event) -> None:
    ids = ["230502-14191", "230425-17005", "230327-14434"]
    await _search(id_, ids, found, "task_2")


async def task_3(id_: str, found: Event) -> None:
    ids = ["230404-23786", "230221-25729", "230221-28276"]
    await _search(id_, ids, found, "task_3")


async def main(id_: str) -> None:
    found = Event()
    tasks = [
        create_task(task_1(id_, found)),
        create_task(task_2(id_, found)),
        create_task(task_3(id_, found)),
    ]
    await found.wait()
    for task in tasks:
        task.cancel()
    await gather(*tasks, return_exceptions=True)


if __name__ == "__main__":
    run(main("230502-14191"))

Output:

task_1 checking 230327-12717
task_2 checking 230502-14191
Found 230502-14191 in task_2
task_3 checking 230404-23786
task_1 checking 230221-28276

A few things to note:

  • Some await within the for-loop is necessary for this simple example to ensure that a context switch can happen. Otherwise the tasks will simply execute sequentially. That is why I added the await asyncio.sleep(0).
  • As you can see, even though the ID was found by task 2, both task 1 and task 3 each still had one more go. There is no way to prevent that because the event loop decides itself, which coroutine to switch to at any given moment. And await found.wait() in our main coroutine only guarantees that it will block until the event is set, not that it receives control right away. But you'll notice that none of the tasks actually finished, i.e. none of them went though all their IDs because they were cancelled before they could.
  • I am using asyncio.gather with return_exceptions=True because I do not want any of the CancelledErrors to propagate up to the main coroutine.
  • Technically you don't need the gather at all because the coroutines will be cancelled regardless, but it is bad form to leave dangling un-await-ed tasks lying around. You should always await your tasks at some point. And gather seems like the easiest solution here.

PS: Returning results from asyncio.gather

If you want the task_ functions to return something, gather will collect those results for you too:

from asyncio import Event, create_task, gather, run, sleep
from collections.abc import Iterable


async def _search(id_: str, ids: Iterable[str], found: Event, name: str) -> str:
    for i in ids:
        await sleep(0)
        print(f"{name} checking {i}")
        if i == id_:
            found.set()
            return f"Found {id_} in {name}"


async def task_1(id_: str, found: Event) -> str:
    ids = ["230327-12717", "230221-28276", "230214-06090"]
    return await _search(id_, ids, found, "task_1")


async def task_2(id_: str, found: Event) -> str:
    ids = ["230502-14191", "230425-17005", "230327-14434"]
    return await _search(id_, ids, found, "task_2")


async def task_3(id_: str, found: Event) -> str:
    ids = ["230404-23786", "230221-25729", "230221-28276"]
    return await _search(id_, ids, found, "task_3")


async def main(id_: str) -> None:
    found = Event()
    tasks = [
        create_task(task_1(id_, found)),
        create_task(task_2(id_, found)),
        create_task(task_3(id_, found)),
    ]
    await found.wait()
    for task in tasks:
        task.cancel()
    results = await gather(*tasks, return_exceptions=True)
    for result in results:
        if not isinstance(result, Exception):
            print(result)


if __name__ == "__main__":
    run(main("230502-14191"))

PPS: Use a Queue instead of an Event

Alternatively you can set up a Queue to put the result in and simply await that result in the main coroutine. That may be a more elegant solution alltogether, but I suppose that is a matter of preference:

from asyncio import Queue, create_task, gather, run, sleep
from collections.abc import Iterable


async def _search(id_: str, ids: Iterable[str], q: Queue, name: str) -> None:
    for i in ids:
        await sleep(0)
        print(f"{name} checking {i}")
        if i == id_:
            q.put_nowait(f"Found {id_} in {name}")


async def task_1(id_: str, q: Queue) -> None:
    ids = ["230327-12717", "230221-28276", "230214-06090"]
    await _search(id_, ids, q, "task_1")


async def task_2(id_: str, q: Queue) -> None:
    ids = ["230502-14191", "230425-17005", "230327-14434"]
    await _search(id_, ids, q, "task_2")


async def task_3(id_: str, q: Queue) -> None:
    ids = ["230404-23786", "230221-25729", "230221-28276"]
    await _search(id_, ids, q, "task_3")


async def main(id_: str) -> None:
    result_queue = Queue()
    tasks = [
        create_task(task_1(id_, result_queue)),
        create_task(task_2(id_, result_queue)),
        create_task(task_3(id_, result_queue)),
    ]
    result = await result_queue.get()
    for task in tasks:
        task.cancel()
    await gather(*tasks, return_exceptions=True)
    print(result)


if __name__ == "__main__":
    run(main("230502-14191"))

Awaiting Queue.get will block until there is an item in the queue, then return that item. So this accomplishes essentially the same as the Event from before, but it also gives you the result right away.

You should still clean up (i.e. cancel and await) the other tasks though.

huangapple
  • 本文由 发表于 2023年5月13日 21:41:48
  • 转载请务必保留本文链接:https://go.coder-hub.com/76243026.html
匿名

发表评论

匿名网友

:?: :razz: :sad: :evil: :!: :smile: :oops: :grin: :eek: :shock: :???: :cool: :lol: :mad: :twisted: :roll: :wink: :idea: :arrow: :neutral: :cry: :mrgreen:

确定