运行 Celery 时出现运行时错误,似乎在请求上下文之外工作。

huangapple go评论101阅读模式
英文:

runtime error where it appears to have working outside of request context when running celery

问题

I'm working on a Flask project with Celery, and I want to distribute the same task when receiving lots of POST request data from the server. Whenever I run my Flask and Celery, I keep getting this error:

  1. RuntimeError: Working outside of request context. This typically means that you attempted to use functionality that needed an active HTTP request when using Celery.

My project is separated into several files:

__init__.py

  1. from celery import Celery
  2. def make_celery(app_name=__name__):
  3. redis_uri = "redis://localhost:6379"
  4. return Celery(app_name, backend=redis_uri, broker=redis_uri)
  5. celery = make_celery()

task.py

  1. from liveapp import celery
  2. from flask import abort, make_response, jsonify, request
  3. from datetime import datetime
  4. import re
  5. from numpy import array, concatenate
  6. from .utils import series_to_supervised, model_retrain, model_repredict, model_predict
  7. import pyodbc
  8. import gc
  9. @celery.task()
  10. def process_live():
  11. if request.is_json:
  12. data = request.get_json()
  13. # ... do something ...

run.py

  1. from liveapp import factory
  2. import liveapp
  3. from waitress import serve
  4. if __name__ == "__main__":
  5. app = factory.create_app(celery=liveapp.celery)
  6. serve(app, host='127.0.0.1', port=50000)

celery_utils.py

  1. from pandas import DataFrame, concat
  2. def init_celery(celery, app):
  3. celery.conf.update(app.config)
  4. TaskBase = celery.Task
  5. class ContextTask(TaskBase):
  6. def __call__(self, *args, **kwargs):
  7. with app.app_context():
  8. return TaskBase.__call__(self, *args, **kwargs)
  9. celery.Task = ContextTask

factory.py

  1. from flask import Flask
  2. import os
  3. from .celery_utils import init_celery
  4. PKG_NAME = os.path.dirname(os.path.realpath(__file__)).split("/")[-1]
  5. print(PKG_NAME)
  6. def create_app(app_name=PKG_NAME, **kwargs):
  7. app = Flask(app_name)
  8. if kwargs.get("celery"):
  9. init_celery(kwargs.get("celery"), app)
  10. from liveapp.all import bp
  11. app.register_blueprint(bp)
  12. return app

all.py

  1. from flask import Blueprint, make_response, jsonify
  2. import os
  3. from .task import process_live
  4. bp = Blueprint("all", __name__)
  5. @bp.route("/")
  6. def index():
  7. return "Hello!"
  8. @bp.route("/api/live", methods=["POST"])
  9. def live():
  10. process_live.delay()
  11. return make_response(jsonify({"message": "JSON received"}), 200)

When I run my Flask with Celery, this is how I configure it:

  1. celery -A celery_worker.celery worker -l info -P solo

The moment when I execute my Flask and Celery, everything works fine until I encounter this error:

  1. [2023-05-29 15:54:15,512: INFO/MainProcess] Task liveapp.task.process_live[d1c6508c-5fb0-4fe6-a732-0469dc262b7d] received
  2. [2023-05-29 15:54:15,514: ERROR/MainProcess] Task liveapp.task.process_live[d1c6508c-5fb0-4fe6-a732-0469dc262b7d] raised unexpected: RuntimeError('Working outside of request context.\n\nThis typically means that you attempted to use functionality that needed\nan active HTTP request. Consult the documentation on testing for\ninformation about how to avoid this problem.')
  3. Traceback (most recent call last):
  4. File "C:\Users\admin\anaconda3\lib\site-packages\celery\app\trace.py", line 451, in trace_task
  5. R = retval = fun(*args, **kwargs)
  6. File "D:\new test\liveapp\celery_utils.py", line 10, in __call__
  7. return TaskBase.__call__(self, *args, **kwargs)
  8. File "C:\Users\admin\anaconda3\lib\site-packages\celery\app\trace.py", line 734, in __protected_call__
  9. return self.run(*args, **kwargs)
  10. File "D:\new test\liveapp\task.py", line 12, in process_live
  11. if request.is_json:
  12. File "C:\Users\admin\anaconda3\lib\site-packages\werkzeug\local.py", line 436, in __get__
  13. obj = instance._get_current_object()
  14. File "C:\Users\admin\anaconda3\lib\site-packages\werkzeug\local.py", line 565, in _get_current_object
  15. return self.__local() # type: ignore
  16. File "C:\Users\admin\anaconda3\lib\site-packages\flask\globals.py", line 38, in _lookup_req_object
  17. raise RuntimeError(_request_ctx_err_msg)
  18. RuntimeError: Working outside of request context.
  19. This typically means that you attempted to use functionality that needed
  20. an active HTTP request. Consult the documentation on testing for
  21. information about how to avoid this problem.

