如何创建一个自动设置编解码器(类型转换器)的 asyncpg 连接池?

huangapple go评论75阅读模式
英文:

How to create an asyncpg pool that sets codecs (type converters) automatically?

问题

以下是您要翻译的内容:

More specifically, I need to use hstore, json, and jsonb values. There are built-in codecs for this, but they must be registered on the connection explicitly, as described in https://magicstack.github.io/asyncpg/current/usage.html#example-decoding-hstore-values

import asyncpg
import asyncio

async def run():
    conn = await asyncpg.connect()
    # Assuming the hstore extension exists in the public schema.
    await conn.set_builtin_type_codec(
        'hstore', codec_name='pg_contrib.hstore')
    result = await conn.fetchval("SELECT 'a=>1,b=>2,c=>NULL'::hstore")
    assert result == {'a': '1', 'b': '2', 'c': None}

asyncio.get_event_loop().run_until_complete(run())

Since I need these codecs everywhere in my application, I want to create a pool that automatically sets these codecs when a new connection is created.

But I'm not sure how to do this. I can see that connection_class can be given for create_pool. In other words, I can create a pool that uses a custom connection class, for example:

async def create_my_pool(pool_config):
    return await asyncpg.create_pool(connection_class=MyDbConnection, **pool_config)

But it does not solve the problem because I cannot call set_builtin_type_codec from MyDbConnection.__init__, simply because set_builtin_type_codec is an async method, and the constructor of my custom MyDbConnection is not async.

I might be able to write my own async context manager that wraps the one that is returned by asyncpg.create_pool, but then I need to keep track of each connection and set the codecs on them only if they have not been set previously. It looks very clumsy.

Since this problem (e.g. automatically setting codecs) seems to be a very common requirement to me, there must be a better way to do this, I just can't figure it out.

So this is my question: how to create an asyncpg pool that sets various codecs automatically whenever it creates a new connection?

英文:

More specifically, I need to use hstore, json an jsonb values. There are built-in codecs for this, but they must be registered on the connection explicitly, as described in https://magicstack.github.io/asyncpg/current/usage.html#example-decoding-hstore-values

import asyncpg
import asyncio

async def run():
    conn = await asyncpg.connect()
    # Assuming the hstore extension exists in the public schema.
    await conn.set_builtin_type_codec(
        'hstore', codec_name='pg_contrib.hstore')
    result = await conn.fetchval("SELECT 'a=>1,b=>2,c=>NULL'::hstore")
    assert result == {'a': '1', 'b': '2', 'c': None}

asyncio.get_event_loop().run_until_complete(run())

Since I need these codec everwhere in my application, I want to create a pool that automatically sets these codecs when a new connection is created.

But I'm not sure how to do this. I can see that connection_class can be given for create_pool. In other words, I can create a pool that uses a custom connection class, for example:

async def create_my_pool(pool_config):
    return await asyncpg.create_pool(connection_class=MyDbConnection, **pool_config)

But it does not solve the problem, because I cannot call set_builtin_type_codec from MyDbConnection.__init__, simply because set_builtin_type_codec is an async method, and the constructor of my custom MyDbConnection is not async.

I might be able to write my own async context manager that wraps the one that is returned by asyncpg.create_pool, but then I need to keep track of each connection and set the codecs on them only if they have not been set previously. It looks very clumsy.

Since this problem (e.g. automatically setting codecs) seems to be a very common requirement to me, there must be a better way to do this, I just can't figure it out.

So this is my question: how to create an asyncpg pool that sets various codecs automatically, whenever if creates a new connection?

答案1

得分: 1

首先,我们创建自己的连接类,其中包含一个编解码器安装程序:

import copy
from contextlib import asynccontextmanager

import asyncpg
import asyncpg.pool

class MyConnection(asyncpg.Connection):
    _codecs_installed = False

    async def _install_codecs(self):
        if not self._codecs_installed:
            await self.set_builtin_type_codec('hstore', codec_name='pg_contrib.hstore')
            # 在这里安装更多编解码器...
            self._codecs_installed = True

关键点是,连接可以被池重复使用(由池的用户重新获取),但编解码器在每个连接中仅安装一次。

然后,我们创建一个包装器,模仿 asyncpg.Pool(具有一个获取方法),但会要求连接安装编解码器(如果尚未安装):

class Db:
    def __init__(self, db_config):
        self._db_config = db_config
        self._pool = None

    async def initialize(self):
        self._pool = await asyncpg.pool.create_pool(connection_class=MyConnection, **self._db_config)

    @asynccontextmanager
    async def acquire(self, timeout=None) -> MyConnection:
        async with self._pool.acquire(timeout=timeout) as conn:
            await conn._install_codecs()
            yield conn

这样做有些笨拙,因为在生产代码中,您需要包装所有 Pool 方法,并在 asyncpg.Pool 方法的签名更改时重新包装它们。请注意,无法简单地子类化 asyncpg.Pool,因为 asyncpg.Pool.acquire 不是异步方法。

最后,定义一个 create_pool 函数,应该在 asyncpg.create_pool 之后使用:

async def create_pool(**db_config):
    db = Db(db_config=db_config)
    await db.initialize()
    return db

这个解决方案实际上是有效的,但我仍然认为应该有更好的方法。

英文:

First, we create our own connection class, that has a codec installer:

import copy
from contextlib import asynccontextmanager

import asyncpg
import asyncpg.pool

class MyConnection(asyncpg.Connection):
    _codecs_installed = False

    async def _install_codecs(self):
        if not self._codecs_installed:
            await self.set_builtin_type_codec('hstore', codec_name='pg_contrib.hstore')
            # install more codecs here...
            self._codecs_installed = True

The key point is that a connection can be reused by the pool (re-acquired by the users of the pool), but the codecs are installed once per connection.

Then we create a wrapper that mimics asyncpg.Pool (has an acquire method), but asks the connection to install the codecs (if they are not already installed):

class Db:
    def __init__(self, db_config):
        self._db_config = db_config
        self._pool = None

    async def initialize(self):
        self._pool = await asyncpg.pool.create_pool(connection_class=MyConnection, **self._db_config)

    @asynccontextmanager
    async def acquire(self, timeout=None) -> MyConnection:
        async with self._pool.acquire(timeout=timeout) as conn:
            await conn._install_codecs()
            yield conn

This is clumsy, because in production code, you need to wrap all Pool methods, and re-wrap them whenever the signature(s) of asyncpg.Pool methods are changed. Please note that it is not possible to simply subclass asyncpg.Pool, because asyncpg.Pool.acquire is NOT an async method.

And finally, define a create_pool function that should be used instead of asyncpg.create_pool:

async def create_pool(**db_config):
    db = Db(db_config=db_config)
    await db.initialize()
    return db

This solution actually works, but I still think that there must be a better way.

huangapple
  • 本文由 发表于 2023年6月11日 20:44:13
  • 转载请务必保留本文链接:https://go.coder-hub.com/76450527.html
匿名

发表评论

匿名网友

:?: :razz: :sad: :evil: :!: :smile: :oops: :grin: :eek: :shock: :???: :cool: :lol: :mad: :twisted: :roll: :wink: :idea: :arrow: :neutral: :cry: :mrgreen:

确定