英文:
Error: "cannot perform operation: another operation is in progress" only when running from pytest/TestClient
问题
这个问题看起来可能与异步事件循环的管理有关。在非上下文管理模式下,事件循环可能在某些情况下未正确初始化,导致"Event loop is closed"错误。使用TestClient
作为上下文管理器可以确保正确初始化事件循环。
根据你提供的信息,使用以下代码进行测试似乎可以正常工作:
with TestClient(app) as client:
response = client.get("/test")
print(response.status_code, response.content)
response = client.get("/test")
print(response.status_code, response.content)
这个工作方式可能是因为上下文管理器确保了事件循环的正确初始化和清理。至于为什么在非上下文管理模式下会出现问题,可能需要深入研究 FastAPI 和 asyncio 的内部工作机制来找出确切的原因。
总之,使用上下文管理器似乎是一个可行的解决方案,确保了正确的事件循环管理,而不会引发"Event loop is closed"错误。
英文:
This is coming up while trying to run unit tests on a simple FastAPI application using asyncpg. Minimal reproducer:
import asyncpg
class Database:
def __init__(self, dsn):
self.dsn = dsn
self.pool = None
# creates connection pool
async def connect(self):
if not self.pool:
self.pool = await asyncpg.create_pool(
min_size=1,
max_size=200,
max_inactive_connection_lifetime=10
command_timeout=60,
dsn=self.dsn
)
async def fetch(self, query: str, *args):
if not self.pool:
await self.connect()
async with self.pool.acquire() as conn:
prep_stmt = await conn.prepare(query)
result = await prep_stmt.fetch(*args)
return result
async def close(self):
if self.pool:
await self.pool.close()
Here's the app:
from fastapi import FastAPI, HTTPException, status
from app.database import Database
app = FastAPI(title="simple app")
db = Database("postgresql://postgres:postgres@10.201.0.110/postgres")
@app.on_event("startup")
async def startup_event():
await db.connect()
@app.on_event("shutdown")
async def shutdown_event():
await db.close()
@app.get("/test")
async def get_test():
try:
result = await db.fetch("SELECT 1 as foo")
except Exception as e:
raise HTTPException(status_code=status.HTTP_500_INTERNAL_SERVER_ERROR,
detail=f"Got exception: {e}")
return result
Now this works perfectly when tested from the command line, eg:
$ curl http://127.0.0.1:8000/test && echo && curl http://127.0.0.1:8000/test && echo
[{"foo":1}]
[{"foo":1}]
but the second request fails when the call is made using TestClient inside pytest:
from fastapi.testclient import TestClient
from app.app import app
client = TestClient(app)
def test_get():
response = client.get("/test")
print(response.content)
assert response.status_code == 200
response = client.get("/test")
print(response.content)
assert response.status_code == 200
here's the output:
$ pytest -v -s tests/
======================================================= test session starts =======================================================
platform linux -- Python 3.10.11, pytest-7.3.1, pluggy-1.0.0 -- /usr/local/bin/python
cachedir: .pytest_cache
rootdir: /tmp/myapp
plugins: anyio-3.7.0
collected 1 item
tests/test_api.py::test_get b'[{"foo":1}]'
b'{"detail":"Got exception: cannot perform operation: another operation is in progress"}'
FAILED
============================================================ FAILURES =============================================================
____________________________________________________________ test_get _____________________________________________________________
def test_get():
response = client.get("/test")
print(response.content)
assert response.status_code == 200
response = client.get("/test")
print(response.content)
> assert response.status_code == 200
E assert 500 == 200
E + where 500 = <Response [500 Internal Server Error]>.status_code
tests/test_api.py:15: AssertionError
===================================================== short test summary info =====================================================
FAILED tests/test_api.py::test_get - assert 500 == 200
======================================================== 1 failed in 0.28s ========================================================
Future exception was never retrieved
future: <Future finished exception=ConnectionDoesNotExistError('connection was closed in the middle of operation')>
asyncpg.exceptions.ConnectionDoesNotExistError: connection was closed in the middle of operation
I can't figure out why, only with pytest/TestClient, I'm getting that cannot perform operation: another operation is in progress
response for the second request.
EDIT: turns out this is not related to pytest, only to the usage of TestClient
. The following code is already enough to trigger the problem:
from fastapi.testclient import TestClient
from app.app import app
client = TestClient(app)
response = client.get("/test")
print(response.status_code, response.content)
response = client.get("/test")
print(response.status_code, response.content)
EDIT 2: To see the full traceback, I just re-raised the exception in the app and I get this for the second - failing - request:
Traceback (most recent call last):
File "/tmp/myapp/app/database.py", line 27, in fetch
prep_stmt = await conn.prepare(query)
File "/usr/local/lib/python3.10/site-packages/asyncpg/connection.py", line 565, in prepare
return await self._prepare(
File "/usr/local/lib/python3.10/site-packages/asyncpg/connection.py", line 583, in _prepare
stmt = await self._get_statement(
File "/usr/local/lib/python3.10/site-packages/asyncpg/connection.py", line 397, in _get_statement
statement = await self._protocol.prepare(
File "asyncpg/protocol/protocol.pyx", line 156, in prepare
File "asyncpg/protocol/protocol.pyx", line 741, in asyncpg.protocol.protocol.BaseProtocol._new_waiter
File "/usr/local/lib/python3.10/asyncio/base_events.py", line 721, in call_later
timer = self.call_at(self.time() + delay, callback, *args,
File "/usr/local/lib/python3.10/asyncio/base_events.py", line 732, in call_at
self._check_closed()
File "/usr/local/lib/python3.10/asyncio/base_events.py", line 515, in _check_closed
raise RuntimeError('Event loop is closed')
RuntimeError: Event loop is closed
During handling of the above exception, another exception occurred:
Traceback (most recent call last):
File "/tmp/myapp/simpletest.py", line 9, in <module>
response = client.get("/test")
File "/usr/local/lib/python3.10/site-packages/starlette/testclient.py", line 499, in get
return super().get(
File "/usr/local/lib/python3.10/site-packages/httpx/_client.py", line 1041, in get
return self.request(
File "/usr/local/lib/python3.10/site-packages/starlette/testclient.py", line 465, in request
return super().request(
File "/usr/local/lib/python3.10/site-packages/httpx/_client.py", line 814, in request
return self.send(request, auth=auth, follow_redirects=follow_redirects)
File "/usr/local/lib/python3.10/site-packages/httpx/_client.py", line 901, in send
response = self._send_handling_auth(
File "/usr/local/lib/python3.10/site-packages/httpx/_client.py", line 929, in _send_handling_auth
response = self._send_handling_redirects(
File "/usr/local/lib/python3.10/site-packages/httpx/_client.py", line 966, in _send_handling_redirects
response = self._send_single_request(request)
File "/usr/local/lib/python3.10/site-packages/httpx/_client.py", line 1002, in _send_single_request
response = transport.handle_request(request)
File "/usr/local/lib/python3.10/site-packages/starlette/testclient.py", line 342, in handle_request
raise exc
File "/usr/local/lib/python3.10/site-packages/starlette/testclient.py", line 339, in handle_request
portal.call(self.app, scope, receive, send)
File "/usr/local/lib/python3.10/site-packages/anyio/from_thread.py", line 277, in call
return cast(T_Retval, self.start_task_soon(func, *args).result())
File "/usr/local/lib/python3.10/concurrent/futures/_base.py", line 458, in result
return self.__get_result()
File "/usr/local/lib/python3.10/concurrent/futures/_base.py", line 403, in __get_result
raise self._exception
File "/usr/local/lib/python3.10/site-packages/anyio/from_thread.py", line 217, in _call_func
retval = await retval
File "/usr/local/lib/python3.10/site-packages/fastapi/applications.py", line 276, in __call__
await super().__call__(scope, receive, send)
File "/usr/local/lib/python3.10/site-packages/starlette/applications.py", line 122, in __call__
await self.middleware_stack(scope, receive, send)
File "/usr/local/lib/python3.10/site-packages/starlette/middleware/errors.py", line 184, in __call__
raise exc
File "/usr/local/lib/python3.10/site-packages/starlette/middleware/errors.py", line 162, in __call__
await self.app(scope, receive, _send)
File "/usr/local/lib/python3.10/site-packages/starlette/middleware/exceptions.py", line 79, in __call__
raise exc
File "/usr/local/lib/python3.10/site-packages/starlette/middleware/exceptions.py", line 68, in __call__
await self.app(scope, receive, sender)
File "/usr/local/lib/python3.10/site-packages/fastapi/middleware/asyncexitstack.py", line 21, in __call__
raise e
File "/usr/local/lib/python3.10/site-packages/fastapi/middleware/asyncexitstack.py", line 18, in __call__
await self.app(scope, receive, send)
File "/usr/local/lib/python3.10/site-packages/starlette/routing.py", line 718, in __call__
await route.handle(scope, receive, send)
File "/usr/local/lib/python3.10/site-packages/starlette/routing.py", line 276, in handle
await self.app(scope, receive, send)
File "/usr/local/lib/python3.10/site-packages/starlette/routing.py", line 66, in app
response = await func(request)
File "/usr/local/lib/python3.10/site-packages/fastapi/routing.py", line 237, in app
raw_response = await run_endpoint_function(
File "/usr/local/lib/python3.10/site-packages/fastapi/routing.py", line 163, in run_endpoint_function
return await dependant.call(**values)
File "/tmp/myapp/app/app.py", line 27, in get_test
result = await db.fetch("SELECT 1 as foo")
File "/tmp/myapp/app/database.py", line 26, in fetch
async with self.pool.acquire() as conn:
File "/usr/local/lib/python3.10/site-packages/asyncpg/pool.py", line 220, in release
raise ex
File "/usr/local/lib/python3.10/site-packages/asyncpg/pool.py", line 210, in release
await self._con.reset(timeout=budget)
File "/usr/local/lib/python3.10/site-packages/asyncpg/connection.py", line 1366, in reset
await self.execute(reset_query, timeout=timeout)
File "/usr/local/lib/python3.10/site-packages/asyncpg/connection.py", line 317, in execute
return await self._protocol.query(query, timeout)
File "asyncpg/protocol/protocol.pyx", line 323, in query
File "asyncpg/protocol/protocol.pyx", line 707, in asyncpg.protocol.protocol.BaseProtocol._check_state
asyncpg.exceptions._base.InterfaceError: cannot perform operation: another operation is in progress
So it looks like the real problem is that Event loop is closed
error.
EDIT 3: Restructuring the testing code as follows works fine:
with TestClient(app) as client:
response = client.get("/test")
print(response.status_code, response.content)
response = client.get("/test")
print(response.status_code, response.content)
Still not sure where the original problem is, but at least now I have a workaround.
According to FastAPI documentation, using TestClient
as a context manager invokes the startup and shutdown events; that seems to be the only difference. I'm still unclear why it does not work in the "normal" (ie non context-manager) mode, as the function Database.fetch()
creates the pool anyway if it does not exist, so in principle there should be no practical difference compared to really executing the startup db.connect()
event. Obviously, that is not true, otherwise I wouldn't be seeing the error.
答案1
得分: 1
谢谢你提供了MRE,这非常好。
pytest和curl之间有什么不同?
pytest具有requests
网络客户端池,因此它可以保持TCP连接处于打开状态并重复使用它在第二个请求上。相比之下,每个curl在单独的子进程中运行,因此我们明显正在关闭旧的TCP连接并创建一个全新的连接。
而不是
async with self.pool.acquire() as conn:
我认为你想要这个:
async with self.pool.acquire() as conn:
async with conn.transaction():
这应该有两个有益的结果:
- 完成事务,以便下一个请求进入新的事务。
- 释放从刚刚读取的表上的读取器锁。
当例如DBA想要DROP或ALTER TABLE时,悬挂的读取器锁会变得非常明显,因为排他锁与读取器锁不兼容。
在分配准备好的语句时,看起来你有机会缓存这些查询。你的真正应用程序可能只有少数几个不同的查询,然后通过params
参数进行替换的WHERE子句过滤器。 (希望没有f" WHERE x = {x}"
字符串插值,这可能会打开可能的注入攻击的大门。)
英文:
Thank you for the
MRE,
that's very nice.
What's different between pytest and the curls?
Well, pytest has a requests
web client pool,
so it can hold a TCP connection open and reuse it
on the second request.
In contrast each curl runs in a separate child process,
so we're clearly tearing down the old TCP connection
and creating a brand new one.
Instead of
async with self.pool.acquire() as conn:
I think you want this:
async with self.pool.acquire() as conn:
async with conn.transaction():
This should have two beneficial results:
- finish the transaction so next request goes into a new transaction
- release reader locks on the table(s) your SELECT just read from
Lingering reader locks become very noticeable when for example
a DBA wants to DROP or ALTER TABLE, since an exclusive lock is not compatible with a reader lock.
When assigning the prepared statement,
it looks like you have an opportunity to
cache
such queries. Your real app probably only
has a handful of distinct queries,
and then WHERE clause filters get
substituted in via a params
argument.
(Hopefully there's no f" WHERE x = {x}"
string interpolation, which would open
the door to possible
injection attacks.
通过集体智慧和协作来改善编程学习和解决问题的方式。致力于成为全球开发者共同参与的知识库,让每个人都能够通过互相帮助和分享经验来进步。
评论