CancelledError异常处理程序未触发。

huangapple go评论67阅读模式
英文:

CancelledError exception handler not triggering

问题

我的异常处理程序对于简单情况触发正常,但在使用多个任务或asyncio.gather时未触发。使用asyncio.gather和多个子任务的情况下,CancelledError处理程序未触发的原因是,asyncio.gather在遇到第一个取消的子任务后,会立即取消所有其他子任务,并且只引发一个CancelledError异常。

输出:

starting!
Starting: eat
Starting: pray
Starting: love
Ending: eat
cancelled pray! - This message should be shown when task is cancelled
Ending: pray

预期输出:

starting!
Starting: eat
Starting: pray
Starting: love
Ending: eat
cancelled pray! - This message should be shown when task is cancelled
Ending: pray
cancelled love! - This message should be shown when task is cancelled
Ending: love
英文:

My exception handler for tasks being cancelled seems to trigger for simple cases, but not when using multiple tasks or asyncio.gather.

Sample code:

import asyncio


async def sleep_func(statement, time, sabotage=False):
    print(f"Starting: {statement}")
    try:
        if sabotage:
            tasks = [
                asyncio.sleep(1000),
                asyncio.sleep(1000),
                asyncio.sleep(1000),
            ]
            await asyncio.gather(*tasks)
        await asyncio.sleep(time)
    except asyncio.CancelledError as e:
        print(f"cancelled {statement}! - {str(e)}")
    except Exception as e:
        print(f"Unhandled exception - {str(e)}")
    print(f"Ending: {statement}")


async def main():
    calls = [
        asyncio.ensure_future(sleep_func("eat", 3)),
        asyncio.ensure_future(sleep_func("pray", 8)),
        asyncio.ensure_future(sleep_func("love", 10, sabotage=True)),
    ]
    print("starting!")
    finished, unfinished = await asyncio.wait(calls, timeout=6)
    for task in unfinished:
        task.cancel("This message should be shown when task is cancelled")


if __name__ == "__main__":
    loop = asyncio.get_event_loop()
    loop.run_until_complete(main())

I have three tasks in my main function:

[
    asyncio.ensure_future(sleep_func("eat", 3)),
    asyncio.ensure_future(sleep_func("pray", 8)),
    asyncio.ensure_future(sleep_func("love", 10, sabotage=True)),
]

Expected:

  • My first task is not cancelled (this works)

  • My second task is cancelled and prints the message in the
    CancelledError handler correctly (this works)

  • My second task is cancelled and prints the message in the
    CancelledError handler correctly (this does NOT work)

Why is the CancelledError handler not triggering for the last task, which uses asyncio.gather and has a bunch of sub-tasks?

Output:

starting!
Starting: eat
Starting: pray
Starting: love
Ending: eat
cancelled pray! - This message should be shown when task is cancelled
Ending: pray

Expected output:

starting!
Starting: eat
Starting: pray
Starting: love
Ending: eat
cancelled pray! - This message should be shown when task is cancelled
Ending: pray
cancelled love! - This message should be shown when task is cancelled
Ending: love

答案1

得分: 2

你没有看到love任务被取消的原因是你没有等待它

在取消for循环结束时,任务还没有被取消,它们只是在取消中,即正在取消的过程中。

调用Task.cancel方法不会立即取消任务。引用文档(加粗是我自己的强调),它

安排在下一个事件循环周期中向封装的协程抛出CancelledError异常。

为了允许下一个事件循环周期实际开始,你需要一个await表达式。这将允许发生到其中一个协程的上下文切换,从而实际引发CancelledError

但是你的main协程在没有另一个await的情况下结束,因此在事件循环中没有可能发生上下文切换。这就是为什么在Task.cancel文档下的示例代码片段中,要取消的任务在末尾是被等待的

因此,获取所需输出的最简单方法是在你的for循环中task.cancel(...)下面添加await task

...

async def main():
    calls = [
        asyncio.ensure_future(sleep_func("eat", 3)),
        asyncio.ensure_future(sleep_func("pray", 8)),
        asyncio.ensure_future(sleep_func("love", 10, sabotage=True)),
    ]
    print("starting!")
    finished, unfinished = await asyncio.wait(calls, timeout=6)
    for task in unfinished:
        task.cancel("This message should be shown when task is cancelled")
        await task  # <---

关于为什么你调用cancel的两个任务中的一个实际上设法将CancelledError发送到其协程,我不知道run_until_complete方法的内部工作原理,但怀疑它确实允许在返回之前进行另一个上下文切换。但我会推测这只是一些实现细节,而且不可靠(正如你的示例所示),尤其是在取消的任务数量增加时。

