Error: "cannot perform operation: another operation is in progress" only when running from pytest/TestClient

Question

This is coming up while trying to run unit tests on a simple FastAPI application using asyncpg. Minimal reproducer:

    import asyncpg


    class Database:
        def __init__(self, dsn):
            self.dsn = dsn
            self.pool = None

        # creates connection pool
        async def connect(self):
            if not self.pool:
                self.pool = await asyncpg.create_pool(
                    min_size=1,
                    max_size=200,
                    max_inactive_connection_lifetime=10,
                    command_timeout=60,
                    dsn=self.dsn
                )

        async def fetch(self, query: str, *args):
            if not self.pool:
                await self.connect()
            async with self.pool.acquire() as conn:
                prep_stmt = await conn.prepare(query)
                result = await prep_stmt.fetch(*args)
                return result

        async def close(self):
            if self.pool:
                await self.pool.close()

Here's the app:

    from fastapi import FastAPI, HTTPException, status
    from app.database import Database

    app = FastAPI(title="simple app")
    db = Database("postgresql://postgres:postgres@10.201.0.110/postgres")


    @app.on_event("startup")
    async def startup_event():
        await db.connect()


    @app.on_event("shutdown")
    async def shutdown_event():
        await db.close()


    @app.get("/test")
    async def get_test():
        try:
            result = await db.fetch("SELECT 1 as foo")
        except Exception as e:
            raise HTTPException(status_code=status.HTTP_500_INTERNAL_SERVER_ERROR,
                                detail=f"Got exception: {e}")
        return result

Now this works perfectly when tested from the command line, e.g.:

    $ curl http://127.0.0.1:8000/test && echo && curl http://127.0.0.1:8000/test && echo
    [{"foo":1}]
    [{"foo":1}]

but the second request fails when the call is made using TestClient inside pytest:

    from fastapi.testclient import TestClient
    from app.app import app

    client = TestClient(app)


    def test_get():
        response = client.get("/test")
        print(response.content)
        assert response.status_code == 200
        response = client.get("/test")
        print(response.content)
        assert response.status_code == 200

Here's the output:

    $ pytest -v -s tests/
    ======================================================= test session starts =======================================================
    platform linux -- Python 3.10.11, pytest-7.3.1, pluggy-1.0.0 -- /usr/local/bin/python
    cachedir: .pytest_cache
    rootdir: /tmp/myapp
    plugins: anyio-3.7.0
    collected 1 item

    tests/test_api.py::test_get b'[{"foo":1}]'
    b'{"detail":"Got exception: cannot perform operation: another operation is in progress"}'
    FAILED

    ============================================================ FAILURES =============================================================
    ____________________________________________________________ test_get _____________________________________________________________

        def test_get():
            response = client.get("/test")
            print(response.content)
            assert response.status_code == 200
            response = client.get("/test")
            print(response.content)
    >       assert response.status_code == 200
    E       assert 500 == 200
    E        +  where 500 = <Response [500 Internal Server Error]>.status_code

    tests/test_api.py:15: AssertionError
    ===================================================== short test summary info =====================================================
    FAILED tests/test_api.py::test_get - assert 500 == 200
    ======================================================== 1 failed in 0.28s ========================================================
    Future exception was never retrieved
    future: <Future finished exception=ConnectionDoesNotExistError('connection was closed in the middle of operation')>
    asyncpg.exceptions.ConnectionDoesNotExistError: connection was closed in the middle of operation

I can't figure out why, only with pytest/TestClient, I'm getting that "cannot perform operation: another operation is in progress" response for the second request.

EDIT: turns out this is not related to pytest, only to the usage of TestClient. The following code is already enough to trigger the problem:

    from fastapi.testclient import TestClient
    from app.app import app

    client = TestClient(app)
    response = client.get("/test")
    print(response.status_code, response.content)
    response = client.get("/test")
    print(response.status_code, response.content)

EDIT 2: To see the full traceback, I just re-raised the exception in the app and I get this for the second - failing - request:

    Traceback (most recent call last):
      File "/tmp/myapp/app/database.py", line 27, in fetch
        prep_stmt = await conn.prepare(query)
      File "/usr/local/lib/python3.10/site-packages/asyncpg/connection.py", line 565, in prepare
        return await self._prepare(
      File "/usr/local/lib/python3.10/site-packages/asyncpg/connection.py", line 583, in _prepare
        stmt = await self._get_statement(
      File "/usr/local/lib/python3.10/site-packages/asyncpg/connection.py", line 397, in _get_statement
        statement = await self._protocol.prepare(
      File "asyncpg/protocol/protocol.pyx", line 156, in prepare
      File "asyncpg/protocol/protocol.pyx", line 741, in asyncpg.protocol.protocol.BaseProtocol._new_waiter
      File "/usr/local/lib/python3.10/asyncio/base_events.py", line 721, in call_later
        timer = self.call_at(self.time() + delay, callback, *args,
      File "/usr/local/lib/python3.10/asyncio/base_events.py", line 732, in call_at
        self._check_closed()
      File "/usr/local/lib/python3.10/asyncio/base_events.py", line 515, in _check_closed
        raise RuntimeError('Event loop is closed')
    RuntimeError: Event loop is closed

    During handling of the above exception, another exception occurred:

    Traceback (most recent call last):
      File "/tmp/myapp/simpletest.py", line 9, in <module>
        response = client.get("/test")
      File "/usr/local/lib/python3.10/site-packages/starlette/testclient.py", line 499, in get
        return super().get(
      File "/usr/local/lib/python3.10/site-packages/httpx/_client.py", line 1041, in get
        return self.request(
      File "/usr/local/lib/python3.10/site-packages/starlette/testclient.py", line 465, in request
        return super().request(
      File "/usr/local/lib/python3.10/site-packages/httpx/_client.py", line 814, in request
        return self.send(request, auth=auth, follow_redirects=follow_redirects)
      File "/usr/local/lib/python3.10/site-packages/httpx/_client.py", line 901, in send
        response = self._send_handling_auth(
      File "/usr/local/lib/python3.10/site-packages/httpx/_client.py", line 929, in _send_handling_auth
        response = self._send_handling_redirects(
      File "/usr/local/lib/python3.10/site-packages/httpx/_client.py", line 966, in _send_handling_redirects
        response = self._send_single_request(request)
      File "/usr/local/lib/python3.10/site-packages/httpx/_client.py", line 1002, in _send_single_request
        response = transport.handle_request(request)
      File "/usr/local/lib/python3.10/site-packages/starlette/testclient.py", line 342, in handle_request
        raise exc
      File "/usr/local/lib/python3.10/site-packages/starlette/testclient.py", line 339, in handle_request
        portal.call(self.app, scope, receive, send)
      File "/usr/local/lib/python3.10/site-packages/anyio/from_thread.py", line 277, in call
        return cast(T_Retval, self.start_task_soon(func, *args).result())
      File "/usr/local/lib/python3.10/concurrent/futures/_base.py", line 458, in result
        return self.__get_result()
      File "/usr/local/lib/python3.10/concurrent/futures/_base.py", line 403, in __get_result
        raise self._exception
      File "/usr/local/lib/python3.10/site-packages/anyio/from_thread.py", line 217, in _call_func
        retval = await retval
      File "/usr/local/lib/python3.10/site-packages/fastapi/applications.py", line 276, in __call__
        await super().__call__(scope, receive, send)
      File "/usr/local/lib/python3.10/site-packages/starlette/applications.py", line 122, in __call__
        await self.middleware_stack(scope, receive, send)
      File "/usr/local/lib/python3.10/site-packages/starlette/middleware/errors.py", line 184, in __call__
        raise exc
      File "/usr/local/lib/python3.10/site-packages/starlette/middleware/errors.py", line 162, in __call__
        await self.app(scope, receive, _send)
      File "/usr/local/lib/python3.10/site-packages/starlette/middleware/exceptions.py", line 79, in __call__
        raise exc
      File "/usr/local/lib/python3.10/site-packages/starlette/middleware/exceptions.py", line 68, in __call__
        await self.app(scope, receive, sender)
      File "/usr/local/lib/python3.10/site-packages/fastapi/middleware/asyncexitstack.py", line 21, in __call__
        raise e
      File "/usr/local/lib/python3.10/site-packages/fastapi/middleware/asyncexitstack.py", line 18, in __call__
        await self.app(scope, receive, send)
      File "/usr/local/lib/python3.10/site-packages/starlette/routing.py", line 718, in __call__
        await route.handle(scope, receive, send)
      File "/usr/local/lib/python3.10/site-packages/starlette/routing.py", line 276, in handle
        await self.app(scope, receive, send)
      File "/usr/local/lib/python3.10/site-packages/starlette/routing.py", line 66, in app
        response = await func(request)
      File "/usr/local/lib/python3.10/site-packages/fastapi/routing.py", line 237, in app
        raw_response = await run_endpoint_function(
      File "/usr/local/lib/python3.10/site-packages/fastapi/routing.py", line 163, in run_endpoint_function
        return await dependant.call(**values)
      File "/tmp/myapp/app/app.py", line 27, in get_test
        result = await db.fetch("SELECT 1 as foo")
      File "/tmp/myapp/app/database.py", line 26, in fetch
        async with self.pool.acquire() as conn:
      File "/usr/local/lib/python3.10/site-packages/asyncpg/pool.py", line 220, in release
        raise ex
      File "/usr/local/lib/python3.10/site-packages/asyncpg/pool.py", line 210, in release
        await self._con.reset(timeout=budget)
      File "/usr/local/lib/python3.10/site-packages/asyncpg/connection.py", line 1366, in reset
        await self.execute(reset_query, timeout=timeout)
      File "/usr/local/lib/python3.10/site-packages/asyncpg/connection.py", line 317, in execute
        return await self._protocol.query(query, timeout)
      File "asyncpg/protocol/protocol.pyx", line 323, in query
      File "asyncpg/protocol/protocol.pyx", line 707, in asyncpg.protocol.protocol.BaseProtocol._check_state
    asyncpg.exceptions._base.InterfaceError: cannot perform operation: another operation is in progress

So it looks like the real problem is that "Event loop is closed" error.
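One possible reading of the chained traceback, sketched as a toy model: the first exception is raised inside `_new_waiter` while scheduling a timeout on a closed loop, and the second is raised by `_check_state` when the pool resets the connection on release. The class below is purely hypothetical and is not asyncpg's code; only the method names are borrowed from the traceback frames. In particular, the idea that the connection is already marked busy before the timeout is scheduled is an assumption.

```python
class ToyProtocol:
    """Hypothetical stand-in for a connection protocol; NOT asyncpg's code."""

    def __init__(self, loop_closed):
        self.loop_closed = loop_closed
        self.waiter = None  # non-None means "an operation is in progress"

    def _check_state(self):
        if self.waiter is not None:
            raise RuntimeError(
                "cannot perform operation: another operation is in progress"
            )

    def _new_waiter(self):
        # Assumption: the waiter is registered first, then the timeout is
        # scheduled on the loop. If scheduling fails, the waiter stays set.
        self.waiter = object()
        if self.loop_closed:
            raise RuntimeError("Event loop is closed")

    def prepare(self):
        self._check_state()
        self._new_waiter()

    def reset(self):
        self._check_state()


def acquire_use_release(proto):
    """Mimic `async with pool.acquire()`: the connection is reset on exit."""
    try:
        proto.prepare()
    finally:
        proto.reset()


def demo():
    try:
        acquire_use_release(ToyProtocol(loop_closed=True))
    except RuntimeError as exc:
        # The visible error is the second one; the first is chained to it.
        return str(exc), str(exc.__context__)
```

In this model `demo()` surfaces the "another operation is in progress" message, with "Event loop is closed" attached as its context, which matches the order of the two tracebacks above.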

EDIT 3: Restructuring the testing code as follows works fine:

    with TestClient(app) as client:
        response = client.get("/test")
        print(response.status_code, response.content)
        response = client.get("/test")
        print(response.status_code, response.content)

Still not sure where the original problem is, but at least now I have a workaround.

According to the FastAPI documentation, using TestClient as a context manager invokes the startup and shutdown events; that seems to be the only difference. I'm still unclear why it does not work in the "normal" (i.e. non-context-manager) mode: Database.fetch() creates the pool anyway if it does not exist, so in principle there should be no practical difference compared to really executing db.connect() in the startup event. Obviously, that is not true, otherwise I wouldn't be seeing the error.
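A plausible explanation, offered as a sketch and not as a confirmed diagnosis: the traceback goes through `anyio/from_thread.py`, and as far as I can tell TestClient outside a `with` block runs each request on a fresh event loop that is closed afterwards, while inside a `with` block one loop stays alive for all requests. A lazily created pool would then be bound to the first request's loop, which is already closed when the second request arrives. The stdlib-only model below illustrates the difference; `LoopBoundPool`, `LazyDatabase` and both helper functions are hypothetical names, and no real database or TestClient is involved.

```python
import asyncio


class LoopBoundPool:
    """Hypothetical stand-in for a pool; it keeps the loop it was created on."""

    def __init__(self):
        self.loop = asyncio.get_running_loop()

    async def fetch(self):
        # Schedule work on the pool's own loop, as a loop-bound library would.
        fut = self.loop.create_future()
        self.loop.call_later(0, fut.set_result, [{"foo": 1}])
        return await fut


class LazyDatabase:
    def __init__(self):
        self.pool = None

    async def fetch(self):
        if self.pool is None:  # lazy creation, as in Database.fetch()
            self.pool = LoopBoundPool()
        return await self.pool.fetch()


def fresh_loop_per_request(db, n=2):
    """Each request runs on its own loop, which is closed afterwards."""
    results = []
    for _ in range(n):
        try:
            results.append(asyncio.run(db.fetch()))
        except RuntimeError as exc:
            results.append(str(exc))
    return results


def one_loop_for_all_requests(db, n=2):
    """All requests share one loop that outlives them."""
    loop = asyncio.new_event_loop()
    try:
        return [loop.run_until_complete(db.fetch()) for _ in range(n)]
    finally:
        loop.close()
```

With a fresh loop per request, the first call succeeds and the second reports "Event loop is closed"; with one shared loop, both calls succeed. If this model matches what TestClient does, the startup event matters less than the fact that the pool and all requests live on the same loop.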

Answer 1

Score: 1

Thank you for the MRE, that's very nice.


What's different between pytest and the curls?

Well, pytest has a requests web client pool, so it can hold a TCP connection open and reuse it on the second request. In contrast, each curl runs in a separate child process, so we're clearly tearing down the old TCP connection and creating a brand new one.


Instead of

    async with self.pool.acquire() as conn:

I think you want this:

    async with self.pool.acquire() as conn:
        async with conn.transaction():

This should have two beneficial results:

  • finish the transaction so the next request goes into a new transaction
  • release reader locks on the table(s) your SELECT just read from

Lingering reader locks become very noticeable when, for example, a DBA wants to DROP or ALTER TABLE, since an exclusive lock is not compatible with a reader lock.
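For completeness, here is a minimal sketch of what a fetch helper wrapped in an explicit transaction could look like. The function name `fetch_in_transaction` is made up for illustration, and `pool` is only assumed to behave like an asyncpg pool (`pool.acquire()` and `conn.transaction()` as async context managers, `conn.fetch()` as a coroutine). Whether this changes anything for the "Event loop is closed" traceback in the question is not established here; also note that asyncpg documents statements outside an explicit transaction block as running in auto-commit mode.

```python
async def fetch_in_transaction(pool, query, *args):
    """Run one query inside an explicit transaction block.

    Assumption: `pool` behaves like an asyncpg pool, i.e. `pool.acquire()`
    and `conn.transaction()` are async context managers and `conn.fetch()`
    is a coroutine that returns the result rows.
    """
    async with pool.acquire() as conn:
        # The transaction is committed on normal exit of the block and
        # rolled back if the body raises.
        async with conn.transaction():
            return await conn.fetch(query, *args)
```

Inside the `Database` class from the question this would be called as `await fetch_in_transaction(self.pool, query, *args)`.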


When assigning the prepared statement, it looks like you have an opportunity to cache such queries. Your real app probably only has a handful of distinct queries, and then WHERE clause filters get substituted in via a params argument. (Hopefully there's no f" WHERE x = {x}" string interpolation, which would open the door to possible injection attacks.)
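To make the injection point concrete, here is a small sketch comparing a bound parameter with f-string interpolation. It uses the standard library's sqlite3 as a stand-in so that it runs without a server; that is an assumption for illustration only, since the app in the question uses asyncpg, where placeholders are written `$1`, `$2`, ... instead of `?`. The table and function names are made up.

```python
import sqlite3


def find_by_name(conn, name):
    # The value is passed separately from the SQL text and bound by the driver.
    return conn.execute(
        "SELECT id, name FROM users WHERE name = ? ORDER BY id", (name,)
    ).fetchall()


def find_by_name_unsafe(conn, name):
    # The value becomes part of the SQL text, so it can change the query.
    return conn.execute(
        f"SELECT id, name FROM users WHERE name = '{name}' ORDER BY id"
    ).fetchall()


def demo():
    conn = sqlite3.connect(":memory:")
    conn.execute("CREATE TABLE users (id INTEGER PRIMARY KEY, name TEXT)")
    conn.executemany("INSERT INTO users (name) VALUES (?)", [("alice",), ("bob",)])
    payload = "x' OR '1'='1"
    safe = find_by_name(conn, payload)
    unsafe = find_by_name_unsafe(conn, payload)
    conn.close()
    return safe, unsafe
```

With the bound parameter the payload is treated as an ordinary (non-matching) name and no rows come back; with interpolation the same payload turns the WHERE clause into a condition that is true for every row.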

huangapple
  • Posted on May 30, 2023, 03:43:45
  • When reposting, be sure to keep the link to this article: https://go.coder-hub.com/76359971.html