使用APScheduler多线程与SQLAlchemy一起工作

huangapple go评论55阅读模式
英文:

get sqlalchemy with apscheduler multithreading to work

问题

I have a list of jobs I am adding to an apscheduler BlockingScheduler with a ThreadPoolExecutor number the same size as the number of jobs.

我有一系列的工作,我正在将它们添加到一个apscheduler的BlockingScheduler中,线程池执行器的数量与工作数量相同。

The jobs I am adding are using sqlalchemy and interacting with the same database, but i am getting errors:

我添加的工作使用了sqlalchemy,并与同一个数据库交互,但是我遇到了错误:

sqlalchemy.exc.OperationalError: (sqlite3.OperationalError) database is locked

sqlalchemy.exc.OperationalError: (sqlite3.OperationalError) 数据库已被锁定

I have used a scoped_session and a sessionmaker in my base sqlalchemy set-up.

我在我的基本sqlalchemy设置中使用了scoped_session和sessionmaker。

from os.path import join, realpath
from sqlalchemy import create_engine
from sqlalchemy.ext.declarative import declarative_base
from sqlalchemy.orm import scoped_session, sessionmaker

db_name = environ.get("DB_NAME")
db_path = realpath(join("data", db_name))
engine = create_engine(f"sqlite:///{db_path}", pool_pre_ping=True)
session_factory = sessionmaker(bind=engine)
Session = scoped_session(session_factory)
Base = declarative_base()

然后,我添加到apscheduler的计划任务类的示例如下:

然后,我添加到apscheduler的计划任务类的示例如下:

from app.data_structures.base import Base, Session, engine
from app.data_structures.job import Job

from app.data_structures.scheduled_job import ScheduledJob
from app.data_structures.user import User

class AccountingProcessorJob(ScheduledJob):
    name: str = "Accounting Processor"
    def __init__(self, resources: AppResources, depends: List[str] = None) -> None:
        super().__init__(resources)

    def job_function(self) -> None:
        account_dir = realpath(environ.get("ACCOUNTING_DIRECTORY"))
        Base.metadata.create_all(engine, Base.metadata.tables.values(), checkfirst=True)
        session = Session()
        try:
            # 在这里使用会话执行一些操作,例如:
            # 使用一些设置好的变量
            user = User(user_name=user_name)
            session.add(user)
            user.extend(jobs)
            session.commit()
        except:
            session.rollback()
        finally:
            Session.remove()

I was under the impression that using a scoped_session and a session factory would start a new session for each thread, and make it thread-safe.

我以为使用scoped_session和session工厂会为每个线程启动一个新会话,并使其线程安全。

where the User and Job are sqlalchemy orm objects, eg:

其中User和Job是sqlalchemy orm对象,例如:

from sqlalchemy import Boolean, Column, ForeignKey, Integer, String
from sqlalchemy.orm import Mapped, mapped_column, relationship
from sqlalchemy.sql.expression import false

from app.data_structures.base import Base
from app.data_structures.job import Job

class User(Base):
        __tablename__ = "users"

        user_name: Mapped[str] = mapped_column(primary_key=True),
        employee_number = Column(Integer)
        manager = relationship("User", remote_side=[user_name], post_update=True)
        jobs: Mapped[list[Job]] = relationship()

        def __init__(
            self,
            user_name: str,
            employee_number: int = None,
            manager: str = None,

        ) -> None:
            self.user_name = user_name
            self.employee_number = employee_number
            self.manager = manager

Can anyone explain what I am doing wrong, and how to go about fixing it?

有人能解释我做错了什么,以及如何修复它吗?

英文:

I have a list of jobs I am adding to an apscheduler BlockingScheduler with a ThreadPoolExecutor number the same size as the number of jobs.

The jobs I am adding are using sqlalchemy and interacting with the same database, but i am getting errors:

sqlalchemy.exc.OperationalError: (sqlite3.OperationalError) database is locked

I have used a scoped_session and a sessionmaker in my base sqlalchemy set-up.

    from os.path import join, realpath
    from sqlalchemy import create_engine
    from sqlalchemy.ext.declarative import declarative_base
    from sqlalchemy.orm import scoped_session, sessionmaker
    
    db_name = environ.get("DB_NAME")
    db_path = realpath(join( "data", db_name))
    engine = create_engine(f"sqlite:///{db_path}", pool_pre_ping=True)
    session_factory = sessionmaker(bind=engine)
    Session = scoped_session(session_factory)
    Base = declarative_base()

Then an example of the scheduled job class I add to apscheduler I have something like this:

from app.data_structures.base import Base, Session, engine
from app.data_structures.job import Job

