继承的类在APScheduler作业中使用抽象父类的方法。

huangapple go评论79阅读模式
英文:

Inherited class is using methods from abstract parent in APScheduler job

问题

我有一个问题,APScheduler 的任务没有正确运行。即使它们具有自己的实现,它们也会使用父抽象类的方法。只有在通过命令触发启动函数时才会发生这种情况。所有这些都在Telegram API机器人内部。似乎只有在使用RedisJobStore时才会出现此错误。

from abc import ABC, abstractmethod
from datetime import datetime, timedelta
import asyncio
import logging

from apscheduler.schedulers.asyncio import AsyncIOScheduler
from apscheduler.jobstores.redis import RedisJobStore


logging.basicConfig(filename='logfile.log', level=logging.INFO)
logging.getLogger('apscheduler').setLevel(logging.DEBUG)


class MyAbstractClass(ABC):
    @classmethod
    @abstractmethod
    def get_required_members(cls):
        raise NotImplementedError

    @classmethod
    async def initiate_all(cls):
        members = cls.get_required_members()
        logging.info(f'Got {members=}')
        ...


class MyImplementation(MyAbstractClass):
    @classmethod
    def get_required_members(cls):
        return ['Alex', 'Anna']

    @classmethod
    def append_to_scheduler(cls, scheduler: AsyncIOScheduler, run_date: datetime):
        return scheduler.add_job(
            func=cls.initiate_all,
            trigger='date',
            run_date=run_date
        )


redis_job_store = RedisJobStore()
scheduler = AsyncIOScheduler(jobstores={'default': redis_job_store}, logger=logging.getLogger())
scheduler.start()

run_date = datetime.now() + timedelta(seconds=5)
MyImplementation.append_to_scheduler(scheduler=scheduler, run_date=run_date)
asyncio.get_event_loop().run_forever()

错误:

Job "MyAbstractClass.initiate_all (trigger: date[2023-08-04 19:01:03 MSK], next run at: 2023-08-04 19:01:03 MSK)" raised an exception
Traceback (most recent call last):
  File "/media/russich555/hdd/Programming/Freelance/YouDo/29.2pilot/venv/lib/python3.11/site-packages/apscheduler/executors/base_py3.py", line 30, in run_coroutine_job
    retval = await job.func(*job.args, **job.kwargs)
             ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/media/russich555/hdd/Programming/Freelance/YouDo/29.2pilot/mre/api.py", line 13, in initiate_all
    members = cls.get_required_members()
              ^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/media/russich555/hdd/Programming/Freelance/YouDo/29.2pilot/mre/api.py", line 9, in get_required_members
    raise NotImplementedError
NotImplementedError

P.S. 我将此问题发布在问题页面

英文:

I have a problem that APScheduler jobs are not running correctly. They are using methods from parent abstract class even having their own implementetion. It's not happening while launch function by command-trigger. All of this inside telegram API bot. Seems like this error appears only with RedisJobStore

from abc import ABC, abstractmethod
from datetime import datetime, timedelta
import asyncio
import logging

from apscheduler.schedulers.asyncio import AsyncIOScheduler
from apscheduler.jobstores.redis import RedisJobStore


logging.basicConfig(filename='logfile.log', level=logging.INFO)
logging.getLogger('apscheduler').setLevel(logging.DEBUG)


class MyAbstractClass(ABC):
    @classmethod
    @abstractmethod
    def get_required_members(cls):
        raise NotImplementedError

    @classmethod
    async def initiate_all(cls):
        members = cls.get_required_members()
        logging.info(f'Got {members=}')
        ...


class MyImplementation(MyAbstractClass):
    @classmethod
    def get_required_members(cls):
        return ['Alex', 'Anna']

    @classmethod
    def append_to_scheduler(cls, scheduler: AsyncIOScheduler, run_date: datetime):
        return scheduler.add_job(
            func=cls.initiate_all,
            trigger='date',
            run_date=run_date
        )


redis_job_store = RedisJobStore()
scheduler = AsyncIOScheduler(jobstores={'default': redis_job_store}, logger=logging.getLogger())
scheduler.start()