What am I missing out on?

英文:

im working on a flask project with celery, which i wanna distribute the same task when getting lots of post request data from the server. whenever i run my flask and celery, i keep having this error that says:

RuntimeError: Working outside of request context, This typically means that you attempted to use functionality that needed an active HTTP request when using celery.

my project was separated to few parts of files

  1. new test
  2. |_ liveapp
  3. |__ __init__.py
  4. |__ task.py
  5. |__ run.py
  6. |__ utils.py
  7. |__ celery_utils.py
  8. |__ factory.py
  9. |__ all.py
  10. celery_worker.py
  11. run.py

init.py

  1. from celery import Celery
  2. def make_celery(app_name = __name__):
  3. redis_uri = "redis://localhost:6379"
  4. return Celery(app_name, backend=redis_uri, broker=redis_uri)
  5. celery = make_celery()

task.py

  1. from liveapp import celery
  2. from flask import abort, make_response,jsonify, request
  3. from datetime import datetime
  4. import re
  5. from numpy import array, concatenate
  6. from .utils import series_to_supervised,model_retrain, model_repredict, model_predict
  7. import pyodbc
  8. import gc
  9. @celery.task()
  10. def process_live():
  11. if request.is_json:
  12. data = request.get_json()
  13. ,,,,,,,,,,,do something,,,,

run.py

  1. from liveapp import factory
  2. import liveapp
  3. from waitress import serve
  4. if __name__ == "__main__":
  5. app = factory.create_app(celery = liveapp.celery)
  6. serve(app, host='127.0.0.1',port=50000)

celery_utils.py

  1. from pandas import DataFrame, concat
  2. def init_celery(celery,app):
  3. celery.conf.update(app.config)
  4. TaskBase = celery.Task
  5. class ContextTask(TaskBase):
  6. def __call__(self, *args, **kwargs):
  7. with app.app_context():
  8. return TaskBase.__call__(self, *args, **kwargs)
  9. celery.Task = ContextTask

factory.py

  1. from flask import Flask
  2. import os
  3. from .celery_utils import init_celery
  4. PKG_NAME = os.path.dirname(os.path.realpath(__file__)).split("/")[-1]
  5. print(PKG_NAME)
  6. def create_app(app_name=PKG_NAME, **kwargs):
  7. app = Flask(app_name)
  8. if kwargs.get("celery"):
  9. init_celery(kwargs.get("celery"), app)
  10. from liveapp.all import bp
  11. app.register_blueprint(bp)
  12. return app

all.py

  1. from flask import Blueprint, make_response,jsonify
  2. import os
  3. from .task import process_live
  4. bp = Blueprint("all", __name__)
  5. @bp.route("/")
  6. def index():
  7. return "Hello!"
  8. @bp.route("/api/live", methods=["POST"])
  9. def live():
  10. process_live.delay()
  11. return make_response(jsonify({"message":"JSON received"}), 200)

when i run my Flask with celery, this is how i configured:

  1. D:\new test>celery -A celery_worker.celery worker -l info -P solo

