Async [TCP] writer close to avoid resource leaks

# Question

I'm implementing a TCP client with asyncio [Streams](https://docs.python.org/3/library/asyncio-stream.html#asyncio-streams). Typical example code is:

```python
reader, writer = await asyncio.open_connection(
        '127.0.0.1', 8888)
...
writer.close()
await writer.wait_closed()
```

The `...` is a non-trivial piece of async/await code in my case. To prevent resource leaks (e.g. file descriptors), I believe I need to call that `close()` function, so I really should put it inside a try/finally. Is this correct, or does Python somehow handle resource cleanup automatically when the event loop ends, the way it _finalizes asynchronous generators_?

If manual cleanup is required, is there a more canonical/Pythonic way to implement this than defining a new function with [@contextlib.asynccontextmanager](https://docs.python.org/3/library/contextlib.html#contextlib.asynccontextmanager)?

I tried `contextlib.closing()` and `contextlib.aclosing()`, but neither works: `asyncio.open_connection()` returns a tuple, and the writer has no `aclose()`, only `close()`.


# Answer 1
**Score**: 2

You always have to close streams when you are done with them.

You can use `try/finally`:

```python
import asyncio
import contextlib
from asyncio import StreamReader, StreamWriter
from typing import AsyncIterator, Optional, Tuple


async def main_try_catch():
    writer: Optional[StreamWriter] = None
    try:
        print("Opening")
        reader, writer = await asyncio.open_connection("www.google.com", 443, ssl=True)
        print("Opened")
        data = await reader.readline()
        print(f"Received: {data.decode()!r}")
    except Exception:
        # Catch Exception rather than using a bare except, so task cancellation still propagates.
        print("Caught exception")
    finally:
        # writer is still None if open_connection() itself failed.
        if writer is not None:
            print("Closing")
            writer.close()
            await writer.wait_closed()
            print("Closed")
```

Or you can use `@contextlib.asynccontextmanager`:

```python
@contextlib.asynccontextmanager
async def main_context_manager() -> AsyncIterator[Tuple[StreamReader, StreamWriter]]:
    #  __aenter__
    print("Opening")
    reader, writer = await asyncio.open_connection("www.google.com", 443, ssl=True)
    try:
        yield reader, writer
    finally:
        #  __aexit__: runs even if the body of the `async with` raises
        print("Closing")
        writer.close()
        await writer.wait_closed()
        print("Closed")
```

And use it like:

```python
async def main():
    # await main_try_catch()

    # OR
    try:
        async with main_context_manager() as (reader, writer):
            print("Opened")
            data = await reader.readline()
            print(f"Received: {data.decode()!r}")
    except Exception:
        print("Caught exception")


if __name__ == '__main__':
    asyncio.run(main())


# SUCCESS CASE
# Opening
# Opened
# Received: '...'
# Closing
# Closed

# ERROR CASE
# Opening
# Caught exception
```
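To check that the cleanup really runs when the body of the `async with` fails, here is a minimal self-contained sketch against a local echo server rather than a remote host. The names `connect`, `echo`, and `demo` are hypothetical and chosen only for this illustration; the loopback address and the one-line echo protocol are assumptions as well.

```python
import asyncio
import contextlib
from typing import AsyncIterator, Tuple


@contextlib.asynccontextmanager
async def connect(host: str, port: int) -> AsyncIterator[Tuple[asyncio.StreamReader, asyncio.StreamWriter]]:
    # Hypothetical helper: open the connection, always close the writer on exit.
    reader, writer = await asyncio.open_connection(host, port)
    try:
        yield reader, writer
    finally:
        writer.close()
        await writer.wait_closed()


async def echo(reader: asyncio.StreamReader, writer: asyncio.StreamWriter) -> None:
    # Server side: echo a single line back, then close the connection.
    writer.write(await reader.readline())
    await writer.drain()
    writer.close()
    await writer.wait_closed()


async def demo() -> Tuple[bytes, bool]:
    # Port 0 asks the OS for any free port on the loopback interface.
    server = await asyncio.start_server(echo, "127.0.0.1", 0)
    port = server.sockets[0].getsockname()[1]
    async with server:
        seen = None
        reply = b""
        try:
            async with connect("127.0.0.1", port) as (reader, writer):
                seen = writer
                writer.write(b"ping\n")
                await writer.drain()
                reply = await reader.readline()
                raise RuntimeError("simulated failure in the body")
        except RuntimeError:
            pass
        # The finally block in connect() has already closed the writer.
        return reply, seen.is_closing()


if __name__ == "__main__":
    print(asyncio.run(demo()))
```

Here `demo()` returns the echoed line together with `writer.is_closing()`, which reports `True` once `close()` has been called, so the second element shows that the `finally` block ran despite the exception.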

# Answer 2

**Score**: 0

Thanks to the helpful answer, I went with the following implementation in the end, which someone else might find useful (generic arguments, plus try/finally inside the context manager to ensure cleanup):

```python
import asyncio
import contextlib
from typing import AsyncIterator, Tuple

@contextlib.asynccontextmanager
async def open_connection(*args, **kwargs) -> AsyncIterator[Tuple[asyncio.StreamReader, asyncio.StreamWriter]]:
    writer = None
    try:
        reader, writer = await asyncio.open_connection(*args, **kwargs)
        yield reader, writer
    finally:
        # writer is still None if asyncio.open_connection() itself failed.
        if writer is not None:
            writer.close()
            await writer.wait_closed()
```

Used as:

```python
async with open_connection("www.google.com", 443, ssl=True) as (reader, writer):
    ...
```
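On the question of whether this can be done without defining a new context manager: one possible alternative, sketched here as an assumption and not taken from either answer, is `contextlib.AsyncExitStack`, which can register `writer.close` and `writer.wait_closed` as exit callbacks. The function names `fetch_line`, `upper`, and `demo` and the local test server are hypothetical and used only for illustration.

```python
import asyncio
import contextlib


async def fetch_line(host: str, port: int, payload: bytes) -> bytes:
    async with contextlib.AsyncExitStack() as stack:
        reader, writer = await asyncio.open_connection(host, port)
        # Exit callbacks run in reverse order of registration, so on exit
        # writer.close() is called first and writer.wait_closed() is awaited after it.
        stack.push_async_callback(writer.wait_closed)
        stack.callback(writer.close)
        writer.write(payload)
        await writer.drain()
        return await reader.readline()


async def upper(reader: asyncio.StreamReader, writer: asyncio.StreamWriter) -> None:
    # Local test server: reply with the upper-cased line, then close.
    writer.write((await reader.readline()).upper())
    await writer.drain()
    writer.close()
    await writer.wait_closed()


async def demo() -> bytes:
    # Port 0 asks the OS for any free port on the loopback interface.
    server = await asyncio.start_server(upper, "127.0.0.1", 0)
    port = server.sockets[0].getsockname()[1]
    async with server:
        return await fetch_line("127.0.0.1", port, b"hello\n")


if __name__ == "__main__":
    print(asyncio.run(demo()))
```

The stack avoids a dedicated helper at the cost of two registration lines per connection, so the reusable `@contextlib.asynccontextmanager` wrapper is likely still the tidier choice when connections are opened in several places.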

huangapple
  • Posted on July 18, 2023, 15:18:54
  • Please keep the link to this article when reposting: https://go.coder-hub.com/76710341.html