英文:
CancelledError exception handler not triggering
问题
我的异常处理程序对于简单情况触发正常,但在使用多个任务或asyncio.gather
时未触发。使用asyncio.gather
和多个子任务的情况下,CancelledError
处理程序未触发的原因是,asyncio.gather
在遇到第一个取消的子任务后,会立即取消所有其他子任务,并且只引发一个CancelledError
异常。
输出:
starting!
Starting: eat
Starting: pray
Starting: love
Ending: eat
cancelled pray! - This message should be shown when task is cancelled
Ending: pray
预期输出:
starting!
Starting: eat
Starting: pray
Starting: love
Ending: eat
cancelled pray! - This message should be shown when task is cancelled
Ending: pray
cancelled love! - This message should be shown when task is cancelled
Ending: love
英文:
My exception handler for tasks being cancelled seems to trigger for simple cases, but not when using multiple tasks or asyncio.gather
.
Sample code:
import asyncio
async def sleep_func(statement, time, sabotage=False):
print(f"Starting: {statement}")
try:
if sabotage:
tasks = [
asyncio.sleep(1000),
asyncio.sleep(1000),
asyncio.sleep(1000),
]
await asyncio.gather(*tasks)
await asyncio.sleep(time)
except asyncio.CancelledError as e:
print(f"cancelled {statement}! - {str(e)}")
except Exception as e:
print(f"Unhandled exception - {str(e)}")
print(f"Ending: {statement}")
async def main():
calls = [
asyncio.ensure_future(sleep_func("eat", 3)),
asyncio.ensure_future(sleep_func("pray", 8)),
asyncio.ensure_future(sleep_func("love", 10, sabotage=True)),
]
print("starting!")
finished, unfinished = await asyncio.wait(calls, timeout=6)
for task in unfinished:
task.cancel("This message should be shown when task is cancelled")
if __name__ == "__main__":
loop = asyncio.get_event_loop()
loop.run_until_complete(main())
I have three tasks in my main function:
[
asyncio.ensure_future(sleep_func("eat", 3)),
asyncio.ensure_future(sleep_func("pray", 8)),
asyncio.ensure_future(sleep_func("love", 10, sabotage=True)),
]
Expected:
-
My first task is not cancelled (this works)
-
My second task is cancelled and prints the message in the
CancelledError
handler correctly (this works) -
My second task is cancelled and prints the message in the
CancelledError
handler correctly (this does NOT work)
Why is the CancelledError
handler not triggering for the last task, which uses asyncio.gather
and has a bunch of sub-tasks?
Output:
starting!
Starting: eat
Starting: pray
Starting: love
Ending: eat
cancelled pray! - This message should be shown when task is cancelled
Ending: pray
Expected output:
starting!
Starting: eat
Starting: pray
Starting: love
Ending: eat
cancelled pray! - This message should be shown when task is cancelled
Ending: pray
cancelled love! - This message should be shown when task is cancelled
Ending: love
答案1
得分: 2
你没有看到love
任务被取消的原因是你没有等待它。
在取消for
循环结束时,任务还没有被取消,它们只是在取消中,即正在取消的过程中。
调用Task.cancel
方法不会立即取消任务。引用文档(加粗是我自己的强调),它
安排在下一个事件循环周期中向封装的协程抛出
CancelledError
异常。
为了允许下一个事件循环周期实际开始,你需要一个await
表达式。这将允许发生到其中一个协程的上下文切换,从而实际引发CancelledError
。
但是你的main
协程在没有另一个await
的情况下结束,因此在事件循环中没有可能发生上下文切换。这就是为什么在Task.cancel
文档下的示例代码片段中,要取消的任务在末尾是被等待的。
因此,获取所需输出的最简单方法是在你的for
循环中task.cancel(...)
下面添加await task
:
...
async def main():
calls = [
asyncio.ensure_future(sleep_func("eat", 3)),
asyncio.ensure_future(sleep_func("pray", 8)),
asyncio.ensure_future(sleep_func("love", 10, sabotage=True)),
]
print("starting!")
finished, unfinished = await asyncio.wait(calls, timeout=6)
for task in unfinished:
task.cancel("This message should be shown when task is cancelled")
await task # <---
关于为什么你调用cancel
的两个任务中的一个实际上设法将CancelledError
发送到其协程,我不知道run_until_complete
方法的内部工作原理,但怀疑它确实允许在返回之前进行另一个上下文切换。但我会推测这只是一些实现细节,而且不可靠(正如你的示例所示),尤其是在取消的任务数量增加时。
顺便说一下,由于其已被弃用,你可能不应再使用loop = asyncio.get_event_loop()
的模式。通过asyncio.run
运行异步的main
函数是运行的规范方式。
此外,当有多个要取消的任务时,通常的模式是在请求取消后使用asyncio.gather
。
最后,如评论中所述,最好的做法是在协程中重新引发捕获到的CancelledError
。然后,通过在未完成的取消任务中简单地将return_exceptions=True
传递给asyncio.gather
,你可以避免在main
函数中“进一步上链”。
因此,对你的示例代码的建议更改如下:
import asyncio
async def sleep_func(statement, time, sabotage=False):
print(f"Starting: {statement}")
try:
if sabotage:
tasks = [
asyncio.sleep(1000),
asyncio.sleep(1000),
asyncio.sleep(1000),
]
await asyncio.gather(*tasks)
await asyncio.sleep(time)
except asyncio.CancelledError as e:
print(f"cancelled {statement}! - {str(e)}")
raise e # <---
except Exception as e:
print(f"Unhandled exception - {str(e)}")
finally: # <---
print(f"Ending: {statement}")
async def main():
calls = [
asyncio.ensure_future(sleep_func("eat", 3)),
asyncio.ensure_future(sleep_func("pray", 8)),
asyncio.ensure_future(sleep_func("love", 10, sabotage=True)),
]
print("starting!")
finished, unfinished = await asyncio.wait(calls, timeout=6)
for task in unfinished:
task.cancel("This message should be shown when task is cancelled")
await asyncio.gather(*unfinished, return_exceptions=True) # <---
if __name__ == "__main__":
asyncio.run(main()) # <---
输出:
starting!
Starting: eat
Starting: pray
Starting: love
Ending: eat
cancelled pray! - This message should be shown when task is cancelled
Ending: pray
cancelled love! - This message should be shown when task is cancelled
Ending: love
PS: 从Python 3.11
开始,我们有asyncio.TaskGroup
类,为这种情况提供了一个方便的上下文管理器。利用这一点,我们可以像这样编写main
函数:
...
async def main():
print("starting!")
async with asyncio.TaskGroup() as tg:
tasks = [
tg.create_task(sleep_func("eat", 3)),
tg.create_task(sleep_func("pray", 8)),
tg.create_task(sleep_func("love", 10, sabotage=True)),
]
finished, unfinished = await asyncio.wait(tasks, timeout=6)
for task in unfinished:
task.cancel("This message should be shown when task is cancelled")
注意在这里我们不需要自己等待任务,因为
所有任务在上下文管理器退出时都会被等待。
而且我们也不需要担心传播的CancelledError
,因为它不会在任务组上下文之外“泄漏”出来。
英文:
The reason you are not seeing the cancellation of the love
task is that you are not awaiting it.
By the time you get to the end of your cancellation for
-loop, the tasks have not been cancelled, they are just cancelling, i.e. in the process of being cancelled.
Calling the Task.cancel
method does not immediately cancel the task. To quote the docs (with my own emphasis), it
> arranges for a CancelledError
exception to be thrown into the wrapped coroutine on the next cycle of the event loop.
To allow the next event loop cycle to actually commence, you need an await
expression. That will allow a context switch to one of the couroutines to happen and the CancelledError
to actually be raised there.
But your main
coroutine ends without another await
, thus without the possibility of a context switch by the event loop. This is why in the example code snippet under the Task.cancel
documentation, the tasks that is to be cancelled is awaited at the end.
Thus, the simplest way to get your desired output would be to simply add await task
right below task.cancel(...)
in your for
-loop:
...
async def main():
calls = [
asyncio.ensure_future(sleep_func("eat", 3)),
asyncio.ensure_future(sleep_func("pray", 8)),
asyncio.ensure_future(sleep_func("love", 10, sabotage=True)),
]
print("starting!")
finished, unfinished = await asyncio.wait(calls, timeout=6)
for task in unfinished:
task.cancel("This message should be shown when task is cancelled")
await task # <---
As to why one of the two tasks that you called cancel
on actually managed to send the CancelledError
to its coroutine, I don't know the internals of the loop's run_until_complete
method well enough, but suspect that it does allow for another context switch inside it before returning. But I would speculate that this is just an implementation detail and not reliable at all (as demonstrated by your example), even less so, when the number of cancelled tasks increases.
As an aside, you should probably not be using the loop = asyncio.get_event_loop()
pattern anymore due to its deprecation. The canonical way of running your asynchronous main
function is via asyncio.run
.
Also, when you have multiple tasks to be cancelled, the pattern is usually to asyncio.gather
them after requesting cancellation.
Lastly, as mentioned in a comment, it is best practice to re-raise a caught CancelledError
in a coroutine. You can then avoid it "further up the chain" in your main
function by simply passing return_exceptions=True
to asyncio.gather
for the unfinished cancelled tasks.
So the changes I would suggest to your example code would look as follows:
import asyncio
async def sleep_func(statement, time, sabotage=False):
print(f"Starting: {statement}")
try:
if sabotage:
tasks = [
asyncio.sleep(1000),
asyncio.sleep(1000),
asyncio.sleep(1000),
]
await asyncio.gather(*tasks)
await asyncio.sleep(time)
except asyncio.CancelledError as e:
print(f"cancelled {statement}! - {str(e)}")
raise e # <---
except Exception as e:
print(f"Unhandled exception - {str(e)}")
finally: # <---
print(f"Ending: {statement}")
async def main():
calls = [
asyncio.ensure_future(sleep_func("eat", 3)),
asyncio.ensure_future(sleep_func("pray", 8)),
asyncio.ensure_future(sleep_func("love", 10, sabotage=True)),
]
print("starting!")
finished, unfinished = await asyncio.wait(calls, timeout=6)
for task in unfinished:
task.cancel("This message should be shown when task is cancelled")
await asyncio.gather(*unfinished, return_exceptions=True) # <---
if __name__ == "__main__":
asyncio.run(main()) # <---
Output:
starting!
Starting: eat
Starting: pray
Starting: love
Ending: eat
cancelled pray! - This message should be shown when task is cancelled
Ending: pray
cancelled love! - This message should be shown when task is cancelled
Ending: love
PS: Starting with Python 3.11
we have the asyncio.TaskGroup
class that provides a convenient context manager for such cases. Utilizing that we could write the main
function like this:
...
async def main():
print("starting!")
async with asyncio.TaskGroup() as tg:
tasks = [
tg.create_task(sleep_func("eat", 3)),
tg.create_task(sleep_func("pray", 8)),
tg.create_task(sleep_func("love", 10, sabotage=True)),
]
finished, unfinished = await asyncio.wait(tasks, timeout=6)
for task in unfinished:
task.cancel("This message should be shown when task is cancelled")
Notice how do not need to await the tasks ourselves here because
> all tasks are awaited when the context manager exits.
And we also don't need to worry about the propagated CancelledError
because that will not "leak" out of the task group context.
答案2
得分: 0
问题
这里的问题似乎是,没有足够的时间让收集的任务实际上通过取消并将CancellationError
抛出到包装的try_except_sleeper
中。
cancel
仅请求取消并立即返回 文档
解决方案
在你的main
函数的最后一行添加 await asyncio.wait(unfinished, timeout=1)
。这样协程将有足够的时间处理取消请求,输出将为:
starting!
Starting: eat
Starting: pray
Starting: love
Ending: eat
cancelled pray! - 当任务被取消时应显示此消息
cancelled love! -
顺便说一句:你可以看到取消消息未传递到包装器 - 我不知道为什么会这样,可能是个bug。
英文:
Problem
The problem here seems to be that there is no time for the gathered tasks to actually get through with the cancellation and throw the CancellationError to the wrapping try_except_sleeper
cancel
only requests the cancellation and returns immediately docs
Solution
Add await asyncio.wait(unfinished, timeout=1)
to the last line of your main
function. That way the coroutines will have enough time to process the cancelation request and the output will be:
starting!
Starting: eat
Starting: pray
Starting: love
Ending: eat
cancelled pray! - This message should be shown when task is cancelled
cancelled love! -
As a side note: You see the cancellation message has not been passed to the wrapper - I don't know why that is, might be a bug.
通过集体智慧和协作来改善编程学习和解决问题的方式。致力于成为全球开发者共同参与的知识库,让每个人都能够通过互相帮助和分享经验来进步。
评论