英文:
Async [TCP] writer close to avoid resource leaks
问题
我正在使用asyncio实现一个TCP客户端[Streams](https://docs.python.org/3/library/asyncio-stream.html#asyncio-streams)。 典型的示例代码如下:
```python
reader, writer = await asyncio.open_connection(
'127.0.0.1', 8888)
...
writer.close()
await writer.wait_closed()
...
在我的情况下是一个非平凡的async/await代码片段。为了防止资源泄漏(例如文件描述符),我认为我需要调用close()
函数,因此我真的应该将它放在try/finally块中。这样正确吗,还是Python在异步循环结束时会像“完成异步生成器”一样神奇地处理资源清理?
如果不需要手动清理,是否有比定义一个新函数并使用@contextlib.asynccontextmanager更加规范/Pythonic的方法来实现这一点?
我尝试过contextlib.closing()
和contextlib.aclosing()
但那不起作用,因为asyncio.open_connection()
返回一个元组,而writer
没有aclose()
,只有close()
。
<details>
<summary>英文:</summary>
I'm implementing a TCP client with asyncio [Streams](https://docs.python.org/3/library/asyncio-stream.html#asyncio-streams). Typical example code is:
reader, writer = await asyncio.open_connection(
'127.0.0.1', 8888)
...
writer.close()
await writer.wait_closed()
The `...` is a non-trivial piece of async/await code in my case. To prevent resource leaks (e.g. fds), I believe I need to call that `close()` function, so I really should put it inside a try/finally. Is this correct or Python somehow magically handles resource cleanup, like it _finalizes asynchronous generators_, when async loop ends?
If not and manual cleanup is required, is there a more canonical/Pythonic way to implement this than defining a new function with [@contextlib.asynccontextmanager](https://docs.python.org/3/library/contextlib.html#contextlib.asynccontextmanager)?
I tried `contextlib.closing()` and `contextlib.aclosing()` but that doesn't work, since `asyncio.open_connection()` returns tuple and writer doesn't have `aclose()`, just `close()`.
</details>
# 答案1
**得分**: 2
你在完成时必须始终关闭流。
你可以使用 `try/finally`:
```python
async def main_try_catch():
writer: Optional[StreamWriter] = None
try:
print("打开")
reader, writer = await asyncio.open_connection("www.google.com", 443, ssl=True)
print("已打开")
data = await reader.readline()
print(f"接收到: {data.decode()!r}")
except:
print("捕获到异常")
finally:
if writer is not None:
print("关闭")
writer.close()
await writer.wait_closed()
print("已关闭")
或者你可以使用 @contextlib.asynccontextmanager
:
@contextlib.asynccontextmanager
async def main_context_manager() -> Tuple[StreamReader, StreamWriter]:
# __aenter__
print("打开")
reader, writer = await asyncio.open_connection("www.google.com", 443, ssl=True)
yield reader, writer
# __aexit__
if writer is not None:
print("关闭")
writer.close()
await writer.wait_closed()
print("已关闭")
然后像这样使用:
import asyncio
import contextlib
from asyncio import StreamReader, StreamWriter
from typing import Optional, Tuple
async def main():
# await main_try_catch()
# 或者
try:
async with main_context_manager() as (reader, writer):
print("已打开")
data = await reader.readline()
print(f"接收到: {data.decode()!r}")
except:
print("捕获到异常")
if __name__ == '__main__':
asyncio.run(main())
# 成功情况
# 打开
# 已打开
# 接收到: '...'
# 关闭
# 已关闭
# 错误情况
# 打开
# 捕获到异常
英文:
You always have to close streams when done.
You can use try/finally
async def main_try_catch():
writer: Optional[StreamWriter] = None
try:
print("Opening")
reader, writer = await asyncio.open_connection("www.google.com", 443, ssl=True)
print("Opened")
data = await reader.readline()
print(f"Received: {data.decode()!r}")
except:
print("Caught exception")
finally:
if writer is not None:
print("Closing")
writer.close()
await writer.wait_closed()
print("Closed")
Or you can use @contextlib.asynccontextmanager
:
@contextlib.asynccontextmanager
async def main_context_manager() -> Tuple[StreamReader, StreamWriter]:
# __aenter__
print("Opening")
reader, writer = await asyncio.open_connection("www.google.com", 443, ssl=True)
yield reader, writer
# __aexit__
if writer is not None:
print("Closing")
writer.close()
await writer.wait_closed()
print("Closed")
And use it like:
import asyncio
import contextlib
from asyncio import StreamReader, StreamWriter
from typing import Optional, Tuple
async def main():
# await main_try_catch()
# OR
try:
async with main_context_manager() as (reader, writer):
print("Opened")
data = await reader.readline()
print(f"Received: {data.decode()!r}")
except:
print("Caught exception")
if __name__ == '__main__':
asyncio.run(main())
# SUCCESS CASE
# Opening
# Opened
# Received: '...'
# Closing
# Closed
# ERROR CASE
# Opening
# Caught exception
答案2
得分: 0
感谢有帮助的答案,最终我采用了以下实现,其他人可能会发现有用(通用参数和在上下文管理器中使用try/finally以确保清理):
import asyncio
import contextlib
from typing import Tuple
@contextlib.asynccontextmanager
async def open_connection(*args, **kwargs) -> Tuple[asyncio.StreamReader, asyncio.StreamWriter]:
writer = None
try:
reader, writer = await asyncio.open_connection(*args, **kwargs)
yield reader, writer
finally:
if writer is not None:
writer.close()
await writer.wait_closed()
用法示例:
async with open_connection("www.google.com", 443, ssl=True) as (reader, writer):
...
英文:
Thanks to the helpful answer, I went with the following implementation in the end, which someone else might find useful (generic args and try/finally in context manager to ensure cleanup):
import asyncio
import contextlib
from typing import Tuple
@contextlib.asynccontextmanager
async def open_connection(*args, **kwargs) -> Tuple[asyncio.StreamReader, asyncio.StreamWriter]:
writer = None
try:
reader, writer = await asyncio.open_connection(*args, **kwargs)
yield reader, writer
finally:
if writer is not None:
writer.close()
await writer.wait_closed()
Used as:
async with open_connection("www.google.com", 443, ssl=True) as (reader, writer):
...
通过集体智慧和协作来改善编程学习和解决问题的方式。致力于成为全球开发者共同参与的知识库,让每个人都能够通过互相帮助和分享经验来进步。
评论