顺便说一下,由于其已被弃用,你可能不应再使用loop = asyncio.get_event_loop()的模式。通过asyncio.run运行异步的main函数是运行的规范方式。

此外,当有多个要取消的任务时,通常的模式是在请求取消后使用asyncio.gather

最后,如评论中所述,最好的做法是在协程中重新引发捕获到的CancelledError。然后,通过在未完成的取消任务中简单地将return_exceptions=True传递给asyncio.gather,你可以避免在main函数中“进一步上链”。

因此,对你的示例代码的建议更改如下:

import asyncio

async def sleep_func(statement, time, sabotage=False):
    print(f"Starting: {statement}")
    try:
        if sabotage:
            tasks = [
                asyncio.sleep(1000),
                asyncio.sleep(1000),
                asyncio.sleep(1000),
            ]
            await asyncio.gather(*tasks)
        await asyncio.sleep(time)
    except asyncio.CancelledError as e:
        print(f"cancelled {statement}! - {str(e)}")
        raise e  # <---
    except Exception as e:
        print(f"Unhandled exception - {str(e)}")
    finally:  # <---
        print(f"Ending: {statement}")

async def main():
    calls = [
        asyncio.ensure_future(sleep_func("eat", 3)),
        asyncio.ensure_future(sleep_func("pray", 8)),
        asyncio.ensure_future(sleep_func("love", 10, sabotage=True)),
    ]
    print("starting!")
    finished, unfinished = await asyncio.wait(calls, timeout=6)
    for task in unfinished:
        task.cancel("This message should be shown when task is cancelled")
    await asyncio.gather(*unfinished, return_exceptions=True)  # <---

if __name__ == "__main__":
    asyncio.run(main())  # <---

输出:

starting!
Starting: eat
Starting: pray
Starting: love
Ending: eat
cancelled pray! - This message should be shown when task is cancelled
Ending: pray
cancelled love! - This message should be shown when task is cancelled
Ending: love

PS: 从Python 3.11开始,我们有asyncio.TaskGroup类,为这种情况提供了一个方便的上下文管理器。利用这一点,我们可以像这样编写main函数:

...

async def main():
    print("starting!")
    async with asyncio.TaskGroup() as tg:
        tasks = [
            tg.create_task(sleep_func("eat", 3)),
            tg.create_task(sleep_func("pray", 8)),
            tg.create_task(sleep_func("love", 10, sabotage=True)),
        ]
        finished, unfinished = await asyncio.wait(tasks, timeout=6)
        for task in unfinished:
            task.cancel("This message should be shown when task is cancelled")

注意在这里我们不需要自己等待任务,因为

所有任务在上下文管理器退出时都会被等待。

而且我们也不需要担心传播的CancelledError,因为它不会在任务组上下文之外“泄漏”出来。

英文:

The reason you are not seeing the cancellation of the love task is that you are not awaiting it.

By the time you get to the end of your cancellation for-loop, the tasks have not been cancelled, they are just cancelling, i.e. in the process of being cancelled.

Calling the Task.cancel method does not immediately cancel the task. To quote the docs (with my own emphasis), it

> arranges for a CancelledError exception to be thrown into the wrapped coroutine on the next cycle of the event loop.

To allow the next event loop cycle to actually commence, you need an await expression. That will allow a context switch to one of the couroutines to happen and the CancelledError to actually be raised there.

But your main coroutine ends without another await, thus without the possibility of a context switch by the event loop. This is why in the example code snippet under the Task.cancel documentation, the tasks that is to be cancelled is awaited at the end.

Thus, the simplest way to get your desired output would be to simply add await task right below task.cancel(...) in your for-loop:

...

async def main():
    calls = [
        asyncio.ensure_future(sleep_func(&quot;eat&quot;, 3)),
        asyncio.ensure_future(sleep_func(&quot;pray&quot;, 8)),
        asyncio.ensure_future(sleep_func(&quot;love&quot;, 10, sabotage=True)),
    ]
    print(&quot;starting!&quot;)
    finished, unfinished = await asyncio.wait(calls, timeout=6)
    for task in unfinished:
        task.cancel(&quot;This message should be shown when task is cancelled&quot;)
        await task  # &lt;---

As to why one of the two tasks that you called cancel on actually managed to send the CancelledError to its coroutine, I don't know the internals of the loop's run_until_complete method well enough, but suspect that it does allow for another context switch inside it before returning. But I would speculate that this is just an implementation detail and not reliable at all (as demonstrated by your example), even less so, when the number of cancelled tasks increases.


As an aside, you should probably not be using the loop = asyncio.get_event_loop() pattern anymore due to its deprecation. The canonical way of running your asynchronous main function is via asyncio.run.

