get sqlalchemy with apscheduler multithreading to work

Question
I have a list of jobs I am adding to an apscheduler BlockingScheduler, with a ThreadPoolExecutor whose thread count is the same as the number of jobs.
The jobs I am adding use sqlalchemy and interact with the same database, but I am getting errors:
```
sqlalchemy.exc.OperationalError: (sqlite3.OperationalError) database is locked
```
I have used a scoped_session and a sessionmaker in my base sqlalchemy setup:
```python
from os import environ
from os.path import join, realpath

from sqlalchemy import create_engine
from sqlalchemy.ext.declarative import declarative_base
from sqlalchemy.orm import scoped_session, sessionmaker

db_name = environ.get("DB_NAME")
db_path = realpath(join("data", db_name))

engine = create_engine(f"sqlite:///{db_path}", pool_pre_ping=True)
session_factory = sessionmaker(bind=engine)
Session = scoped_session(session_factory)
Base = declarative_base()
```
Then, as an example of the scheduled-job class I add to apscheduler, I have something like this:
```python
from os import environ
from os.path import realpath
from typing import List

from app.data_structures.base import Base, Session, engine
from app.data_structures.job import Job
from app.data_structures.scheduled_job import ScheduledJob
from app.data_structures.user import User


class AccountingProcessorJob(ScheduledJob):
    name: str = "Accounting Processor"

    def __init__(self, resources: AppResources, depends: List[str] = None) -> None:
        super().__init__(resources)

    def job_function(self) -> None:
        account_dir = realpath(environ.get("ACCOUNTING_DIRECTORY"))
        Base.metadata.create_all(engine, Base.metadata.tables.values(), checkfirst=True)
        session = Session()
        try:
            # do some stuff with the session here, e.g.
            # with some variables that are set up
            user = User(user_name=user_name)
            session.add(user)
            user.extend(jobs)
            session.commit()
        except:
            session.rollback()
        finally:
            Session.remove()
```
I was under the impression that using a scoped_session and a session factory would start a new session for each thread, making this thread-safe.
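That impression is broadly right for session identity. A minimal, self-contained sketch (illustrative, not from the question) showing that scoped_session hands each thread its own Session:

```python
import threading

from sqlalchemy import create_engine
from sqlalchemy.orm import scoped_session, sessionmaker

# In-memory database purely for illustration.
engine = create_engine("sqlite://")
Session = scoped_session(sessionmaker(bind=engine))

sessions = {}

def grab(i):
    # Within one thread, repeated calls return the same Session object.
    assert Session() is Session()
    sessions[i] = Session()
    Session.remove()

threads = [threading.Thread(target=grab, args=(i,)) for i in range(3)]
for t in threads:
    t.start()
for t in threads:
    t.join()

# Across threads, the registry handed out distinct Session objects -- but they
# all share the same engine and, for SQLite, the same database-level write lock.
assert len({id(s) for s in sessions.values()}) == 3
```

Note the last comment: per-thread sessions do not remove SQLite's single-writer lock, which is where "database is locked" comes from.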
where User and Job are sqlalchemy ORM objects, e.g.:
```python
from sqlalchemy import Boolean, Column, ForeignKey, Integer, String
from sqlalchemy.orm import Mapped, mapped_column, relationship
from sqlalchemy.sql.expression import false

from app.data_structures.base import Base
from app.data_structures.job import Job


class User(Base):
    __tablename__ = "users"

    user_name: Mapped[str] = mapped_column(primary_key=True)
    employee_number = Column(Integer)
    manager = relationship("User", remote_side=[user_name], post_update=True)
    jobs: Mapped[list[Job]] = relationship()

    def __init__(
        self,
        user_name: str,
        employee_number: int = None,
        manager: str = None,
    ) -> None:
        self.user_name = user_name
        self.employee_number = employee_number
        self.manager = manager
```
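As an aside (my guess, not stated in the question): the `user.extend(jobs)` call above presumably targets the `jobs` relationship, since in the SQLAlchemy ORM related rows are attached through the relationship collection, i.e. `user.jobs.extend(jobs)`. A self-contained sketch with hypothetical stand-in models:

```python
from sqlalchemy import Column, ForeignKey, Integer, String, create_engine
from sqlalchemy.orm import Session, declarative_base, relationship

Base = declarative_base()

class Job(Base):
    __tablename__ = "jobs"
    id = Column(Integer, primary_key=True)
    user_name = Column(String, ForeignKey("users.user_name"))

class User(Base):
    __tablename__ = "users"
    user_name = Column(String, primary_key=True)
    jobs = relationship("Job")

engine = create_engine("sqlite://")  # in-memory DB for illustration
Base.metadata.create_all(engine)

with Session(engine) as session:
    user = User(user_name="alice")
    # Related Job rows go through the relationship collection, which supports
    # list operations such as extend; they are cascaded on add/commit.
    user.jobs.extend([Job(), Job()])
    session.add(user)
    session.commit()
    assert session.query(Job).count() == 2
```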
Can anyone explain what I am doing wrong, and how to go about fixing it?
Answer 1

Score: 1
This example uses SQLAlchemy 1.4.48, apscheduler 3.10.1, and the Python sqlite3 module 3.34.1.

I'm not sure how to fully recreate what is going on in your job function, because it isn't clear where `jobs` or the `.extend` in `user.extend(jobs)` come from.

Running create_all in every job doesn't seem to work well (it throws errors that the table already exists), so I put it in its own job and ran it once, ahead of the jobs that create users.

I used a random sleep to make sure the jobs would run intermittently and not in exact blocks of the max thread count.
```python
from os import environ
from time import sleep
from os.path import join, realpath
from secrets import randbelow

from sqlalchemy import create_engine, Integer, String
from sqlalchemy.schema import Column
from sqlalchemy.ext.declarative import declarative_base
from sqlalchemy.orm import scoped_session, sessionmaker
from apscheduler.schedulers.blocking import BlockingScheduler
from apscheduler.executors.pool import ThreadPoolExecutor

db_name = environ.get("DB_NAME")
db_path = realpath(join("data", db_name))

engine = create_engine(f"sqlite:///{db_path}", pool_pre_ping=True)
session_factory = sessionmaker(bind=engine)
Session = scoped_session(session_factory)
Base = declarative_base()


class User(Base):
    __tablename__ = "users"
    id = Column(Integer, primary_key=True)
    username = Column(String)


def setup_func():
    print("Running create_all")
    Base.metadata.create_all(engine, Base.metadata.tables.values(), checkfirst=True)


def job_function(i):
    print("Running job function {}".format(i))
    session = Session()
    try:
        seconds_to_wait = 1 + randbelow(4)
        sleep(seconds_to_wait)
        user = User(username=str(i))
        session.add(user)
        session.commit()
    finally:
        # Will roll back if needed.
        Session.remove()


FIFTEEN_MINUTES_IN_SECONDS = 15 * 60


def main():
    sched = BlockingScheduler(
        timezone="US/Pacific",
        executors={"default": ThreadPoolExecutor(4)},
        job_defaults={"misfire_grace_time": FIFTEEN_MINUTES_IN_SECONDS},
    )
    sched.add_job(setup_func)
    for i in range(100):
        sched.add_job(job_function, args=[i])
    sched.start()


if __name__ == "__main__":
    main()
```
```
$ rm -f data/test.sql && DB_NAME=test.sql ve/bin/python test_apscheduler.py
/home/ian/workspace/laspilitas-project/stackoverflow-and-testing/test_apscheduler.py:18: MovedIn20Warning: Deprecated API features detected! These feature(s) are not compatible with SQLAlchemy 2.0. To prevent incompatible upgrades prior to updating applications, ensure requirements files are pinned to "sqlalchemy<2.0". Set environment variable SQLALCHEMY_WARN_20=1 to show all deprecation warnings. Set environment variable SQLALCHEMY_SILENCE_UBER_WARNING=1 to silence this message. (Background on SQLAlchemy 2.0 at: https://sqlalche.me/e/b8d9)
  Base = declarative_base()
Running create_all
Running job function 0
Running job function 1
Running job function 2
Running job function 3
Running job function 4
Running job function 5
Running job function 6
Running job function 7
Running job function 8
Running job function 9
Running job function 10
Running job function 11
Running job function 12
Running job function 13
Running job function 14
Running job function 15
Running job function 16
Running job function 17
Running job function 18
Running job function 19
```
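One SQLite-specific aside beyond the answer above (a common mitigation, not something the answer relies on): SQLite allows only one writer at a time, and sqlite3's default busy timeout is 5 seconds, so raising the timeout makes writers wait for the database-level lock instead of failing quickly with "database is locked". A sketch, using an illustrative temp-file path:

```python
import os
import tempfile

from sqlalchemy import create_engine, text

# Illustrative temp-file database; swap in your real path.
db_path = os.path.join(tempfile.mkdtemp(), "demo.db")

engine = create_engine(
    f"sqlite:///{db_path}",
    pool_pre_ping=True,
    # sqlite3 waits this many seconds for the write lock before raising
    # "database is locked" (the default is 5 seconds).
    connect_args={"timeout": 30},
)

with engine.connect() as conn:
    assert conn.execute(text("select 1")).scalar() == 1
```

This does not remove the contention, it only gives each writer more time to acquire the lock before erroring.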