Flask Celery – RabbitMQ连接不起作用

huangapple go评论85阅读模式
英文:

Flask Celery - RabbitMQ Connection is not Working

问题

我已经开发了Flask应用程序。我想从我的应用程序中发送邮件。
因此,我选择了使用Celery和RabbitMQ的后台进程,但在拉取数据时出现以下错误。

celery=Celery(
    'sequre_spacement_backend',
    broker = app.config['QUEUE_BROKER_URL'],
    include = ['sequre_spacement_backend.tasks.MeetingRoom.MeetingRoomSMSNotification']
)

运行celery worker时出现以下错误

[2023-03-07 13:00:56,534: ERROR/MainProcess] 收到未注册的任务类型' tasks.MeetingRoom.MeetingRoomSMSNotification.queue_test'。
消息已被忽略和丢弃。

您是否记得导入包含此任务的模块?
或者您是否使用了相对导入?

请查看更多信息,请参阅
http://docs.celeryq.org/en/latest/internals/protocol.html

消息体的完整内容如下:
'[[], {}, {"callbacks": null, "errbacks": null, "chain": null, "chord": null}]' (77b)

消息头的完整内容如下:
{'lang': 'py', 'task': 'tasks.MeetingRoom.MeetingRoomSMSNotification.queue_test', 'id': 'eacb73c8-4ce2-4c7d-9378-da36d221a465', 'shadow': None, 'eta': None, 'expires': None, 'group': None, 'group_index': None, 'retries': 0, 'timelimit': [None, None], 'root_id': 'eacb73c8-4ce2-4c7d-9378-da36d221a465', 'parent_id': None, 'argsrepr': '()', 'kwargsrepr': '{}', 'origin': 'gen137853@heptagon', 'ignore_result': False}

此任务的传递信息如下:
{'consumer_tag': 'None4', 'delivery_tag': 2, 'redelivered': True, 'exchange': '', 'routing_key': 'celery'}
Traceback (most recent call last):
  File "/opt/lampp/htdocs/sequre_spacement_backend/venv/lib/python3.8/site-packages/celery/worker/consumer/consumer.py", line 591, in on_task_received
    strategy = strategies[type_]
KeyError: 'tasks.MeetingRoom.MeetingRoomSMSNotification.queue_test'

Flask Celery – RabbitMQ连接不起作用

英文:

I have developed the flask application. I want to send mail from my application.
So I choose the Background process using Celery & RabbitMQ I got the below error while pull the data .

celery=Celery(
    'sequre_spacement_backend',
    broker = app.config['QUEUE_BROKER_URL'],
    include = ['sequre_spacement_backend.tasks.MeetingRoom.MeetingRoomSMSNotification']
)

while run the celery worker

Error

[2023-03-07 13:00:56,534: ERROR/MainProcess] Received unregistered task of type 'tasks.MeetingRoom.MeetingRoomSMSNotification.queue_test'.
The message has been ignored and discarded.


Did you remember to import the module containing this task?
Or maybe you're using relative imports?

Please see
http://docs.celeryq.org/en/latest/internals/protocol.html
for more information.

The full contents of the message body was:
'[[], {}, {"callbacks": null, "errbacks": null, "chain": null, "chord": null}]' (77b)

Thw full contents of the message headers:
{'lang': 'py', 'task': 'tasks.MeetingRoom.MeetingRoomSMSNotification.queue_test', 'id': 'eacb73c8-4ce2-4c7d-9378-da36d221a465', 'shadow': None, 'eta': None, 'expires': None, 'group': None, 'group_index': None, 'retries': 0, 'timelimit': [None, None], 'root_id': 'eacb73c8-4ce2-4c7d-9378-da36d221a465', 'parent_id': None, 'argsrepr': '()', 'kwargsrepr': '{}', 'origin': 'gen137853@heptagon', 'ignore_result': False}

The delivery info for this task is:
{'consumer_tag': 'None4', 'delivery_tag': 2, 'redelivered': True, 'exchange': '', 'routing_key': 'celery'}
Traceback (most recent call last):
  File "/opt/lampp/htdocs/sequre_spacement_backend/venv/lib/python3.8/site-packages/celery/worker/consumer/consumer.py", line 591, in on_task_received
    strategy = strategies[type_]
KeyError: 'tasks.MeetingRoom.MeetingRoomSMSNotification.queue_test'

Flask Celery – RabbitMQ连接不起作用

答案1

得分: 0

from config import QueueConfig
db = None
app = Flask(name, template_folder='../templates')
app.config.from_object(Config())
app.config.from_object(SqlDbConfig())
app.config.from_object(FileSystemConfig())
app.config.from_object(MongoDbConfig())
app.config.from_object(PushNotification())
app.config['MAX_CONTENT_LENGTH'] = 16 * 1000 * 1000
CORS(app, support_credentials=True)
bcrypt = Bcrypt(app)
migrate = Migrate(app, db)
db = SQLAlchemy(app, session_options={'autoflush': True, 'expire_on_commit': True, 'autocommit': True})
db.init_app(app)
app.config['SQLALCHEMY_TRACK_MODIFICATIONS'] = True
app.config.from_object(QueueConfig())
celery_prefix = app.config['CELERY_PREFIX']
celery = Celery(
app.name,
broker=app.config['QUEUE_BROKER_URL'],
include=[
'tasks.MeetingRoom.MeetingRoomNotifications'
]
)
celery.conf.update(app.config)

class ContextTask(celery.Task):
def call(self, *args, **kwargs):
with app.app_context():
return self.run(*args, **kwargs)
celery.Task = ContextTask

It Worked for Me, We should register the task ContextTask and we need to add task function's file in include in Celery Connection

英文:
from config import QueueConfig
db = None
app = Flask(__name__, template_folder='../templates')
app.config.from_object(Config())
app.config.from_object(SqlDbConfig())
app.config.from_object(FileSystemConfig())
app.config.from_object(MongoDbConfig())
app.config.from_object(PushNotification())
app.config['MAX_CONTENT_LENGTH'] = 16 * 1000 * 1000
CORS(app, support_credentials = True)
bcrypt = Bcrypt(app)
migrate = Migrate(app, db)
db = SQLAlchemy(app, session_options = { 'autoflush': True, 
  'expire_on_commit': True, 'autocommit': True })
db.init_app(app)
app.config['SQLALCHEMY_TRACK_MODIFICATIONS'] = True
app.config.from_object(QueueConfig())
celery_prefix = app.config['CELERY_PREFIX']
celery = Celery(
                app.name,
                broker = app.config['QUEUE_BROKER_URL'],
                include=[
                            'tasks.MeetingRoom.MeetingRoomNotifications'
                        ]
            )
 celery.conf.update(app.config)


class ContextTask(celery.Task):
  def __call__( self, *args, **kwargs ):
     with app.app_context():
     
        return self.run(*args, **kwargs)
 celery.Task = ContextTask

It Worked for Me, We should register the task ContextTask and we need to add task function's file in include in Celery Connection

huangapple
  • 本文由 发表于 2023年3月7日 17:18:07
  • 转载请务必保留本文链接:https://go.coder-hub.com/75660014.html
匿名

发表评论

匿名网友

:?: :razz: :sad: :evil: :!: :smile: :oops: :grin: :eek: :shock: :???: :cool: :lol: :mad: :twisted: :roll: :wink: :idea: :arrow: :neutral: :cry: :mrgreen:

确定