英文:
How to create an asyncpg pool that sets codecs (type converters) automatically?
问题
以下是您要翻译的内容:
More specifically, I need to use hstore
, json
, and jsonb
values. There are built-in codecs for this, but they must be registered on the connection explicitly, as described in https://magicstack.github.io/asyncpg/current/usage.html#example-decoding-hstore-values
import asyncpg
import asyncio
async def run():
conn = await asyncpg.connect()
# Assuming the hstore extension exists in the public schema.
await conn.set_builtin_type_codec(
'hstore', codec_name='pg_contrib.hstore')
result = await conn.fetchval("SELECT 'a=>1,b=>2,c=>NULL'::hstore")
assert result == {'a': '1', 'b': '2', 'c': None}
asyncio.get_event_loop().run_until_complete(run())
Since I need these codecs everywhere in my application, I want to create a pool that automatically sets these codecs when a new connection is created.
But I'm not sure how to do this. I can see that connection_class
can be given for create_pool
. In other words, I can create a pool that uses a custom connection class, for example:
async def create_my_pool(pool_config):
return await asyncpg.create_pool(connection_class=MyDbConnection, **pool_config)
But it does not solve the problem because I cannot call set_builtin_type_codec
from MyDbConnection.__init__
, simply because set_builtin_type_codec
is an async method, and the constructor of my custom MyDbConnection
is not async.
I might be able to write my own async context manager that wraps the one that is returned by asyncpg.create_pool
, but then I need to keep track of each connection and set the codecs on them only if they have not been set previously. It looks very clumsy.
Since this problem (e.g. automatically setting codecs) seems to be a very common requirement to me, there must be a better way to do this, I just can't figure it out.
So this is my question: how to create an asyncpg pool that sets various codecs automatically whenever it creates a new connection?
英文:
More specifically, I need to use hstore
, json
an jsonb
values. There are built-in codecs for this, but they must be registered on the connection explicitly, as described in https://magicstack.github.io/asyncpg/current/usage.html#example-decoding-hstore-values
import asyncpg
import asyncio
async def run():
conn = await asyncpg.connect()
# Assuming the hstore extension exists in the public schema.
await conn.set_builtin_type_codec(
'hstore', codec_name='pg_contrib.hstore')
result = await conn.fetchval("SELECT 'a=>1,b=>2,c=>NULL'::hstore")
assert result == {'a': '1', 'b': '2', 'c': None}
asyncio.get_event_loop().run_until_complete(run())
Since I need these codec everwhere in my application, I want to create a pool that automatically sets these codecs when a new connection is created.
But I'm not sure how to do this. I can see that connection_class
can be given for create_pool
. In other words, I can create a pool that uses a custom connection class, for example:
async def create_my_pool(pool_config):
return await asyncpg.create_pool(connection_class=MyDbConnection, **pool_config)
But it does not solve the problem, because I cannot call set_builtin_type_codec
from MyDbConnection.__init__
, simply because set_builtin_type_codec
is an async method, and the constructor of my custom MyDbConnection
is not async.
I might be able to write my own async context manager that wraps the one that is returned by asyncpg.create_pool
, but then I need to keep track of each connection and set the codecs on them only if they have not been set previously. It looks very clumsy.
Since this problem (e.g. automatically setting codecs) seems to be a very common requirement to me, there must be a better way to do this, I just can't figure it out.
So this is my question: how to create an asyncpg pool that sets various codecs automatically, whenever if creates a new connection?
答案1
得分: 1
首先,我们创建自己的连接类,其中包含一个编解码器安装程序:
import copy
from contextlib import asynccontextmanager
import asyncpg
import asyncpg.pool
class MyConnection(asyncpg.Connection):
_codecs_installed = False
async def _install_codecs(self):
if not self._codecs_installed:
await self.set_builtin_type_codec('hstore', codec_name='pg_contrib.hstore')
# 在这里安装更多编解码器...
self._codecs_installed = True
关键点是,连接可以被池重复使用(由池的用户重新获取),但编解码器在每个连接中仅安装一次。
然后,我们创建一个包装器,模仿 asyncpg.Pool
(具有一个获取方法),但会要求连接安装编解码器(如果尚未安装):
class Db:
def __init__(self, db_config):
self._db_config = db_config
self._pool = None
async def initialize(self):
self._pool = await asyncpg.pool.create_pool(connection_class=MyConnection, **self._db_config)
@asynccontextmanager
async def acquire(self, timeout=None) -> MyConnection:
async with self._pool.acquire(timeout=timeout) as conn:
await conn._install_codecs()
yield conn
这样做有些笨拙,因为在生产代码中,您需要包装所有 Pool
方法,并在 asyncpg.Pool
方法的签名更改时重新包装它们。请注意,无法简单地子类化 asyncpg.Pool
,因为 asyncpg.Pool.acquire
不是异步方法。
最后,定义一个 create_pool
函数,应该在 asyncpg.create_pool
之后使用:
async def create_pool(**db_config):
db = Db(db_config=db_config)
await db.initialize()
return db
这个解决方案实际上是有效的,但我仍然认为应该有更好的方法。
英文:
First, we create our own connection class, that has a codec installer:
import copy
from contextlib import asynccontextmanager
import asyncpg
import asyncpg.pool
class MyConnection(asyncpg.Connection):
_codecs_installed = False
async def _install_codecs(self):
if not self._codecs_installed:
await self.set_builtin_type_codec('hstore', codec_name='pg_contrib.hstore')
# install more codecs here...
self._codecs_installed = True
The key point is that a connection can be reused by the pool (re-acquired by the users of the pool), but the codecs are installed once per connection.
Then we create a wrapper that mimics asyncpg.Pool
(has an acquire method), but asks the connection to install the codecs (if they are not already installed):
class Db:
def __init__(self, db_config):
self._db_config = db_config
self._pool = None
async def initialize(self):
self._pool = await asyncpg.pool.create_pool(connection_class=MyConnection, **self._db_config)
@asynccontextmanager
async def acquire(self, timeout=None) -> MyConnection:
async with self._pool.acquire(timeout=timeout) as conn:
await conn._install_codecs()
yield conn
This is clumsy, because in production code, you need to wrap all Pool
methods, and re-wrap them whenever the signature(s) of asyncpg.Pool
methods are changed. Please note that it is not possible to simply subclass asyncpg.Pool, because asyncpg.Pool.acquire
is NOT an async method.
And finally, define a create_pool
function that should be used instead of asyncpg.create_pool
:
async def create_pool(**db_config):
db = Db(db_config=db_config)
await db.initialize()
return db
This solution actually works, but I still think that there must be a better way.
通过集体智慧和协作来改善编程学习和解决问题的方式。致力于成为全球开发者共同参与的知识库,让每个人都能够通过互相帮助和分享经验来进步。
评论