Async Function with API

Question

TypeError: The view function did not return a valid response. The return type must be a string, dict, tuple, Response instance, or WSGI callable, but it was a coroutine.

I need to use an asynchronous view function in Flask. The view function predict should return the result of calling the asynchronous function model.transform(data_set, 'prediction'). I have tried several approaches:

  1. defining the view function with async def and using await to wait for the result of the asynchronous operation;
  2. using the async_helpers module;
  3. combining Flask's Blueprint with the run_in_executor method in asyncio to define the asynchronous interface.

All of these approaches fail with the same error: the response is not a valid response but a coroutine object. Do you have any suggestions for solving this problem? My project depends on Flask 2.0.2 and Werkzeug 2.0.3.

@bp.route("/predict", methods=["POST"])
@pre.catch({
    "userId": Rule(callback=ParamsCallback.convert_to_int, dest="user_id"),
    "modelId": Rule(type=str, dest="model_id"),
    "database": Rule(type=str, dest="database"),
    "table": Rule(type=str, dest="table")
})
# @async_helpers.async_suppress
@asyncio.coroutine
async def predictaa(params):
    # result = Result.get_model(user_id=params["user_id"], model_id=params["model_id"])
    result = Result.get_model(**params)
    model_id_value = params['model_id']
    if result is None:
        raise OperatorException("error", f"Model {model_id_value} does not exist!")
    parameters = json.loads(result.parameterJson)  # Parse the model's stored parameter JSON.
    model_class = parameters.pop("model_class")
    parameters["user_id"] = params['user_id']
    parameters["process_id"] = result.targetId
    parameters["output_port_id"] = 27
    # model = eval("%s(**parameters)" % model_class) 

    data = json.loads(request.data)
    features = data.get("features")
    model_type = data.get("model_type")
    model_params = data.get("model_params")
    data_set = data.get("data")

    if model is None:
        return jsonify({'error': 'Model not found'})
    try:
    #     loop = asyncio.get_event_loop()
    #     result = await loop.run_in_executor(None, model.transform(data_set, 'prediction'))
    #     # result = await model.transform(data_set, 'prediction')
    #     response = result.to_dict(orient='records')
    #     return jsonify(response)
    # except Exception as e:
    #     return jsonify({'error': str(e)})
        result = await model.transform(data_set, 'prediction')
        response = result.to_dict(orient='records')
        return jsonify(response)
    except Exception as e:
        return jsonify({'error': str(e)})
    # # predictions, _ = await op.do_work(model, data_set)
    # prediction_coroutine, _ = op.do_work(model, data_set)
    # async with prediction_coroutine as prediction:
    #     prediction_dict = prediction.to_dict()
    #     return jsonify(prediction_dict)
    # result = predictions.to_dict(orient="records")
    # return jsonify(result)

class LinearSVCModel(Model):
    def __init__(self, user_id, process_id, output_port_id, features, label, label_type, neg, pos, **kwargs):
        self.label = label
        self.label_type = label_type
        self.neg = neg
        self.pos = pos
        self.coefficients = kwargs["coefficients"]
        self.intercept = kwargs["intercept"]
        super(LinearSVCModel, self).__init__(user_id, process_id, output_port_id, features)

    async def transform(self, unl, table_name):
        lab = await super(LinearSVCModel, self).transform(unl, table_name)
        prediction = "`prediction_%s`" % self.label[1:-1]
        linear_item_sql_list = ["`%s` * (%f)" % (self.features[i], self.coefficients[i]) for i in
                                range(len(self.features))]
        linear_sql = "SELECT %s + %f AS hr_temp_col_linear, * FROM %s" % (
            " + ".join(linear_item_sql_list), self.intercept, lab.hr_view)

        lab.loc[prediction] = [self.label_type, "prediction",
                               "CASE WHEN hr_temp_col_linear >= 0 THEN '%s' ELSE '%s' END" % (
                                   self.pos, self.neg)]
        lab = lab.hr_sort_by_role()
        await lab.hr_update_view(table_name, lab.hr_get_view_sql(from_="(%s) hr_temp_table_lsvc_model" % linear_sql))
        return lab

Answer 1

Score: 1

I defined the view function with the async and await keywords, awaiting the model.transform(data_set, 'prediction') coroutine and returning its result as the response. I also checked that model.transform() itself uses await to wait for the asynchronous calls it makes. Even so, I could not understand why the view still returned a coroutine object instead of a value. I then ran the following script:

import flask
print(flask.__version__)
print(hasattr(flask, 'has_websocket_context'))

The output showed that the Flask version is 2.0.2 and that hasattr(flask, 'has_websocket_context') is False. From this I concluded that the Flask version used in the project does not support asynchronous view functions. In the end I solved the problem a different way: calling the asynchronous function from a synchronous view function, relying mainly on asyncio.
I recorded the following two methods:

1. Create a new event loop, set it as the current event loop, and then use the run_until_complete method to run the asynchronous function.

loop = asyncio.new_event_loop()
asyncio.set_event_loop(loop)
lab = loop.run_until_complete(model.transform(lab, table_name))

2. Use the asyncio.run function to run the asynchronous function, and return its result from the view with a return statement.

lab = asyncio.run(model.transform(lab, table_name))
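
The two methods above can be tried outside Flask with a small, standard-library-only sketch. Everything named here is an assumption for illustration: fake_transform is a hypothetical stand-in for model.transform, and run_in_new_loop and predict_sync are made-up helper names, not part of the original project. The sketch only shows that a synchronous function can hand back a plain value rather than a coroutine.

```python
import asyncio


async def fake_transform(data, table_name):
    """Hypothetical stand-in for the async model.transform(...) call."""
    await asyncio.sleep(0)  # simulate an awaited I/O step
    return [{"value": x, "source": table_name} for x in data]


def run_in_new_loop(coro):
    """Method 1: run a coroutine on a fresh event loop, then clean up."""
    loop = asyncio.new_event_loop()
    try:
        asyncio.set_event_loop(loop)
        return loop.run_until_complete(coro)
    finally:
        # Close the loop so repeated calls do not leak event loops.
        asyncio.set_event_loop(None)
        loop.close()


def predict_sync(data):
    """Method 2: a synchronous, view-shaped function built on asyncio.run."""
    try:
        records = asyncio.run(fake_transform(data, "prediction"))
    except Exception as exc:
        return {"error": str(exc)}
    return {"records": records}


via_loop = run_in_new_loop(fake_transform([1, 2], "prediction"))
via_run = predict_sync([1, 2])
print(via_loop)
print(via_run)
```

Both helpers block the calling thread until the coroutine finishes. asyncio.run raises RuntimeError if an event loop is already running in that thread, so this pattern fits plain synchronous views rather than code that is already inside a running loop.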

huangapple
  • Posted on February 27, 2023, 16:08:52
  • When reposting, please keep this link: https://go.coder-hub.com/75578052.html