RuntimeError "Working outside of request context" when running a Celery task from Flask

Question

I'm working on a Flask project with Celery. I want to dispatch the same task for each of the many POST requests the server receives. Whenever I run Flask and Celery together, I keep getting this error:

RuntimeError: Working outside of request context. This typically means that you attempted to use functionality that needed an active HTTP request when using Celery.

My project is separated into several files:

__init__.py

from celery import Celery

def make_celery(app_name=__name__):
    redis_uri = "redis://localhost:6379"
    return Celery(app_name, backend=redis_uri, broker=redis_uri)

celery = make_celery()

task.py

from liveapp import celery
from flask import abort, make_response, jsonify, request
from datetime import datetime
import re
from numpy import array, concatenate
from .utils import series_to_supervised, model_retrain, model_repredict, model_predict
import pyodbc
import gc

@celery.task()
def process_live():
    if request.is_json:
        data = request.get_json()
        # ... do something ...

run.py

from liveapp import factory
import liveapp
from waitress import serve

if __name__ == "__main__":
    app = factory.create_app(celery=liveapp.celery)
    serve(app, host='127.0.0.1', port=50000)

celery_utils.py

from pandas import DataFrame, concat

def init_celery(celery, app):
    celery.conf.update(app.config)
    TaskBase = celery.Task

    class ContextTask(TaskBase):
        def __call__(self, *args, **kwargs):
            with app.app_context():
                return TaskBase.__call__(self, *args, **kwargs)
    
    celery.Task = ContextTask

factory.py

from flask import Flask
import os
from .celery_utils import init_celery

PKG_NAME = os.path.dirname(os.path.realpath(__file__)).split("/")[-1]
print(PKG_NAME)

def create_app(app_name=PKG_NAME, **kwargs):
    app = Flask(app_name)
    if kwargs.get("celery"):
        init_celery(kwargs.get("celery"), app)
    from liveapp.all import bp

    app.register_blueprint(bp)
    return app

all.py

from flask import Blueprint, make_response, jsonify
import os
from .task import process_live

bp = Blueprint("all", __name__)

@bp.route("/")
def index():
    return "Hello!"

@bp.route("/api/live", methods=["POST"])
def live():
    process_live.delay()
    return make_response(jsonify({"message": "JSON received"}), 200)

This is how I start the Celery worker:

celery -A celery_worker.celery worker -l info -P solo

Flask and Celery both start without problems, but as soon as the worker receives a task I get this error:

[2023-05-29 15:54:15,512: INFO/MainProcess] Task liveapp.task.process_live[d1c6508c-5fb0-4fe6-a732-0469dc262b7d] received
[2023-05-29 15:54:15,514: ERROR/MainProcess] Task liveapp.task.process_live[d1c6508c-5fb0-4fe6-a732-0469dc262b7d] raised unexpected: RuntimeError('Working outside of request context.\n\nThis typically means that you attempted to use functionality that needed\nan active HTTP request.  Consult the documentation on testing for\ninformation about how to avoid this problem.')
Traceback (most recent call last):
  File "C:\Users\admin\anaconda3\lib\site-packages\celery\app\trace.py", line 451, in trace_task
    R = retval = fun(*args, **kwargs)
  File "D:\new test\liveapp\celery_utils.py", line 10, in __call__
    return TaskBase.__call__(self, *args, **kwargs)
  File "C:\Users\admin\anaconda3\lib\site-packages\celery\app\trace.py", line 734, in __protected_call__
    return self.run(*args, **kwargs)
  File "D:\new test\liveapp\task.py", line 12, in process_live
    if request.is_json:
  File "C:\Users\admin\anaconda3\lib\site-packages\werkzeug\local.py", line 436, in __get__
    obj = instance._get_current_object()
  File "C:\Users\admin\anaconda3\lib\site-packages\werkzeug\local.py", line 565, in _get_current_object
    return self.__local()  # type: ignore
  File "C:\Users\admin\anaconda3\lib\site-packages\flask\globals.py", line 38, in _lookup_req_object
    raise RuntimeError(_request_ctx_err_msg)
RuntimeError: Working outside of request context.

This typically means that you attempted to use functionality that needed
an active HTTP request. Consult the documentation on testing for
information about how to avoid this problem.

What am I missing?

For reference, this is how the project is laid out:

new test
|_ liveapp
    |__ __init__.py
    |__ task.py
    |__ run.py
    |__ utils.py
    |__ celery_utils.py
    |__ factory.py
    |__ all.py
    celery_worker.py
    run.py


Answer 1

Score: 1

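Celery workers run outside of the Flask request context, so you should not import Flask objects such as request in task.py. Instead, pass the data the task needs as task arguments, as in task.py and all.py below.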
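As a rough illustration of why the task fails, the sketch below mimics a request-bound value with the standard-library contextvars module. It is an analogy only, not Flask's actual implementation, and every name in it (_current_request, read_request, handle_http_request, run_in_worker) is hypothetical. Note also that the ContextTask wrapper in celery_utils.py pushes an application context only, which does not make request available.

```python
import contextvars

# Stand-in for Flask's request object: a value that exists only while
# a request is being handled in the current context. (Hypothetical name.)
_current_request = contextvars.ContextVar("current_request")


def read_request():
    """Mimic accessing flask.request: fail when no request is active."""
    try:
        return _current_request.get()
    except LookupError:
        raise RuntimeError("Working outside of request context.") from None


def handle_http_request(body):
    """Web side: the request is set only for the duration of the view."""
    token = _current_request.set(body)
    try:
        return read_request()
    finally:
        _current_request.reset(token)


def run_in_worker(func):
    """Worker side: run func in a fresh, empty context, where no request was ever set."""
    return contextvars.Context().run(func)
```

Calling handle_http_request({"x": 1}) returns the body, while run_in_worker(read_request) raises the RuntimeError, which is roughly the situation process_live is in when it touches request inside the worker.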

task.py (add parameters to the task and remove the Flask imports):

from liveapp import celery
from datetime import datetime
import re
from numpy import array, concatenate
from .utils import series_to_supervised, model_retrain, model_repredict, model_predict
import pyodbc
import gc

@celery.task()
def process_live(arg1, arg2):
    # Do the work here, using only arg1 and arg2 (no flask.request).
    ...

all.py (call the task and pass the arguments):

from flask import Blueprint, make_response, jsonify
import os
from .task import process_live

bp = Blueprint("all", __name__)

@bp.route("/")
def index():
    return "Hello!"

@bp.route("/api/live", methods=["POST"])
def live():
    arg1 = "something"
    arg2 = "something else"

    process_live.delay(arg1, arg2)

    return make_response(jsonify({"message": "JSON received"}), 200)
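For the original endpoint, the arguments would presumably come from the POST body rather than fixed strings. A minimal sketch, assuming the body is a JSON object and that Celery is using its default JSON serializer for task arguments: read the body inside the view, where request is still available, and pass a plain copy of it to the task. The helper name to_task_payload and the sample keys are hypothetical.

```python
import json
from typing import Any


def to_task_payload(data: Any) -> dict:
    """Return a plain, JSON-serializable copy of a parsed request body.

    Hypothetical helper: call it inside the Flask view, where request is
    still available, and hand the result to process_live.delay(...).
    """
    if not isinstance(data, dict):
        raise ValueError("expected a JSON object in the request body")
    # Round-trip through JSON so only plain types (dict, list, str, numbers,
    # bool, None) reach the task; json.dumps raises TypeError otherwise.
    return json.loads(json.dumps(data))


# Intended use inside the view (not executed here; needs Flask and Celery):
#     payload = to_task_payload(request.get_json(silent=True))
#     process_live.delay(payload)
#
# In task.py the task then reads its argument instead of request:
#     @celery.task()
#     def process_live(payload): ...

if __name__ == "__main__":
    print(to_task_payload({"readings": [1.5, 2.0], "source": "demo"}))
```

Validating in the view keeps malformed bodies out of the queue, so the view can return an error response immediately instead of the task failing later in the worker.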

huangapple
  • Published on May 29, 2023, 15:57:34
  • When reposting, please keep the link to this article: https://go.coder-hub.com/76355586.html