from app.data_structures.scheduled_job import ScheduledJob
from app.data_structures.user import User


class AccountingProcessorJob(ScheduledJob):
    name: str = "Accounting Processor"
    def __init__(self, resources: AppResources, depends: List[str] = None) -> None:
        super().__init__(resources)

    def job_function(self) -> None:
        account_dir = realpath(environ.get("ACCOUNTING_DIRECTORY"))
        Base.metadata.create_all(engine, Base.metadata.tables.values(), checkfirst=True)
        session = Session()
        try:
            #do some stuff with the session here e.g.
            # with some variables that are setup
            user = User(user_name=user_name)
            session.add(user)
            user.extend(jobs)
            session.commit()
        except:
            session.rollback()
        finally:
            Session.remove() 

I was under the impression that using a scoped_session and a session factory would start a new session for each thread, and make it thread-safe.

where the User and Job are sqlalchemy orm objects, eg:

from sqlalchemy import Boolean, Column, ForeignKey, Integer, String
from sqlalchemy.orm import Mapped, mapped_column, relationship
from sqlalchemy.sql.expression import false

from app.data_structures.base import Base
from app.data_structures.job import Job
    
    
    

class User(Base):
        __tablename__ = "users"
    
        user_name: Mapped[str] = mapped_column(primary_key=True),
        employee_number = Column(Integer)
        manager = relationship("User", remote_side=[user_name], post_update=True)
        jobs: Mapped[list[Job]] = relationship()
    
        def __init__(
            self,
            user_name: str,
            employee_number: int = None,
            manager: str = None,
    
        ) -> None:
            self.user_name = user_name
            self.employee_number = employee_number
            self.manager = manager

Can anyone explain what I am doing wrong, and how to go about fixing it?

答案1

得分: 1

这个示例使用的是SQLAlchemy 1.4.48、apscheduler 3.10.1和python模块sqlite3 3.34.1。

我不确定如何完全重现你的作业函数中发生的情况,因为不清楚jobsuser.extend(jobs)中的jobs来自哪里。

create_all 在每个作业中似乎不起作用(抛出表已存在的错误),所以我将它放在自己的作业中,只在创建用户的作业之前运行一次。

我使用了随机的睡眠来确保作业会间歇性地运行,而不是在最大线程数的确切块中运行。

from os import environ
from time import sleep
from os.path import join, realpath
from secrets import randbelow

from sqlalchemy import create_engine, Integer, String
from sqlalchemy.schema import Column
from sqlalchemy.ext.declarative import declarative_base
from sqlalchemy.orm import scoped_session, sessionmaker
from apscheduler.schedulers.blocking import BlockingScheduler
from apscheduler.executors.pool import ThreadPoolExecutor

db_name = environ.get("DB_NAME")
db_path = realpath(join("data", db_name))
engine = create_engine(f"sqlite:///{db_path}", pool_pre_ping=True)
session_factory = sessionmaker(bind=engine)
Session = scoped_session(session_factory)
Base = declarative_base()


class User(Base):
    __tablename__ = "users"
    id = Column(Integer, primary_key=True)
    username = Column(String)


def setup_func():
    print("Running create_all")
    Base.metadata.create_all(engine, Base.metadata.tables.values(), checkfirst=True)


def job_function(i):
    print("Running job function {}".format(i))
    session = Session()
    try:
        seconds_to_wait = 1 + randbelow(4)
        sleep(seconds_to_wait)
        user = User(username=str(i))
        session.add(user)
        session.commit()
    finally:
        # 如果需要,将回滚。
        Session.remove()


FIFTEEN_MINUTES_IN_SECONDS = 15 * 60


def main():
    sched = BlockingScheduler(
        timezone="US/Pacific",
        executors={"default": ThreadPoolExecutor(4)},
        job_defaults={"misfire_grace_time": FIFTEEN_MINUTES_IN_SECONDS},
    )
    sched.add_job(setup_func)
    for i in range(100):
        sched.add_job(job_function, args=[i])
    sched.start()


if __name__ == "__main__":
    main()
