英文:
Flask Celery - RabbitMQ Connection is not Working
问题
我已经开发了Flask应用程序。我想从我的应用程序中发送邮件。
因此,我选择了使用Celery和RabbitMQ的后台进程,但在拉取数据时出现以下错误。
celery=Celery(
'sequre_spacement_backend',
broker = app.config['QUEUE_BROKER_URL'],
include = ['sequre_spacement_backend.tasks.MeetingRoom.MeetingRoomSMSNotification']
)
运行celery worker时出现以下错误:
[2023-03-07 13:00:56,534: ERROR/MainProcess] 收到未注册的任务类型' tasks.MeetingRoom.MeetingRoomSMSNotification.queue_test'。
消息已被忽略和丢弃。
您是否记得导入包含此任务的模块?
或者您是否使用了相对导入?
请查看更多信息,请参阅
http://docs.celeryq.org/en/latest/internals/protocol.html
消息体的完整内容如下:
'[[], {}, {"callbacks": null, "errbacks": null, "chain": null, "chord": null}]' (77b)
消息头的完整内容如下:
{'lang': 'py', 'task': 'tasks.MeetingRoom.MeetingRoomSMSNotification.queue_test', 'id': 'eacb73c8-4ce2-4c7d-9378-da36d221a465', 'shadow': None, 'eta': None, 'expires': None, 'group': None, 'group_index': None, 'retries': 0, 'timelimit': [None, None], 'root_id': 'eacb73c8-4ce2-4c7d-9378-da36d221a465', 'parent_id': None, 'argsrepr': '()', 'kwargsrepr': '{}', 'origin': 'gen137853@heptagon', 'ignore_result': False}
此任务的传递信息如下:
{'consumer_tag': 'None4', 'delivery_tag': 2, 'redelivered': True, 'exchange': '', 'routing_key': 'celery'}
Traceback (most recent call last):
File "/opt/lampp/htdocs/sequre_spacement_backend/venv/lib/python3.8/site-packages/celery/worker/consumer/consumer.py", line 591, in on_task_received
strategy = strategies[type_]
KeyError: 'tasks.MeetingRoom.MeetingRoomSMSNotification.queue_test'
英文:
I have developed the flask application. I want to send mail from my application.
So I choose the Background process using Celery & RabbitMQ I got the below error while pull the data .
celery=Celery(
'sequre_spacement_backend',
broker = app.config['QUEUE_BROKER_URL'],
include = ['sequre_spacement_backend.tasks.MeetingRoom.MeetingRoomSMSNotification']
)
while run the celery worker
Error
[2023-03-07 13:00:56,534: ERROR/MainProcess] Received unregistered task of type 'tasks.MeetingRoom.MeetingRoomSMSNotification.queue_test'.
The message has been ignored and discarded.
Did you remember to import the module containing this task?
Or maybe you're using relative imports?
Please see
http://docs.celeryq.org/en/latest/internals/protocol.html
for more information.
The full contents of the message body was:
'[[], {}, {"callbacks": null, "errbacks": null, "chain": null, "chord": null}]' (77b)
Thw full contents of the message headers:
{'lang': 'py', 'task': 'tasks.MeetingRoom.MeetingRoomSMSNotification.queue_test', 'id': 'eacb73c8-4ce2-4c7d-9378-da36d221a465', 'shadow': None, 'eta': None, 'expires': None, 'group': None, 'group_index': None, 'retries': 0, 'timelimit': [None, None], 'root_id': 'eacb73c8-4ce2-4c7d-9378-da36d221a465', 'parent_id': None, 'argsrepr': '()', 'kwargsrepr': '{}', 'origin': 'gen137853@heptagon', 'ignore_result': False}
The delivery info for this task is:
{'consumer_tag': 'None4', 'delivery_tag': 2, 'redelivered': True, 'exchange': '', 'routing_key': 'celery'}
Traceback (most recent call last):
File "/opt/lampp/htdocs/sequre_spacement_backend/venv/lib/python3.8/site-packages/celery/worker/consumer/consumer.py", line 591, in on_task_received
strategy = strategies[type_]
KeyError: 'tasks.MeetingRoom.MeetingRoomSMSNotification.queue_test'
答案1
得分: 0
from config import QueueConfig
db = None
app = Flask(name, template_folder='../templates')
app.config.from_object(Config())
app.config.from_object(SqlDbConfig())
app.config.from_object(FileSystemConfig())
app.config.from_object(MongoDbConfig())
app.config.from_object(PushNotification())
app.config['MAX_CONTENT_LENGTH'] = 16 * 1000 * 1000
CORS(app, support_credentials=True)
bcrypt = Bcrypt(app)
migrate = Migrate(app, db)
db = SQLAlchemy(app, session_options={'autoflush': True, 'expire_on_commit': True, 'autocommit': True})
db.init_app(app)
app.config['SQLALCHEMY_TRACK_MODIFICATIONS'] = True
app.config.from_object(QueueConfig())
celery_prefix = app.config['CELERY_PREFIX']
celery = Celery(
app.name,
broker=app.config['QUEUE_BROKER_URL'],
include=[
'tasks.MeetingRoom.MeetingRoomNotifications'
]
)
celery.conf.update(app.config)
class ContextTask(celery.Task):
def call(self, *args, **kwargs):
with app.app_context():
return self.run(*args, **kwargs)
celery.Task = ContextTask
It Worked for Me, We should register the task ContextTask and we need to add task function's file in include in Celery Connection
英文:
from config import QueueConfig
db = None
app = Flask(__name__, template_folder='../templates')
app.config.from_object(Config())
app.config.from_object(SqlDbConfig())
app.config.from_object(FileSystemConfig())
app.config.from_object(MongoDbConfig())
app.config.from_object(PushNotification())
app.config['MAX_CONTENT_LENGTH'] = 16 * 1000 * 1000
CORS(app, support_credentials = True)
bcrypt = Bcrypt(app)
migrate = Migrate(app, db)
db = SQLAlchemy(app, session_options = { 'autoflush': True,
'expire_on_commit': True, 'autocommit': True })
db.init_app(app)
app.config['SQLALCHEMY_TRACK_MODIFICATIONS'] = True
app.config.from_object(QueueConfig())
celery_prefix = app.config['CELERY_PREFIX']
celery = Celery(
app.name,
broker = app.config['QUEUE_BROKER_URL'],
include=[
'tasks.MeetingRoom.MeetingRoomNotifications'
]
)
celery.conf.update(app.config)
class ContextTask(celery.Task):
def __call__( self, *args, **kwargs ):
with app.app_context():
return self.run(*args, **kwargs)
celery.Task = ContextTask
It Worked for Me, We should register the task ContextTask and we need to add task function's file in include in Celery Connection
通过集体智慧和协作来改善编程学习和解决问题的方式。致力于成为全球开发者共同参与的知识库,让每个人都能够通过互相帮助和分享经验来进步。
评论