英文:
How to use AsyncSession from sqlalchemy in celery tasks?
问题
Use AsyncSession in celery tasks
我使用fastapi和sqlalchemy,必须创建一个celery任务,该任务将访问数据库并检查是否有我的Event(表)的任何对象的end_time < datetime.now()
这是我的代码:
@asynccontextmanager
async def scoped_session():
scoped_factory = async_scoped_session(
async_session,
scopefunc=asyncio.current_task()
)
try:
async with scoped_factory() as s:
yield s
finally:
await scoped_factory().remove()
async def logic():
async with scoped_session() as session:
stmt = select(event.models.Event).where(
event.models.Event.end_time <= datetime.now()
)
results = await session.execute(stmt)
for res in results.fetchall():
print(res.is_event_done)
@celery.task(name='is_event_done', bind=True, ignore_result=True)
def is_event_done(self):
asyncio.run(logic())
这是我的async_session
engine = create_async_engine(settings.db_url, echo=True)
async_session = sessionmaker(
engine,
class_=AsyncSession,
expire_on_commit=False
)
所以我收到\'_asyncio.Task\' object is not callable
错误。
英文:
Use AsyncSession in celery tasks
I use fastapi and sqlalchemy, I must create celery task, that will go to the database and check does any objects of my Event (table) has end_time < datetime.now()
There is my code:
@asynccontextmanager
async def scoped_session():
scoped_factory = async_scoped_session(
async_session,
scopefunc=asyncio.current_task()
)
try:
async with scoped_factory() as s:
yield s
finally:
await scoped_factory().remove()
async def logic():
async with scoped_session() as session:
stmt = select(event.models.Event).where(
event.models.Event.end_time <= datetime.now()
)
results = await session.execute(stmt)
for res in results.fetchall():
print(res.is_event_done)
@celery.task(name='is_event_done', bind=True, ignore_result=True)
def is_event_done(self):
asyncio.run(logic())
here is my async_session
engine = create_async_engine(settings.db_url, echo=True)
async_session = sessionmaker(
engine,
class_=AsyncSession,
expire_on_commit=False
)
so I got \'_asyncio.Task\' object is not callable
答案1
得分: 1
I just do like this
async def update_event() -> None:
async with async_session() as session:
stmt = update(event.models.Event).where(
event.models.Event.end_time <= datetime.now(),
event.models.Event.is_active is True
).values(is_done=True)
await session.execute(stmt)
@celery.task(name='is_event_done', bind=True, ignore_result=True)
def is_event_done(self) -> None:
loop.run_until_complete(update_event())
and now its work fine, I hope that its a good solution
If there is anything wrong let me know, thanks!
英文:
I just do like this
async def update_event() -> None:
async with async_session() as session:
stmt = update(event.models.Event).where(
event.models.Event.end_time <= datetime.now(),
event.models.Event.is_active is True
).values(is_done=True)
await session.execute(stmt)
@celery.task(name='is_event_done', bind=True, ignore_result=True)
def is_event_done(self) -> None:
loop.run_until_complete(update_event())
and now its work fine, I hope that its a good solution
If there is anything wrong let me know, thanks!
通过集体智慧和协作来改善编程学习和解决问题的方式。致力于成为全球开发者共同参与的知识库,让每个人都能够通过互相帮助和分享经验来进步。
评论