How to make an async generator with SQLAlchemy?

Question

I'm trying to implement an asynchronous generator called get_all_by_chunk() to fetch data from my database in chunks using SQLAlchemy and AsyncSession. However, the current implementation does not work as expected.

class BaseDAO(Generic[Model]):
    def __init__(self, model: Type[Model], session: AsyncSession):
        self.model = model
        self.session = session

    ...

    async def get_all_by_chunk(self, chunk_size=10_000):
        result = await self.session.execute(
            select(self.model).yield_per(chunk_size)
        )
        async for row in result.scalars():
            yield row

Result: TypeError: object async_generator can't be used in 'await' expression

How can I correctly implement the get_all_by_chunk method as an asynchronous generator to fetch data from the table in chunks using SQLAlchemy and AsyncSession?

Python 3.11 / SQLAlchemy 2.0.13
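
The TypeError quoted in the question comes from Python itself rather than from SQLAlchemy. A function that contains `yield` inside `async def` is an async generator function, and the object it returns cannot be awaited; it has to be consumed with `async for`. The call site is not shown in the question, so the sketch below only assumes that it does something like `await dao.get_all_by_chunk()`. The names `numbers` and `demo` are made up for illustration, and only the standard library is used.

```python
import asyncio


async def numbers():
    # The `yield` makes this an async generator function: calling it returns
    # an async generator object, not a coroutine.
    for i in range(3):
        yield i


async def demo() -> tuple[str, list[int]]:
    try:
        await numbers()  # wrong: an async generator object is not awaitable
    except TypeError as exc:
        message = str(exc)
    else:
        message = ""
    # Right: iterate the async generator with `async for`.
    values = [n async for n in numbers()]
    return message, values


if __name__ == "__main__":
    print(asyncio.run(demo()))
```

The exact wording of the message depends on the Python version, but it names `async_generator` in each case.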

Answer 1

Score: 1

Another solution:

    async def get_iterator(self, *whereclauses, chunk_size: int = 10_000):
        stmt = select(self.model)
        if whereclauses:
            stmt = stmt.where(*whereclauses)
        # stream() returns an AsyncResult; yield_per sets how many rows are buffered at a time
        result = await self.session.stream(stmt.execution_options(yield_per=chunk_size))
        # AsyncScalarResult can be consumed with `async for`
        return result.scalars()

Answer 2

Score: 0

    async def get_many(self, *whereclauses, options: Iterable | ExecutableOption = None, limit: int = None,
                       offset: int = None, order_by=None):
        stmt = select(self.model)
    
        if whereclauses:
            stmt = stmt.where(*whereclauses)
        if options:
            if isinstance(options, ExecutableOption):
                stmt = stmt.options(options)
            elif isinstance(options, Iterable):
                stmt = stmt.options(*options)
        if limit:
            stmt = stmt.limit(limit)
        if offset:
            stmt = stmt.offset(offset)
        if order_by:
            stmt = stmt.order_by(order_by)
        result = await self.session.execute(stmt)
        return result.scalars().all()
    
    async def get_chunk_iterator(self, *whereclauses, chunk_size: int):
        offset = 0  # Start from the beginning
    
        while True:
            # Get the next batch of records
            records = await self.get_many(*whereclauses, limit=chunk_size, offset=offset, order_by=self.model.id)
    
            # If no more records, stop
            if not records:
                break
    
            # Yield the records
            yield records
    
            # Update the offset for the next batch
            offset += chunk_size

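Update: order_by is required. Without a deterministic ordering, consecutive LIMIT/OFFSET queries are not guaranteed to return rows in the same order, so chunks can repeat or skip rows.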
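
The offset loop in `get_chunk_iterator` can be sketched independently of the database, which also shows how such a generator is consumed. This is an illustration under stated assumptions, not the answer's implementation: `iter_chunks`, `fake_fetch`, `collect`, and `ROWS` are hypothetical names, and a sorted in-memory list stands in for a query with `ORDER BY id LIMIT ... OFFSET ...`.

```python
import asyncio
from typing import AsyncIterator, Awaitable, Callable, Sequence


async def iter_chunks(
    fetch_page: Callable[[int, int], Awaitable[Sequence[int]]],
    chunk_size: int,
) -> AsyncIterator[Sequence[int]]:
    offset = 0
    while True:
        # Ask for the next page; fetch_page takes (limit, offset).
        page = await fetch_page(chunk_size, offset)
        if not page:
            break  # an empty page means the data is exhausted
        yield page  # one whole chunk per iteration, not one row
        offset += chunk_size


ROWS = list(range(1, 8))  # stand-in for a table already sorted by id


async def fake_fetch(limit: int, offset: int) -> list[int]:
    # Plays the role of ORDER BY id LIMIT :limit OFFSET :offset.
    return ROWS[offset:offset + limit]


async def collect(chunk_size: int) -> list[list[int]]:
    return [list(page) async for page in iter_chunks(fake_fetch, chunk_size)]


if __name__ == "__main__":
    print(asyncio.run(collect(3)))
```

Unlike the streaming approach in Answer 1, each iteration here issues a separate query, and every item yielded is a list of up to `chunk_size` records.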

huangapple
  • Posted on May 18, 2023, 04:07:32
  • When reposting, please keep the link to this article: https://go.coder-hub.com/76275847.html