How to get the first result from async tasks which is not None
Question
I have a situation where I need to stop the tasks in an async function as soon as one of them (`task_1`, `task_2`, or `task_3`) matches the condition, and I also need to check that the result is not `None`. If the result is `None`, I'd like to proceed until I get the first result that is not `None`. Here is the minimal reproducible code I have so far:
```python
import asyncio


async def task_1(_id):
    _ids = ["230327-12717", "230221-28276", "230214-06090"]
    for i in _ids:
        if i == _id:
            return f"Found {_id} in task_1"


async def task_2(_id):
    _ids = ["230502-14191", "230425-17005", "230327-14434"]
    for i in _ids:
        if i == _id:
            return f"Found {_id} in task_2"


async def task_3(_id):
    _ids = ["230404-23786", "230221-25729", "230221-28276"]
    for i in _ids:
        if i == _id:
            return f"Found {_id} in task_3"


async def main(_id):
    tasks = [task_1(_id), task_2(_id), task_3(_id)]
    finished, unfinished = await asyncio.wait(
        tasks, return_when=asyncio.FIRST_COMPLETED
    )
    print(finished)


if __name__ == "__main__":
    loop = asyncio.get_event_loop()
    loop.run_until_complete(main("230502-14191"))
```
which gives the output:

```
{<Task finished name='Task-2' coro=<task_2() done, defined at /home/zerox/pipedrive_CRM/minimal.py:13> result='Found 230502-14191 in task_2'>, <Task finished name='Task-3' coro=<task_3() done, defined at /home/zerox/pipedrive_CRM/minimal.py:21> result=None>, <Task finished name='Task-4' coro=<task_1() done, defined at /home/zerox/pipedrive_CRM/minimal.py:5> result=None>}
```

My expected output should be something like this:

```
{<Task finished name='Task-2' coro=<task_2() done, defined at /home/zerox/pipedrive_CRM/minimal.py:13> result='Found 230502-14191 in task_2'>}
```
How can I achieve that?
Answer 1
Score: 1
This seems like an XY Problem, so I'll try to interpret what you actually want to accomplish.

Whenever you are dealing with multiple tasks that need to be coordinated in some way, it is a good idea to start with the available synchronization primitives.

In this case, you want the search for your specific ID to stop in all tasks, as soon as possible, after it has been found in one of them.

An `Event` object can be very useful in such a situation. The main coroutine schedules the "searching" tasks and then simply waits for the event to be set. The tasks all carry a reference to that event object, and once one of the tasks finds the ID, it sets the event. The main coroutine then simply cancels all the tasks and cleans up.
Demo:
```python
from asyncio import Event, create_task, gather, run, sleep
from collections.abc import Iterable


async def _search(id_: str, ids: Iterable[str], found: Event, name: str) -> None:
    for i in ids:
        await sleep(0)  # yield to the event loop so other tasks can run
        print(f"{name} checking {i}")
        if i == id_:
            print(f"Found {id_} in {name}")
            found.set()  # signal the main coroutine


async def task_1(id_: str, found: Event) -> None:
    ids = ["230327-12717", "230221-28276", "230214-06090"]
    await _search(id_, ids, found, "task_1")


async def task_2(id_: str, found: Event) -> None:
    ids = ["230502-14191", "230425-17005", "230327-14434"]
    await _search(id_, ids, found, "task_2")


async def task_3(id_: str, found: Event) -> None:
    ids = ["230404-23786", "230221-25729", "230221-28276"]
    await _search(id_, ids, found, "task_3")


async def main(id_: str) -> None:
    found = Event()
    tasks = [
        create_task(task_1(id_, found)),
        create_task(task_2(id_, found)),
        create_task(task_3(id_, found)),
    ]
    await found.wait()  # blocks until one task sets the event
    for task in tasks:
        task.cancel()
    # Await the cancelled tasks so none are left dangling
    await gather(*tasks, return_exceptions=True)


if __name__ == "__main__":
    run(main("230502-14191"))
```
Output:
```
task_1 checking 230327-12717
task_2 checking 230502-14191
Found 230502-14191 in task_2
task_3 checking 230404-23786
task_1 checking 230221-28276
```
A few things to note:

- Some `await` within the `for`-loop is necessary in this simple example to ensure that a context switch can happen. Otherwise the tasks will simply execute sequentially. That is why I added the `await asyncio.sleep(0)`.
- As you can see, even though the ID was found by task 2, both task 1 and task 3 each still had one more go. There is no way to prevent that, because the event loop itself decides which coroutine to switch to at any given moment. And `await found.wait()` in our `main` coroutine only guarantees that it will block until the event is set, not that it receives control right away. But you'll notice that none of the tasks actually finished, i.e. none of them went through all their IDs, because they were cancelled before they could.
- I am using `asyncio.gather` with `return_exceptions=True` because I do not want any of the `CancelledError`s to propagate up to the `main` coroutine.
- Technically you don't need the `gather` at all, because the coroutines will be cancelled regardless, but it is bad form to leave dangling un-`await`-ed tasks lying around. You should always `await` your tasks at some point, and `gather` seems like the easiest solution here.
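To make the first point concrete, here is a minimal sketch (not part of the original demo) that runs the same two workers with and without `await asyncio.sleep(0)` and records the order in which they append to a shared list. The names `worker`, `run_workers`, and `yield_control` are made up for this illustration.

```python
import asyncio


async def worker(name: str, steps: int, log: list[str], yield_control: bool) -> None:
    for step in range(steps):
        if yield_control:
            # sleep(0) suspends this coroutine and hands control
            # back to the event loop for one iteration
            await asyncio.sleep(0)
        log.append(f"{name}:{step}")


async def run_workers(yield_control: bool) -> list[str]:
    log: list[str] = []
    # gather wraps both coroutines in tasks and runs them concurrently
    await asyncio.gather(
        worker("a", 2, log, yield_control),
        worker("b", 2, log, yield_control),
    )
    return log


sequential = asyncio.run(run_workers(yield_control=False))
interleaved = asyncio.run(run_workers(yield_control=True))
print(sequential)   # ['a:0', 'a:1', 'b:0', 'b:1']
print(interleaved)  # ['a:0', 'b:0', 'a:1', 'b:1']
```

Without any `await` inside the loop, worker `a` runs to completion before `b` gets a turn. With `sleep(0)`, the two take turns on every step.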
PS: Returning results from `asyncio.gather`

If you want the `task_` functions to return something, `gather` will collect those results for you too:
```python
from asyncio import Event, create_task, gather, run, sleep
from collections.abc import Iterable
from typing import Optional


async def _search(
    id_: str, ids: Iterable[str], found: Event, name: str
) -> Optional[str]:
    for i in ids:
        await sleep(0)
        print(f"{name} checking {i}")
        if i == id_:
            found.set()
            return f"Found {id_} in {name}"
    return None  # no match in this list


async def task_1(id_: str, found: Event) -> Optional[str]:
    ids = ["230327-12717", "230221-28276", "230214-06090"]
    return await _search(id_, ids, found, "task_1")


async def task_2(id_: str, found: Event) -> Optional[str]:
    ids = ["230502-14191", "230425-17005", "230327-14434"]
    return await _search(id_, ids, found, "task_2")


async def task_3(id_: str, found: Event) -> Optional[str]:
    ids = ["230404-23786", "230221-25729", "230221-28276"]
    return await _search(id_, ids, found, "task_3")


async def main(id_: str) -> None:
    found = Event()
    tasks = [
        create_task(task_1(id_, found)),
        create_task(task_2(id_, found)),
        create_task(task_3(id_, found)),
    ]
    await found.wait()
    for task in tasks:
        task.cancel()
    results = await gather(*tasks, return_exceptions=True)
    for result in results:
        # CancelledError derives from BaseException (not Exception)
        # since Python 3.8, so check against BaseException here.
        if not isinstance(result, BaseException):
            print(result)


if __name__ == "__main__":
    run(main("230502-14191"))
```
PPS: Use a `Queue` instead of an `Event`

Alternatively, you can set up a `Queue` to put the result in and simply await that result in the `main` coroutine. That may be a more elegant solution altogether, but I suppose that is a matter of preference:
```python
from asyncio import Queue, create_task, gather, run, sleep
from collections.abc import Iterable


async def _search(id_: str, ids: Iterable[str], q: Queue, name: str) -> None:
    for i in ids:
        await sleep(0)
        print(f"{name} checking {i}")
        if i == id_:
            # The queue is unbounded, so put_nowait cannot raise QueueFull
            q.put_nowait(f"Found {id_} in {name}")


async def task_1(id_: str, q: Queue) -> None:
    ids = ["230327-12717", "230221-28276", "230214-06090"]
    await _search(id_, ids, q, "task_1")


async def task_2(id_: str, q: Queue) -> None:
    ids = ["230502-14191", "230425-17005", "230327-14434"]
    await _search(id_, ids, q, "task_2")


async def task_3(id_: str, q: Queue) -> None:
    ids = ["230404-23786", "230221-25729", "230221-28276"]
    await _search(id_, ids, q, "task_3")


async def main(id_: str) -> None:
    result_queue = Queue()
    tasks = [
        create_task(task_1(id_, result_queue)),
        create_task(task_2(id_, result_queue)),
        create_task(task_3(id_, result_queue)),
    ]
    result = await result_queue.get()  # blocks until a task puts a result
    for task in tasks:
        task.cancel()
    await gather(*tasks, return_exceptions=True)
    print(result)


if __name__ == "__main__":
    run(main("230502-14191"))
```
Awaiting `Queue.get` will block until there is an item in the queue, then return that item. So this accomplishes essentially the same as the `Event` from before, but it also gives you the result right away.

You should still clean up (i.e. cancel and await) the other tasks though.
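One caveat with both the `Event` and the `Queue` versions: if no task finds the ID, `found.wait()` or `result_queue.get()` never returns. As an alternative sketch (not the approach used above), `asyncio.as_completed` can be used to take results in completion order and stop at the first one that is not `None`, which also covers the "no match anywhere" case. The helper names `search` and `first_non_none`, the `GROUPS` dict, and the ID `"000000-00000"` are assumptions made up for this illustration.

```python
import asyncio
from collections.abc import Iterable
from typing import Optional


async def search(id_: str, ids: Iterable[str], name: str) -> Optional[str]:
    for i in ids:
        await asyncio.sleep(0)  # give the other searches a chance to run
        if i == id_:
            return f"Found {id_} in {name}"
    return None  # this search had no match


async def first_non_none(id_: str, groups: dict[str, list[str]]) -> Optional[str]:
    tasks = [
        asyncio.create_task(search(id_, ids, name))
        for name, ids in groups.items()
    ]
    try:
        # as_completed yields awaitables in the order the tasks finish
        for next_done in asyncio.as_completed(tasks):
            result = await next_done
            if result is not None:
                return result
        return None  # every task finished without a match
    finally:
        # Cancel whatever is still running, then await everything
        for task in tasks:
            task.cancel()
        await asyncio.gather(*tasks, return_exceptions=True)


GROUPS = {
    "task_1": ["230327-12717", "230221-28276", "230214-06090"],
    "task_2": ["230502-14191", "230425-17005", "230327-14434"],
    "task_3": ["230404-23786", "230221-25729", "230221-28276"],
}

hit = asyncio.run(first_non_none("230502-14191", GROUPS))
miss = asyncio.run(first_non_none("000000-00000", GROUPS))
print(hit)   # Found 230502-14191 in task_2
print(miss)  # None
```

The trade-off is that the tasks no longer signal each other directly. The caller simply inspects results as they arrive, and the `finally` block does the same cancel-and-await cleanup as before.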