英文:
Async Function with API
问题
TypeError: 视图函数未返回有效响应。返回类型必须是字符串、字典、元组、响应实例或 WSGI 可调用对象,但它是一个协程。
我需要在Flask中使用异步视图函数,接口函数 predict
应返回调用异步函数 model.transform(data_set, 'prediction')
的结果。然而,我尝试了各种方法,包括:
- 使用
async def
定义异步视图函数,并使用await
等待异步操作的结果; - 使用
async_helpers
模块; - 使用 Flask 和 asyncio 实现异步函数接口。我尝试使用 Flask 的 Blueprint 和 asyncio 的
run_in_executor
方法定义异步接口。
然而,所有这些方法都返回了一个错误,即响应不是有效响应,而是协程对象。您有没有任何建议来解决这个问题?我的项目依赖于 Flask-2.0.2 和 Werkzeug-2.0.3。
英文:
TypeError: The view function did not return a valid response. The return type must be a string, dict, tuple, Response instance, or WSGI callable, but it was a coroutine.
I need to use asynchronous view function in Flask, and the interface function predict should return the result of calling the asynchronous function model.transform(data_set, 'prediction'). However, I have tried various methods including:
- defining an asynchronous view function using async def and using await to wait for the results of asynchronous operations;
- using the async_helpers module;
- using Flask and asyncio to implement an asynchronous function interface. I tried using Flask's Blueprint and the run_in_executor method in asyncio to define the asynchronous interface.
However, all these methods returned an error that the response is not a valid response but a coroutine object. Do you have any suggestions to solve this problem? My project depends on Flask-2.0.2 and Werkzeug-2.0.3.
@bp.route("/predict", methods=["POST"])
@pre.catch({
"userId": Rule(callback=ParamsCallback.convert_to_int, dest="user_id"),
"modelId": Rule(type=str, dest="model_id"),
"database": Rule(type=str, dest="database"),
"table": Rule(type=str, dest="table")
})
# @async_helpers.async_suppress
@asyncio.coroutine
async def predictaa(params):
# result = Result.get_model(user_id=params["user_id"], model_id=params["model_id"])
result = Result.get_model(**params)
model_id_value = params['model_id']
if result is None:
raise OperatorException("error", f"模型{model_id_value}不存在!")
parameters = json.loads(result.parameterJson) # 从请求体中获取要获取的数据。
model_class = parameters.pop("model_class")
parameters["user_id"] = params['user_id']
parameters["process_id"] = result.targetId
parameters["output_port_id"] = 27
# model = eval("%s(**parameters)" % model_class)
data = json.loads(request.data)
features = data.get("features")
model_type = data.get("model_type")
model_params = data.get("model_params")
data_set = data.get("data")
if model is None:
return jsonify({'error': 'Model not found'})
try:
# loop = asyncio.get_event_loop()
# result = await loop.run_in_executor(None, model.transform(data_set, 'prediction'))
# # result = await model.transform(data_set, 'prediction')
# response = result.to_dict(orient='records')
# return jsonify(response)
# except Exception as e:
# return jsonify({'error': str(e)})
result = await model.transform(data_set, 'prediction')
response = result.to_dict(orient='records')
return jsonify(response)
except Exception as e:
return jsonify({'error': str(e)})
# # predictions, _ = await op.do_work(model, data_set)
# prediction_coroutine, _ = op.do_work(model, data_set)
# async with prediction_coroutine as prediction:
# prediction_dict = prediction.to_dict()
# return jsonify(prediction_dict)
# result = predictions.to_dict(orient="records")
# return jsonify(result)
class LinearSVCModel(Model):
def __init__(self, user_id, process_id, output_port_id, features, label, label_type, neg, pos, **kwargs):
self.label = label
self.label_type = label_type
self.neg = neg
self.pos = pos
self.coefficients = kwargs["coefficients"]
self.intercept = kwargs["intercept"]
super(LinearSVCModel, self).__init__(user_id, process_id, output_port_id, features)
async def transform(self, unl, table_name):
lab = await super(LinearSVCModel, self).transform(unl, table_name)
prediction = "`prediction_%s`" % self.label[1:-1]
linear_item_sql_list = ["`%s` * (%f)" % (self.features[i], self.coefficients[i]) for i in
range(len(self.features))]
linear_sql = "SELECT %s + %f AS hr_temp_col_linear, * FROM %s" % (
" + ".join(linear_item_sql_list), self.intercept, lab.hr_view)
lab.loc[prediction] = [self.label_type, "prediction",
"CASE WHEN hr_temp_col_linear >= 0 THEN '%s' ELSE '%s' END" % (
self.pos, self.neg)]
lab = lab.hr_sort_by_role()
await lab.hr_update_view(table_name, lab.hr_get_view_sql(from_="(%s) hr_temp_table_lsvc_model" % linear_sql))
return lab
答案1
得分: 1
我使用了async和await关键字来定义一个视图函数,并在返回值上使用await关键字来等待model.transform(data_set, 'prediction')协程的结果并将其作为响应返回。我检查了model.transform()方法中也使用了await来等待异步函数的完成。然而,我不明白为什么异步函数仍然返回协程对象而不是一个值。运行以下脚本:
import flask
print(flask.__version__)
print(hasattr(flask, 'has_websocket_context'))
我发现Flask的版本是2.0.2,has_websocket_context属性为False。因此,我发现问题是项目中使用的Flask版本不支持异步视图函数。最后,我采用了不同的方法来解决问题,通过在同步视图函数中调用异步函数,主要依赖于asyncio。我记录了以下两种方法:
- 创建了一个新的事件循环loop,并将其设置为当前事件循环。然后,使用run_until_complete方法运行异步函数。
loop = asyncio.new_event_loop()
asyncio.set_event_loop(loop)
lab = loop.run_until_complete(model.transform(lab, table_name))
- 使用asyncio.run函数来运行异步函数,并使用return语句返回异步函数的结果。
lab = asyncio.run(model.transform(lab, table_name))
英文:
I used the async and await keywords to define a view function and used the await keyword on the return value to wait for the result of the model.transform(data_set, 'prediction') coroutine and return it as a response. I checked that await was also used in the model.transform() method to wait for the completion of the asynchronous function. However, I didn't understand why the asynchronous function still returns a coroutine object instead of a value. Running the following script:
import flask
print(flask.__version__)
print(hasattr(flask, 'has_websocket_context'))
I found that the Flask version is 2.0.2 and the has_websocket_context attribute is False. So I located the problem to be that the Flask version used in the project does not support asynchronous view functions. Finally, I used a different approach to solve the problem by calling the asynchronous function in a synchronous view function, mainly relying on asyncio.
I recorded the following two methods:
1.Created a new event loop loop and set it as the current event loop. Then, used the run_until_complete method to run the asynchronous function.
loop = asyncio.new_event_loop()
asyncio.set_event_loop(loop)
lab = loop.run_until_complete(model.transform(lab, table_name))
2.Used the asyncio.run function to run the asynchronous function and returned the result of the asynchronous function using the return statement.
lab = asyncio.run(model.transform(lab, table_name))
通过集体智慧和协作来改善编程学习和解决问题的方式。致力于成为全球开发者共同参与的知识库,让每个人都能够通过互相帮助和分享经验来进步。
评论