错误: “无法执行操作:另一个操作正在进行” 仅在从pytest/TestClient运行时发生

huangapple go评论60阅读模式
英文:

Error: "cannot perform operation: another operation is in progress" only when running from pytest/TestClient

问题

这个问题看起来可能与异步事件循环的管理有关。在非上下文管理模式下,事件循环可能在某些情况下未正确初始化,导致"Event loop is closed"错误。使用TestClient作为上下文管理器可以确保正确初始化事件循环。

根据你提供的信息,使用以下代码进行测试似乎可以正常工作:

with TestClient(app) as client:
    response = client.get("/test")
    print(response.status_code, response.content)

    response = client.get("/test")
    print(response.status_code, response.content)

这个工作方式可能是因为上下文管理器确保了事件循环的正确初始化和清理。至于为什么在非上下文管理模式下会出现问题,可能需要深入研究 FastAPI 和 asyncio 的内部工作机制来找出确切的原因。

总之,使用上下文管理器似乎是一个可行的解决方案,确保了正确的事件循环管理,而不会引发"Event loop is closed"错误。

英文:

This is coming up while trying to run unit tests on a simple FastAPI application using asyncpg. Minimal reproducer:

import asyncpg


class Database:
    def __init__(self, dsn):
        self.dsn = dsn
        self.pool = None

    # creates connection pool
    async def connect(self):
        if not self.pool:
            self.pool = await asyncpg.create_pool(
                min_size=1,
                max_size=200,
                max_inactive_connection_lifetime=10
                command_timeout=60,
                dsn=self.dsn
            )

    async def fetch(self, query: str, *args):
        if not self.pool:
            await self.connect()

        async with self.pool.acquire() as conn:
            prep_stmt = await conn.prepare(query)
            result = await prep_stmt.fetch(*args)

        return result

    async def close(self):
        if self.pool:
            await self.pool.close()

Here's the app:

from fastapi import FastAPI, HTTPException, status

from app.database import Database

app = FastAPI(title="simple app")
db = Database("postgresql://postgres:postgres@10.201.0.110/postgres")


@app.on_event("startup")
async def startup_event():
    await db.connect()


@app.on_event("shutdown")
async def shutdown_event():
    await db.close()


@app.get("/test")
async def get_test():

    try:
        result = await db.fetch("SELECT 1 as foo")
    except Exception as e:
        raise HTTPException(status_code=status.HTTP_500_INTERNAL_SERVER_ERROR,
                            detail=f"Got exception: {e}")

    return result

Now this works perfectly when tested from the command line, eg:

$ curl http://127.0.0.1:8000/test && echo && curl http://127.0.0.1:8000/test && echo 
[{"foo":1}]
[{"foo":1}]

but the second request fails when the call is made using TestClient inside pytest:

from fastapi.testclient import TestClient

from app.app import app

client = TestClient(app)


def test_get():

    response = client.get("/test")
    print(response.content)
    assert response.status_code == 200
    response = client.get("/test")
    print(response.content)
    assert response.status_code == 200

here's the output:

$ pytest -v -s tests/
======================================================= test session starts =======================================================
platform linux -- Python 3.10.11, pytest-7.3.1, pluggy-1.0.0 -- /usr/local/bin/python
cachedir: .pytest_cache
rootdir: /tmp/myapp
plugins: anyio-3.7.0
collected 1 item                                                                                                                  

tests/test_api.py::test_get b'[{"foo":1}]'
b'{"detail":"Got exception: cannot perform operation: another operation is in progress"}'
FAILED

============================================================ FAILURES =============================================================
____________________________________________________________ test_get _____________________________________________________________

    def test_get():
    
        response = client.get("/test")
        print(response.content)
        assert response.status_code == 200
        response = client.get("/test")
        print(response.content)
>       assert response.status_code == 200
E       assert 500 == 200
E        +  where 500 = <Response [500 Internal Server Error]>.status_code

tests/test_api.py:15: AssertionError
===================================================== short test summary info =====================================================
FAILED tests/test_api.py::test_get - assert 500 == 200
======================================================== 1 failed in 0.28s ========================================================
Future exception was never retrieved
future: <Future finished exception=ConnectionDoesNotExistError('connection was closed in the middle of operation')>
asyncpg.exceptions.ConnectionDoesNotExistError: connection was closed in the middle of operation

I can't figure out why, only with pytest/TestClient, I'm getting that cannot perform operation: another operation is in progress response for the second request.

EDIT: turns out this is not related to pytest, only to the usage of TestClient. The following code is already enough to trigger the problem:

from fastapi.testclient import TestClient

from app.app import app

client = TestClient(app)

response = client.get("/test")
print(response.status_code, response.content)
response = client.get("/test")
print(response.status_code, response.content)

