How to make async generator with SQLAlchemy?
Question
I'm trying to implement an asynchronous generator called get_all_by_chunk() to fetch data from my database in chunks using SQLAlchemy and AsyncSession. However, the current implementation does not work as expected.
class BaseDAO(Generic[Model]):
    def __init__(self, model: Type[Model], session: AsyncSession):
        self.model = model
        self.session = session

    ...

    async def get_all_by_chunk(self, chunk_size=10_000):
        result = await self.session.execute(
            select(self.model).yield_per(chunk_size)
        )
        async for row in result.scalars():
            yield row
This fails with: TypeError: object async_generator can't be used in 'await' expression
How can I correctly implement the get_all_by_chunk method as an asynchronous generator to fetch data from the table in chunks using SQLAlchemy and AsyncSession?
Python 3.11 / SQLAlchemy 2.0.13
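For context (not from the original post): this TypeError is what Python raises whenever an async generator object is awaited; it must be iterated with `async for` instead. A minimal reproduction, independent of SQLAlchemy:

```python
import asyncio


async def numbers():
    # An async generator function: calling it returns an async generator
    # object, which must be iterated, not awaited.
    for i in range(3):
        yield i


async def main():
    # Wrong: awaiting the async generator object raises TypeError.
    try:
        await numbers()
    except TypeError as exc:
        print(exc)  # object async_generator can't be used in 'await' expression

    # Right: consume it with `async for`.
    collected = [i async for i in numbers()]
    print(collected)  # [0, 1, 2]


asyncio.run(main())
```

The same applies to the asker's get_all_by_chunk: because the method body contains `yield`, callers must write `async for row in dao.get_all_by_chunk():`, not `await dao.get_all_by_chunk()`.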
Answer 1
Score: 1
Other solution:
async def get_iterator(self, *whereclauses, chunk_size: int = 10_000):
    stmt = select(self.model)
    if whereclauses:
        stmt = stmt.where(*whereclauses)
    result = await self.session.stream(stmt.execution_options(yield_per=chunk_size))
    return result.scalars()
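Note the shape of get_iterator: unlike the question's version it contains no `yield`, so it is a plain coroutine that *returns* an async iterator. The caller awaits the call once, then iterates the result. A sketch of that calling pattern without a database (fake_stream below is an illustrative stand-in for `session.stream(...).scalars()`, not SQLAlchemy code):

```python
import asyncio
from typing import AsyncIterator


async def fake_stream(rows: list[int]) -> AsyncIterator[int]:
    # Stand-in for the streaming result: an async iterator over rows.
    for row in rows:
        await asyncio.sleep(0)  # yield control, as a real driver would
        yield row


async def get_iterator(rows: list[int]) -> AsyncIterator[int]:
    # Like the DAO method above: a coroutine that returns the iterator.
    return fake_stream(rows)


async def main() -> None:
    total = 0
    # One await to obtain the iterator, then `async for` to consume it.
    async for row in await get_iterator([1, 2, 3]):
        total += row
    print(total)  # 6


asyncio.run(main())
```

With the real DAO the call site would look the same: `async for row in await dao.get_iterator(...)`.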
Answer 2
Score: 0
async def get_many(self, *whereclauses, options: Iterable | ExecutableOption | None = None,
                   limit: int | None = None, offset: int | None = None, order_by=None):
    stmt = select(self.model)
    if whereclauses:
        stmt = stmt.where(*whereclauses)
    if options:
        if isinstance(options, ExecutableOption):
            stmt = stmt.options(options)
        elif isinstance(options, Iterable):
            stmt = stmt.options(*options)
    if limit:
        stmt = stmt.limit(limit)
    if offset:
        stmt = stmt.offset(offset)
    if order_by:
        stmt = stmt.order_by(order_by)
    result = await self.session.execute(stmt)
    return result.scalars().all()

async def get_chunk_iterator(self, *whereclauses, chunk_size: int):
    offset = 0  # Start from the beginning
    while True:
        # Get the next batch of records
        records = await self.get_many(*whereclauses, limit=chunk_size, offset=offset, order_by=self.model.id)
        # If no more records, stop
        if not records:
            break
        # Yield the records
        yield records
        # Update the offset for the next batch
        offset += chunk_size
Update: order_by is necessary; without a stable ordering, LIMIT/OFFSET pagination can skip or repeat rows between pages.
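The chunking loop in get_chunk_iterator can be exercised without a database by substituting a list-backed page fetcher (a sketch; fetch_page below is an illustrative stand-in for get_many, not part of the answer's code):

```python
import asyncio
from typing import AsyncIterator

DATA = list(range(25))  # stand-in for an ordered table of 25 rows


async def fetch_page(limit: int, offset: int) -> list[int]:
    # Stand-in for get_many(limit=..., offset=..., order_by=...); the
    # stable ordering is what makes each row appear in exactly one page.
    return DATA[offset:offset + limit]


async def get_chunk_iterator(chunk_size: int) -> AsyncIterator[list[int]]:
    # Same loop as the answer's method: page forward until a fetch is empty.
    offset = 0
    while True:
        records = await fetch_page(limit=chunk_size, offset=offset)
        if not records:
            break
        yield records
        offset += chunk_size


async def main() -> None:
    chunks = [chunk async for chunk in get_chunk_iterator(10)]
    print([len(c) for c in chunks])  # [10, 10, 5]


asyncio.run(main())
```

Note this variant yields whole chunks (lists of rows) rather than individual rows, and issues one query per chunk, unlike the server-side streaming of Answer 1.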