$ rm -f data/test.sql && DB_NAME=test.sql ve/bin/python test_apscheduler.py 
/home/ian/workspace/laspilitas-project/stackoverflow-and-testing/test_apscheduler.py:18: MovedIn20Warning: Deprecated API features detected! These feature(s) are not compatible with SQLAlchemy 2.0. To prevent incompatible upgrades prior to updating applications, ensure requirements files are pinned to "sqlalchemy<2.0". Set environment variable SQLALCHEMY_WARN_20=1 to show all deprecation warnings.  Set environment variable SQLALCHEMY_SILENCE_UBER_WARNING=1 to silence this message. (Background on SQLAlchemy 2.0 at: https://sqlalche.me/e/b8d9)
  Base = declarative_base()
Running create_all
Running job function 0
Running job function 1
Running job function 2
Running job function 3
Running job function 4
Running job function 5
Running job function 6
Running job function 7
Running job function 8
Running job function 9
Running job function 10
Running job function 11
Running job function 12
Running job function 13
Running job function 14
Running job function 15
Running job function 16
Running job function 17
Running job function 18
Running job function 19
英文:

This example is using SQLAlchemy 1.4.48, apscheduler 3.10.1 and python module sqlite3 3.34.1.

I'm not sure how to fully recreate what is going on in your job function because it isn't clear where jobs or .extend in user.extend(jobs) comes from.

The create_all doesn't seem to work well in each job (throws errors that table already exists) so I put it in its own job and only ran it once ahead of the jobs that create users.

I used a random sleep to make sure the jobs would run intermittently and not in exact blocks of the max thread count.

from os import environ
from time import sleep
from os.path import join, realpath
from secrets import randbelow

from sqlalchemy import create_engine, Integer, String
from sqlalchemy.schema import Column
from sqlalchemy.ext.declarative import declarative_base
from sqlalchemy.orm import scoped_session, sessionmaker
from apscheduler.schedulers.blocking import BlockingScheduler
from apscheduler.executors.pool import ThreadPoolExecutor

db_name = environ.get(&quot;DB_NAME&quot;)
db_path = realpath(join(&quot;data&quot;, db_name))
engine = create_engine(f&quot;sqlite:///{db_path}&quot;, pool_pre_ping=True)
session_factory = sessionmaker(bind=engine)
Session = scoped_session(session_factory)
Base = declarative_base()


class User(Base):
    __tablename__ = &quot;users&quot;
    id = Column(Integer, primary_key=True)
    username = Column(String)


def setup_func():
    print(&quot;Running create_all&quot;)
    Base.metadata.create_all(engine, Base.metadata.tables.values(), checkfirst=True)


def job_function(i):
    print(&quot;Running job function {}&quot;.format(i))
    session = Session()
    try:
        seconds_to_wait = 1 + randbelow(4)
        sleep(seconds_to_wait)
        user = User(username=str(i))
        session.add(user)
        session.commit()
    finally:
        # Will rollback if needed.
        Session.remove()


FIFTEEN_MINUTES_IN_SECONDS = 15 * 60


def main():
    sched = BlockingScheduler(
        timezone=&quot;US/Pacific&quot;,
        executors={&quot;default&quot;: ThreadPoolExecutor(4)},
        job_defaults={&quot;misfire_grace_time&quot;: FIFTEEN_MINUTES_IN_SECONDS},
    )
    sched.add_job(setup_func)
    for i in range(100):
        sched.add_job(job_function, args=[i])
    sched.start()


if __name__ == &quot;__main__&quot;:
    main()
$ rm -f data/test.sql &amp;&amp; DB_NAME=test.sql ve/bin/python test_apscheduler.py 
/home/ian/workspace/laspilitas-project/stackoverflow-and-testing/test_apscheduler.py:18: MovedIn20Warning: Deprecated API features detected! These feature(s) are not compatible with SQLAlchemy 2.0. To prevent incompatible upgrades prior to updating applications, ensure requirements files are pinned to &quot;sqlalchemy&lt;2.0&quot;. Set environment variable SQLALCHEMY_WARN_20=1 to show all deprecation warnings.  Set environment variable SQLALCHEMY_SILENCE_UBER_WARNING=1 to silence this message. (Background on SQLAlchemy 2.0 at: https://sqlalche.me/e/b8d9)
  Base = declarative_base()
Running create_all
Running job function 0
Running job function 1
Running job function 2
Running job function 3
Running job function 4
Running job function 5
Running job function 6
Running job function 7
Running job function 8
Running job function 9
Running job function 10
Running job function 11
Running job function 12
Running job function 13
Running job function 14
Running job function 15
Running job function 16
Running job function 17
Running job function 18
Running job function 19

huangapple
  • 本文由 发表于 2023年5月24日 21:48:40
  • 转载请务必保留本文链接:https://go.coder-hub.com/76324247.html
匿名

发表评论

匿名网友

:?: :razz: :sad: :evil: :!: :smile: :oops: :grin: :eek: :shock: :???: :cool: :lol: :mad: :twisted: :roll: :wink: :idea: :arrow: :neutral: :cry: :mrgreen:

确定