如何在Celery任务中使用SQLAlchemy中的AsyncSession?

huangapple go评论69阅读模式
英文:

How to use AsyncSession from sqlalchemy in celery tasks?

问题

Use AsyncSession in celery tasks

我使用fastapi和sqlalchemy,必须创建一个celery任务,该任务将访问数据库并检查是否有我的Event(表)的任何对象的end_time < datetime.now()

这是我的代码:

@asynccontextmanager
async def scoped_session():
    scoped_factory = async_scoped_session(
        async_session,
        scopefunc=asyncio.current_task()
    )
    try:
        async with scoped_factory() as s:
            yield s
    finally:
        await scoped_factory().remove()


async def logic():
    async with scoped_session() as session:
        stmt = select(event.models.Event).where(
            event.models.Event.end_time <= datetime.now()
        )
        results = await session.execute(stmt)
        for res in results.fetchall():
            print(res.is_event_done)


@celery.task(name='is_event_done', bind=True, ignore_result=True)
def is_event_done(self):
    asyncio.run(logic())

这是我的async_session

engine = create_async_engine(settings.db_url, echo=True)

async_session = sessionmaker(
    engine,
    class_=AsyncSession,
    expire_on_commit=False
)

所以我收到\&#39;_asyncio.Task\&#39; object is not callable错误。

英文:

Use AsyncSession in celery tasks

I use fastapi and sqlalchemy, I must create celery task, that will go to the database and check does any objects of my Event (table) has end_time < datetime.now()

There is my code:

@asynccontextmanager
async def scoped_session():
    scoped_factory = async_scoped_session(
        async_session,
        scopefunc=asyncio.current_task()
    )
    try:
        async with scoped_factory() as s:
            yield s
    finally:
        await scoped_factory().remove()


async def logic():
    async with scoped_session() as session:
        stmt = select(event.models.Event).where(
            event.models.Event.end_time &lt;= datetime.now()
        )
        results = await session.execute(stmt)
        for res in results.fetchall():
            print(res.is_event_done)


@celery.task(name=&#39;is_event_done&#39;, bind=True, ignore_result=True)
def is_event_done(self):
    asyncio.run(logic())

here is my async_session

engine = create_async_engine(settings.db_url, echo=True)

async_session = sessionmaker(
    engine,
    class_=AsyncSession,
    expire_on_commit=False
)

so I got \&#39;_asyncio.Task\&#39; object is not callable

答案1

得分: 1

I just do like this

async def update_event() -> None:
    async with async_session() as session:
        stmt = update(event.models.Event).where(
            event.models.Event.end_time <= datetime.now(),
            event.models.Event.is_active is True
        ).values(is_done=True)
        await session.execute(stmt)

@celery.task(name='is_event_done', bind=True, ignore_result=True)
def is_event_done(self) -> None:
    loop.run_until_complete(update_event())

and now its work fine, I hope that its a good solution
If there is anything wrong let me know, thanks!

英文:

I just do like this

async def update_event() -&gt; None:
    async with async_session() as session:
        stmt = update(event.models.Event).where(
            event.models.Event.end_time &lt;= datetime.now(),
            event.models.Event.is_active is True
        ).values(is_done=True)
        await session.execute(stmt)


@celery.task(name=&#39;is_event_done&#39;, bind=True, ignore_result=True)
def is_event_done(self) -&gt; None:
    loop.run_until_complete(update_event())

and now its work fine, I hope that its a good solution
If there is anything wrong let me know, thanks!

huangapple
  • 本文由 发表于 2023年5月24日 18:27:54
  • 转载请务必保留本文链接:https://go.coder-hub.com/76322524.html
匿名

发表评论

匿名网友

:?: :razz: :sad: :evil: :!: :smile: :oops: :grin: :eek: :shock: :???: :cool: :lol: :mad: :twisted: :roll: :wink: :idea: :arrow: :neutral: :cry: :mrgreen:

确定