EDIT 2: To see the full traceback, I just re-raised the exception in the app and I get this for the second - failing - request:

Traceback (most recent call last):
  File "/tmp/myapp/app/database.py", line 27, in fetch
    prep_stmt = await conn.prepare(query)
  File "/usr/local/lib/python3.10/site-packages/asyncpg/connection.py", line 565, in prepare
    return await self._prepare(
  File "/usr/local/lib/python3.10/site-packages/asyncpg/connection.py", line 583, in _prepare
    stmt = await self._get_statement(
  File "/usr/local/lib/python3.10/site-packages/asyncpg/connection.py", line 397, in _get_statement
    statement = await self._protocol.prepare(
  File "asyncpg/protocol/protocol.pyx", line 156, in prepare
  File "asyncpg/protocol/protocol.pyx", line 741, in asyncpg.protocol.protocol.BaseProtocol._new_waiter
  File "/usr/local/lib/python3.10/asyncio/base_events.py", line 721, in call_later
    timer = self.call_at(self.time() + delay, callback, *args,
  File "/usr/local/lib/python3.10/asyncio/base_events.py", line 732, in call_at
    self._check_closed()
  File "/usr/local/lib/python3.10/asyncio/base_events.py", line 515, in _check_closed
    raise RuntimeError('Event loop is closed')
RuntimeError: Event loop is closed

During handling of the above exception, another exception occurred:

Traceback (most recent call last):
  File "/tmp/myapp/simpletest.py", line 9, in <module>
    response = client.get("/test")
  File "/usr/local/lib/python3.10/site-packages/starlette/testclient.py", line 499, in get
    return super().get(
  File "/usr/local/lib/python3.10/site-packages/httpx/_client.py", line 1041, in get
    return self.request(
  File "/usr/local/lib/python3.10/site-packages/starlette/testclient.py", line 465, in request
    return super().request(
  File "/usr/local/lib/python3.10/site-packages/httpx/_client.py", line 814, in request
    return self.send(request, auth=auth, follow_redirects=follow_redirects)
  File "/usr/local/lib/python3.10/site-packages/httpx/_client.py", line 901, in send
    response = self._send_handling_auth(
  File "/usr/local/lib/python3.10/site-packages/httpx/_client.py", line 929, in _send_handling_auth
    response = self._send_handling_redirects(
  File "/usr/local/lib/python3.10/site-packages/httpx/_client.py", line 966, in _send_handling_redirects
    response = self._send_single_request(request)
  File "/usr/local/lib/python3.10/site-packages/httpx/_client.py", line 1002, in _send_single_request
    response = transport.handle_request(request)
  File "/usr/local/lib/python3.10/site-packages/starlette/testclient.py", line 342, in handle_request
    raise exc
  File "/usr/local/lib/python3.10/site-packages/starlette/testclient.py", line 339, in handle_request
    portal.call(self.app, scope, receive, send)
  File "/usr/local/lib/python3.10/site-packages/anyio/from_thread.py", line 277, in call
    return cast(T_Retval, self.start_task_soon(func, *args).result())
  File "/usr/local/lib/python3.10/concurrent/futures/_base.py", line 458, in result
    return self.__get_result()
  File "/usr/local/lib/python3.10/concurrent/futures/_base.py", line 403, in __get_result
    raise self._exception
  File "/usr/local/lib/python3.10/site-packages/anyio/from_thread.py", line 217, in _call_func
    retval = await retval
  File "/usr/local/lib/python3.10/site-packages/fastapi/applications.py", line 276, in __call__
    await super().__call__(scope, receive, send)
  File "/usr/local/lib/python3.10/site-packages/starlette/applications.py", line 122, in __call__
    await self.middleware_stack(scope, receive, send)
  File "/usr/local/lib/python3.10/site-packages/starlette/middleware/errors.py", line 184, in __call__
    raise exc
  File "/usr/local/lib/python3.10/site-packages/starlette/middleware/errors.py", line 162, in __call__
    await self.app(scope, receive, _send)
  File "/usr/local/lib/python3.10/site-packages/starlette/middleware/exceptions.py", line 79, in __call__
    raise exc
  File "/usr/local/lib/python3.10/site-packages/starlette/middleware/exceptions.py", line 68, in __call__
    await self.app(scope, receive, sender)
  File "/usr/local/lib/python3.10/site-packages/fastapi/middleware/asyncexitstack.py", line 21, in __call__
    raise e
  File "/usr/local/lib/python3.10/site-packages/fastapi/middleware/asyncexitstack.py", line 18, in __call__
    await self.app(scope, receive, send)
  File "/usr/local/lib/python3.10/site-packages/starlette/routing.py", line 718, in __call__
    await route.handle(scope, receive, send)
  File "/usr/local/lib/python3.10/site-packages/starlette/routing.py", line 276, in handle
    await self.app(scope, receive, send)
  File "/usr/local/lib/python3.10/site-packages/starlette/routing.py", line 66, in app
    response = await func(request)
  File "/usr/local/lib/python3.10/site-packages/fastapi/routing.py", line 237, in app
    raw_response = await run_endpoint_function(
  File "/usr/local/lib/python3.10/site-packages/fastapi/routing.py", line 163, in run_endpoint_function
    return await dependant.call(**values)
  File "/tmp/myapp/app/app.py", line 27, in get_test
    result = await db.fetch("SELECT 1 as foo")
  File "/tmp/myapp/app/database.py", line 26, in fetch
    async with self.pool.acquire() as conn:
  File "/usr/local/lib/python3.10/site-packages/asyncpg/pool.py", line 220, in release
    raise ex
  File "/usr/local/lib/python3.10/site-packages/asyncpg/pool.py", line 210, in release
    await self._con.reset(timeout=budget)
  File "/usr/local/lib/python3.10/site-packages/asyncpg/connection.py", line 1366, in reset
    await self.execute(reset_query, timeout=timeout)
  File "/usr/local/lib/python3.10/site-packages/asyncpg/connection.py", line 317, in execute
    return await self._protocol.query(query, timeout)
  File "asyncpg/protocol/protocol.pyx", line 323, in query
  File "asyncpg/protocol/protocol.pyx", line 707, in asyncpg.protocol.protocol.BaseProtocol._check_state
asyncpg.exceptions._base.InterfaceError: cannot perform operation: another operation is in progress

So it looks like the real problem is that Event loop is closed error.

EDIT 3: Restructuring the testing code as follows works fine:

with TestClient(app) as client:
    response = client.get("/test")
    print(response.status_code, response.content)

    response = client.get("/test")
    print(response.status_code, response.content)

Still not sure where the original problem is, but at least now I have a workaround.

According to FastAPI documentation, using TestClient as a context manager invokes the startup and shutdown events; that seems to be the only difference. I'm still unclear why it does not work in the "normal" (ie non context-manager) mode, as the function Database.fetch() creates the pool anyway if it does not exist, so in principle there should be no practical difference compared to really executing the startup db.connect() event. Obviously, that is not true, otherwise I wouldn't be seeing the error.

答案1

得分: 1

谢谢你提供了MRE,这非常好。


pytest和curl之间有什么不同?

pytest具有requests网络客户端池,因此它可以保持TCP连接处于打开状态并重复使用它在第二个请求上。相比之下,每个curl在单独的子进程中运行,因此我们明显正在关闭旧的TCP连接并创建一个全新的连接。


而不是

        async with self.pool.acquire() as conn:

我认为你想要这个:

        async with self.pool.acquire() as conn:
            async with conn.transaction():

这应该有两个有益的结果:

  • 完成事务,以便下一个请求进入新的事务。
  • 释放从刚刚读取的表上的读取器锁。

当例如DBA想要DROP或ALTER TABLE时,悬挂的读取器锁会变得非常明显,因为排他锁与读取器锁不兼容。


在分配准备好的语句时,看起来你有机会缓存这些查询。你的真正应用程序可能只有少数几个不同的查询,然后通过params参数进行替换的WHERE子句过滤器。 (希望没有f" WHERE x = {x}"字符串插值,这可能会打开可能的注入攻击的大门。)

英文:

Thank you for the
MRE,
that's very nice.


What's different between pytest and the curls?

Well, pytest has a requests web client pool,
so it can hold a TCP connection open and reuse it
on the second request.
In contrast each curl runs in a separate child process,
so we're clearly tearing down the old TCP connection
and creating a brand new one.


Instead of

        async with self.pool.acquire() as conn:

I think you want this:

        async with self.pool.acquire() as conn:
            async with conn.transaction():

This should have two beneficial results:

  • finish the transaction so next request goes into a new transaction
  • release reader locks on the table(s) your SELECT just read from

Lingering reader locks become very noticeable when for example
a DBA wants to DROP or ALTER TABLE, since an exclusive lock is not compatible with a reader lock.


When assigning the prepared statement,
it looks like you have an opportunity to
cache
such queries. Your real app probably only
has a handful of distinct queries,
and then WHERE clause filters get
substituted in via a params argument.
(Hopefully there's no f" WHERE x = {x}"
string interpolation, which would open
the door to possible
injection attacks.

huangapple
  • 本文由 发表于 2023年5月30日 03:43:45
  • 转载请务必保留本文链接:https://go.coder-hub.com/76359971.html
匿名

发表评论

匿名网友

:?: :razz: :sad: :evil: :!: :smile: :oops: :grin: :eek: :shock: :???: :cool: :lol: :mad: :twisted: :roll: :wink: :idea: :arrow: :neutral: :cry: :mrgreen:

确定