run_date = datetime.now() + timedelta(seconds=5)
MyImplementation.append_to_scheduler(scheduler=scheduler, run_date=run_date)
asyncio.get_event_loop().run_forever()

Error:

Job "MyAbstractClass.initiate_all (trigger: date[2023-08-04 19:01:03 MSK], next run at: 2023-08-04 19:01:03 MSK)" raised an exception
Traceback (most recent call last):
  File "/media/russich555/hdd/Programming/Freelance/YouDo/29.2pilot/venv/lib/python3.11/site-packages/apscheduler/executors/base_py3.py", line 30, in run_coroutine_job
    retval = await job.func(*job.args, **job.kwargs)
             ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/media/russich555/hdd/Programming/Freelance/YouDo/29.2pilot/mre/api.py", line 13, in initiate_all
    members = cls.get_required_members()
              ^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/media/russich555/hdd/Programming/Freelance/YouDo/29.2pilot/mre/api.py", line 9, in get_required_members
    raise NotImplementedError
NotImplementedError

P.S. I posted this as issue

答案1

得分: 1

这是来自 agronholm/apscheduler@5c69150(v3.1.0) 的一个回退(regression)(当前版本:v3.10.1)。

它影响所有使用 job.__getstate__() 的作业存储(job stores):

  • MongoDBJobStore
  • RedisJobStore
  • RethinkDBJobStore
  • SQLAlchemyJobStore
  • ZooKeeperJobStore

您可以修补 apscheduler.util.get_callable_name,以不使用 func.__qualname__

from apscheduler import util
from inspect import isclass


def get_callable_name(func):
    # `func.__qualname__` 破坏了调用子类方法的继承方法
    # if hasattr(func, '__qualname__'):
    #     return func.__qualname__
    f_self = getattr(func, '__self__', None) or getattr(func, 'im_self', None)
    if f_self and hasattr(func, '__name__'):
        f_class = f_self if isclass(f_self) else f_self.__class__
    else:
        f_class = getattr(func, 'im_class', None)
    if f_class and hasattr(func, '__name__'):
        return '%s.%s' % (f_class.__name__, func.__name__)
    if hasattr(func, '__call__'):
        if hasattr(func, '__name__'):
            return func.__name__
        return func.__class__.__name__
    raise TypeError('Unable to determine a name for %r -- maybe it is not a callable?' % func)


util.get_callable_name = get_callable_name
英文:

It's a regression from agronholm/apscheduler@5c69150 (v3.1.0) (current version: v3.10.1).

It affects all job stores that use job.__getstate__():

  • MongoDBJobStore
  • RedisJobStore
  • RethinkDBJobStore
  • SQLAlchemyJobStore
  • ZooKeeperJobStore

You can patch apscheduler.util.get_callable_name to not use func.__qualname__:

from apscheduler import util
from inspect import isclass


def get_callable_name(func):
    # `func.__qualname__` breaks inherited methods that call subclass methods
    # if hasattr(func, '__qualname__'):
    #     return func.__qualname__
    f_self = getattr(func, '__self__', None) or getattr(func, 'im_self', None)
    if f_self and hasattr(func, '__name__'):
        f_class = f_self if isclass(f_self) else f_self.__class__
    else:
        f_class = getattr(func, 'im_class', None)
    if f_class and hasattr(func, '__name__'):
        return '%s.%s' % (f_class.__name__, func.__name__)
    if hasattr(func, '__call__'):
        if hasattr(func, '__name__'):
            return func.__name__
        return func.__class__.__name__
    raise TypeError('Unable to determine a name for %r -- maybe it is not a callable?' % func)


util.get_callable_name = get_callable_name

huangapple
  • 本文由 发表于 2023年8月5日 00:05:13
  • 转载请务必保留本文链接:https://go.coder-hub.com/76837560.html
匿名

发表评论

匿名网友

:?: :razz: :sad: :evil: :!: :smile: :oops: :grin: :eek: :shock: :???: :cool: :lol: :mad: :twisted: :roll: :wink: :idea: :arrow: :neutral: :cry: :mrgreen:

确定