Also, when you have multiple tasks to be cancelled, the pattern is usually to asyncio.gather them after requesting cancellation.

Lastly, as mentioned in a comment, it is best practice to re-raise a caught CancelledError in a coroutine. You can then avoid it "further up the chain" in your main function by simply passing return_exceptions=True to asyncio.gather for the unfinished cancelled tasks.

So the changes I would suggest to your example code would look as follows:

import asyncio

async def sleep_func(statement, time, sabotage=False):
    print(f&quot;Starting: {statement}&quot;)
    try:
        if sabotage:
            tasks = [
                asyncio.sleep(1000),
                asyncio.sleep(1000),
                asyncio.sleep(1000),
            ]
            await asyncio.gather(*tasks)
        await asyncio.sleep(time)
    except asyncio.CancelledError as e:
        print(f&quot;cancelled {statement}! - {str(e)}&quot;)
        raise e  # &lt;---
    except Exception as e:
        print(f&quot;Unhandled exception - {str(e)}&quot;)
    finally:  # &lt;---
        print(f&quot;Ending: {statement}&quot;)

async def main():
    calls = [
        asyncio.ensure_future(sleep_func(&quot;eat&quot;, 3)),
        asyncio.ensure_future(sleep_func(&quot;pray&quot;, 8)),
        asyncio.ensure_future(sleep_func(&quot;love&quot;, 10, sabotage=True)),
    ]
    print(&quot;starting!&quot;)
    finished, unfinished = await asyncio.wait(calls, timeout=6)
    for task in unfinished:
        task.cancel(&quot;This message should be shown when task is cancelled&quot;)
    await asyncio.gather(*unfinished, return_exceptions=True)  # &lt;---

if __name__ == &quot;__main__&quot;:
    asyncio.run(main())  # &lt;---

Output:

starting!
Starting: eat
Starting: pray
Starting: love
Ending: eat
cancelled pray! - This message should be shown when task is cancelled
Ending: pray
cancelled love! - This message should be shown when task is cancelled
Ending: love

PS: Starting with Python 3.11 we have the asyncio.TaskGroup class that provides a convenient context manager for such cases. Utilizing that we could write the main function like this:

...

async def main():
    print(&quot;starting!&quot;)
    async with asyncio.TaskGroup() as tg:
        tasks = [
            tg.create_task(sleep_func(&quot;eat&quot;, 3)),
            tg.create_task(sleep_func(&quot;pray&quot;, 8)),
            tg.create_task(sleep_func(&quot;love&quot;, 10, sabotage=True)),
        ]
        finished, unfinished = await asyncio.wait(tasks, timeout=6)
        for task in unfinished:
            task.cancel(&quot;This message should be shown when task is cancelled&quot;)

Notice how do not need to await the tasks ourselves here because

> all tasks are awaited when the context manager exits.

And we also don't need to worry about the propagated CancelledError because that will not "leak" out of the task group context.

答案2

得分: 0

问题

这里的问题似乎是,没有足够的时间让收集的任务实际上通过取消并将CancellationError抛出到包装的try_except_sleeper中。

cancel仅请求取消并立即返回 文档

解决方案

在你的main函数的最后一行添加 await asyncio.wait(unfinished, timeout=1)。这样协程将有足够的时间处理取消请求,输出将为:

starting!
Starting: eat
Starting: pray
Starting: love
Ending: eat
cancelled pray! - 当任务被取消时应显示此消息
cancelled love! - 

顺便说一句:你可以看到取消消息未传递到包装器 - 我不知道为什么会这样,可能是个bug。

英文:

Problem

The problem here seems to be that there is no time for the gathered tasks to actually get through with the cancellation and throw the CancellationError to the wrapping try_except_sleeper

cancel only requests the cancellation and returns immediately docs

Solution

Add await asyncio.wait(unfinished, timeout=1) to the last line of your main function. That way the coroutines will have enough time to process the cancelation request and the output will be:

starting!
Starting: eat
Starting: pray
Starting: love
Ending: eat
cancelled pray! - This message should be shown when task is cancelled
cancelled love! - 

As a side note: You see the cancellation message has not been passed to the wrapper - I don't know why that is, might be a bug.

huangapple
  • 本文由 发表于 2023年4月20日 03:40:34
  • 转载请务必保留本文链接:https://go.coder-hub.com/76058246.html
匿名

发表评论

匿名网友

:?: :razz: :sad: :evil: :!: :smile: :oops: :grin: :eek: :shock: :???: :cool: :lol: :mad: :twisted: :roll: :wink: :idea: :arrow: :neutral: :cry: :mrgreen:

确定