the moment when i execute my flask and celery, there wasnt any problem until i ran to this:

  1. [2023-05-29 15:54:15,512: INFO/MainProcess] Task liveapp.task.process_live[d1c6508c-5fb0-4fe6-a732-0469dc262b7d] received
  2. [2023-05-29 15:54:15,514: ERROR/MainProcess] Task liveapp.task.process_live[d1c6508c-5fb0-4fe6-a732-0469dc262b7d] raised unexpected: RuntimeError('Working outside of request context.\n\nThis typically means that you attempted to use functionality that needed\nan active HTTP request. Consult the documentation on testing for\ninformation about how to avoid this problem.')
  3. Traceback (most recent call last):
  4. File "C:\Users\admin\anaconda3\lib\site-packages\celery\app\trace.py", line 451, in trace_task
  5. R = retval = fun(*args, **kwargs)
  6. File "D:\new test\liveapp\celery_utils.py", line 10, in __call__
  7. return TaskBase.__call__(self, *args, **kwargs)
  8. File "C:\Users\admin\anaconda3\lib\site-packages\celery\app\trace.py", line 734, in __protected_call__
  9. return self.run(*args, **kwargs)
  10. File "D:\new test\liveapp\task.py", line 12, in process_live
  11. if request.is_json:
  12. File "C:\Users\admin\anaconda3\lib\site-packages\werkzeug\local.py", line 436, in __get__
  13. obj = instance._get_current_object()
  14. File "C:\Users\admin\anaconda3\lib\site-packages\werkzeug\local.py", line 565, in _get_current_object
  15. return self.__local() # type: ignore
  16. File "C:\Users\admin\anaconda3\lib\site-packages\flask\globals.py", line 38, in _lookup_req_object
  17. raise RuntimeError(_request_ctx_err_msg)
  18. RuntimeError: Working outside of request context.
  19. This typically means that you attempted to use functionality that needed
  20. an active HTTP request. Consult the documentation on testing for
  21. information about how to avoid this problem.

what am i missing out??

答案1

得分: 1

celery workersFlask 上下文之外运行,不应该在 task.py 中导入 Flask 对象。在 Flask 和 celery 之间传递参数的方式如下所示:

task.py - 向任务添加参数并移除 Flask 导入:

  1. from liveapp import celery
  2. from datetime import datetime
  3. import re
  4. from numpy import array, concatenate
  5. from .utils import series_to_supervised, model_retrain, model_repredict, model_predict
  6. import pyodbc
  7. import gc
  8. @celery.task()
  9. def process_live(arg1, arg2):
  10. # 在这里执行一些操作

all.py 使用以下方式调用你的任务并传递参数:

  1. from flask import Blueprint, make_response, jsonify
  2. import os
  3. from .task import process_live
  4. bp = Blueprint("all", __name__)
  5. @bp.route("/")
  6. def index():
  7. return "Hello!"
  8. @bp.route("/api/live", methods=["POST"])
  9. def live():
  10. arg1 = "something"
  11. arg2 = "something else"
  12. process_live.delay(arg1, arg2)
  13. return make_response(jsonify({"message": "JSON received"}), 200)
英文:

celery workers run outside of Flask context and you shouldn't import flask objects in task.py the way to pass arguments between flask and celery is like so:

task.py - Add arguments to your task and remove flask imports:

  1. from liveapp import celery
  2. from datetime import datetime
  3. import re
  4. from numpy import array, concatenate
  5. from .utils import series_to_supervised,model_retrain, model_repredict, model_predict
  6. import pyodbc
  7. import gc
  8. @celery.task()
  9. def process_live(arg1, arg2):
  10. ,,,,,,,,,,,do something,,,,

all.py call your task with arguments like so:

  1. from flask import Blueprint, make_response,jsonify
  2. import os
  3. from .task import process_live
  4. bp = Blueprint("all", __name__)
  5. @bp.route("/")
  6. def index():
  7. return "Hello!"
  8. @bp.route("/api/live", methods=["POST"])
  9. def live():
  10. arg1 = "something"
  11. arg2 = "something else"
  12. process_live.delay(arg1, arg2)
  13. return make_response(jsonify({"message":"JSON received"}), 200)

huangapple
  • 本文由 发表于 2023年5月29日 15:57:34
  • 转载请务必保留本文链接:https://go.coder-hub.com/76355586.html
匿名

发表评论

匿名网友

:?: :razz: :sad: :evil: :!: :smile: :oops: :grin: :eek: :shock: :???: :cool: :lol: :mad: :twisted: :roll: :wink: :idea: :arrow: :neutral: :cry: :mrgreen:

确定