How to make async generator with SQLAlchemy?

Question

I'm trying to implement an asynchronous generator called get_all_by_chunk() to fetch data from my database in chunks using SQLAlchemy and AsyncSession. However, the current implementation does not work as expected.

    class BaseDAO(Generic[Model]):
        def __init__(self, model: Type[Model], session: AsyncSession):
            self.model = model
            self.session = session

        ...

        async def get_all_by_chunk(self, chunk_size=10_000):
            result = await self.session.execute(
                select(self.model).yield_per(chunk_size)
            )
            async for row in result.scalars():
                yield row

Result: TypeError: object async_generator can't be used in 'await' expression

How can I correctly implement the get_all_by_chunk method as an asynchronous generator to fetch data from the table in chunks using SQLAlchemy and AsyncSession?

Python 3.11 / SQLAlchemy 2.0.13

Answer 1

Score: 1

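Another solution:

    async def get_iterator(self, *whereclauses, chunk_size: int = 10_000):
        stmt = select(self.model)
        if whereclauses:
            stmt = stmt.where(*whereclauses)
        # stream() returns an AsyncResult, which supports `async for`
        result = await self.session.stream(
            stmt.execution_options(yield_per=chunk_size)
        )
        # Consume with: async for row in await dao.get_iterator(): ...
        # (dao being a BaseDAO instance)
        return result.scalars()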
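
Note that get_iterator is a coroutine that returns an async iterator, not an async generator, so the caller awaits it first and then iterates. The TypeError in the question is what Python raises in the opposite case, when an async generator object is awaited instead of iterated. The two calling conventions can be sketched with the standard library alone. Everything here is an assumption for illustration: `fake_stream` is a hypothetical stand-in for a streamed result, and none of these names come from SQLAlchemy.

```python
import asyncio
from collections.abc import AsyncIterator


async def fake_stream(n: int) -> AsyncIterator[int]:
    """Hypothetical stand-in for a streamed result: yields n rows."""
    for i in range(n):
        await asyncio.sleep(0)  # pretend to wait on the database
        yield i


async def get_iterator(n: int) -> AsyncIterator[int]:
    """Coroutine that returns an async iterator (same shape as the answer)."""
    await asyncio.sleep(0)  # pretend to execute the statement
    return fake_stream(n)


async def get_all_by_chunk(n: int) -> AsyncIterator[int]:
    """Async generator: it contains `yield`, so calling it needs no `await`."""
    async for row in fake_stream(n):
        yield row


async def demo() -> tuple[list[int], list[int], bool]:
    # Coroutine style: await first, then iterate.
    from_coroutine = [row async for row in await get_iterator(3)]
    # Generator style: iterate directly.
    from_generator = [row async for row in get_all_by_chunk(3)]
    # Awaiting an async generator raises TypeError, as in the question.
    try:
        await get_all_by_chunk(3)
        raised = False
    except TypeError:
        raised = True
    return from_coroutine, from_generator, raised
```

Running `asyncio.run(demo())` exercises all three cases. Either style works for streaming rows; the difference is only whether the call site needs `await` before `async for`.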

Answer 2

Score: 0

    async def get_many(
        self,
        *whereclauses,
        options: Iterable | ExecutableOption | None = None,
        limit: int | None = None,
        offset: int | None = None,
        order_by=None,
    ):
        stmt = select(self.model)
        if whereclauses:
            stmt = stmt.where(*whereclauses)
        if options:
            if isinstance(options, ExecutableOption):
                stmt = stmt.options(options)
            elif isinstance(options, Iterable):
                stmt = stmt.options(*options)
        if limit:
            stmt = stmt.limit(limit)
        if offset:
            stmt = stmt.offset(offset)
        # Compare with None: SQL expressions such as Model.id.desc()
        # raise TypeError when truth-tested
        if order_by is not None:
            stmt = stmt.order_by(order_by)
        result = await self.session.execute(stmt)
        return result.scalars().all()

    async def get_chunk_iterator(self, *whereclauses, chunk_size: int):
        offset = 0  # Start from the beginning
        while True:
            # Get the next batch of records
            records = await self.get_many(
                *whereclauses, limit=chunk_size, offset=offset, order_by=self.model.id
            )
            # If no more records, stop
            if not records:
                break
            # Yield the records
            yield records
            # Update the offset for the next batch
            offset += chunk_size

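Update: order_by is required. Without a deterministic ordering, LIMIT/OFFSET pages are not guaranteed to line up, so rows can be skipped or repeated between chunks.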
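
The chunk iterator above yields whole lists rather than single rows and stops at the first empty page. A minimal standard-library sketch of that loop follows, assuming a hypothetical in-memory `fetch_page` in place of `get_many` and a made-up `ROWS` list in place of the table; it only illustrates the control flow, not the database access.

```python
import asyncio
from collections.abc import AsyncIterator

ROWS = list(range(1, 8))  # hypothetical table contents, already ordered by id


async def fetch_page(limit: int, offset: int) -> list[int]:
    """Hypothetical stand-in for get_many(): one LIMIT/OFFSET query."""
    await asyncio.sleep(0)  # pretend to wait on the database
    return ROWS[offset:offset + limit]


async def get_chunk_iterator(chunk_size: int) -> AsyncIterator[list[int]]:
    """Yield successive pages until an empty page signals the end."""
    offset = 0
    while True:
        records = await fetch_page(limit=chunk_size, offset=offset)
        if not records:
            break
        yield records
        offset += chunk_size


async def collect(chunk_size: int) -> list[list[int]]:
    """Gather every chunk into a list, mainly for inspection."""
    return [chunk async for chunk in get_chunk_iterator(chunk_size)]
```

With seven rows and a chunk size of 3, `asyncio.run(collect(3))` gives two full chunks and a final chunk holding the one remaining row. A caller that wants individual rows needs a nested loop over each chunk. Since every page is a separate query, rows inserted or deleted between pages can still shift the offsets.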

huangapple
  • Published on May 18, 2023, 04:07:32
  • When reposting, please keep the link to this article: https://go.coder-hub.com/76275847.html