具有API的异步函数

huangapple go评论106阅读模式
英文:

Async Function with API

问题

TypeError: 视图函数未返回有效响应。返回类型必须是字符串、字典、元组、响应实例或 WSGI 可调用对象,但它是一个协程。

我需要在Flask中使用异步视图函数,接口函数 predict 应返回调用异步函数 model.transform(data_set, 'prediction') 的结果。然而,我尝试了各种方法,包括:

  1. 使用 async def 定义异步视图函数,并使用 await 等待异步操作的结果;
  2. 使用 async_helpers 模块;
  3. 使用 Flask 和 asyncio 实现异步函数接口。我尝试使用 Flask 的 Blueprint 和 asyncio 的 run_in_executor 方法定义异步接口。

然而,所有这些方法都返回了一个错误,即响应不是有效响应,而是协程对象。您有没有任何建议来解决这个问题?我的项目依赖于 Flask-2.0.2 和 Werkzeug-2.0.3。

英文:

TypeError: The view function did not return a valid response. The return type must be a string, dict, tuple, Response instance, or WSGI callable, but it was a coroutine.

I need to use asynchronous view function in Flask, and the interface function predict should return the result of calling the asynchronous function model.transform(data_set, 'prediction'). However, I have tried various methods including:

  1. defining an asynchronous view function using async def and using await to wait for the results of asynchronous operations;
  2. using the async_helpers module;
  3. using Flask and asyncio to implement an asynchronous function interface. I tried using Flask's Blueprint and the run_in_executor method in asyncio to define the asynchronous interface.
    However, all these methods returned an error that the response is not a valid response but a coroutine object. Do you have any suggestions to solve this problem? My project depends on Flask-2.0.2 and Werkzeug-2.0.3.
  1. @bp.route("/predict", methods=["POST"])
  2. @pre.catch({
  3. "userId": Rule(callback=ParamsCallback.convert_to_int, dest="user_id"),
  4. "modelId": Rule(type=str, dest="model_id"),
  5. "database": Rule(type=str, dest="database"),
  6. "table": Rule(type=str, dest="table")
  7. })
  8. # @async_helpers.async_suppress
  9. @asyncio.coroutine
  10. async def predictaa(params):
  11. # result = Result.get_model(user_id=params["user_id"], model_id=params["model_id"])
  12. result = Result.get_model(**params)
  13. model_id_value = params['model_id']
  14. if result is None:
  15. raise OperatorException("error", f"模型{model_id_value}不存在!")
  16. parameters = json.loads(result.parameterJson) # 从请求体中获取要获取的数据。
  17. model_class = parameters.pop("model_class")
  18. parameters["user_id"] = params['user_id']
  19. parameters["process_id"] = result.targetId
  20. parameters["output_port_id"] = 27
  21. # model = eval("%s(**parameters)" % model_class)
  22. data = json.loads(request.data)
  23. features = data.get("features")
  24. model_type = data.get("model_type")
  25. model_params = data.get("model_params")
  26. data_set = data.get("data")
  27. if model is None:
  28. return jsonify({'error': 'Model not found'})
  29. try:
  30. # loop = asyncio.get_event_loop()
  31. # result = await loop.run_in_executor(None, model.transform(data_set, 'prediction'))
  32. # # result = await model.transform(data_set, 'prediction')
  33. # response = result.to_dict(orient='records')
  34. # return jsonify(response)
  35. # except Exception as e:
  36. # return jsonify({'error': str(e)})
  37. result = await model.transform(data_set, 'prediction')
  38. response = result.to_dict(orient='records')
  39. return jsonify(response)
  40. except Exception as e:
  41. return jsonify({'error': str(e)})
  42. # # predictions, _ = await op.do_work(model, data_set)
  43. # prediction_coroutine, _ = op.do_work(model, data_set)
  44. # async with prediction_coroutine as prediction:
  45. # prediction_dict = prediction.to_dict()
  46. # return jsonify(prediction_dict)
  47. # result = predictions.to_dict(orient="records")
  48. # return jsonify(result)
  49. class LinearSVCModel(Model):
  50. def __init__(self, user_id, process_id, output_port_id, features, label, label_type, neg, pos, **kwargs):
  51. self.label = label
  52. self.label_type = label_type
  53. self.neg = neg
  54. self.pos = pos
  55. self.coefficients = kwargs["coefficients"]
  56. self.intercept = kwargs["intercept"]
  57. super(LinearSVCModel, self).__init__(user_id, process_id, output_port_id, features)
  58. async def transform(self, unl, table_name):
  59. lab = await super(LinearSVCModel, self).transform(unl, table_name)
  60. prediction = "`prediction_%s`" % self.label[1:-1]
  61. linear_item_sql_list = ["`%s` * (%f)" % (self.features[i], self.coefficients[i]) for i in
  62. range(len(self.features))]
  63. linear_sql = "SELECT %s + %f AS hr_temp_col_linear, * FROM %s" % (
  64. " + ".join(linear_item_sql_list), self.intercept, lab.hr_view)
  65. lab.loc[prediction] = [self.label_type, "prediction",
  66. "CASE WHEN hr_temp_col_linear >= 0 THEN '%s' ELSE '%s' END" % (
  67. self.pos, self.neg)]
  68. lab = lab.hr_sort_by_role()
  69. await lab.hr_update_view(table_name, lab.hr_get_view_sql(from_="(%s) hr_temp_table_lsvc_model" % linear_sql))
  70. return lab

答案1

得分: 1

我使用了async和await关键字来定义一个视图函数,并在返回值上使用await关键字来等待model.transform(data_set, 'prediction')协程的结果并将其作为响应返回。我检查了model.transform()方法中也使用了await来等待异步函数的完成。然而,我不明白为什么异步函数仍然返回协程对象而不是一个值。运行以下脚本:

  1. import flask
  2. print(flask.__version__)
  3. print(hasattr(flask, 'has_websocket_context'))

我发现Flask的版本是2.0.2,has_websocket_context属性为False。因此,我发现问题是项目中使用的Flask版本不支持异步视图函数。最后,我采用了不同的方法来解决问题,通过在同步视图函数中调用异步函数,主要依赖于asyncio。我记录了以下两种方法:

  1. 创建了一个新的事件循环loop,并将其设置为当前事件循环。然后,使用run_until_complete方法运行异步函数。
  1. loop = asyncio.new_event_loop()
  2. asyncio.set_event_loop(loop)
  3. lab = loop.run_until_complete(model.transform(lab, table_name))
  1. 使用asyncio.run函数来运行异步函数,并使用return语句返回异步函数的结果。
  1. lab = asyncio.run(model.transform(lab, table_name))
英文:

I used the async and await keywords to define a view function and used the await keyword on the return value to wait for the result of the model.transform(data_set, 'prediction') coroutine and return it as a response. I checked that await was also used in the model.transform() method to wait for the completion of the asynchronous function. However, I didn't understand why the asynchronous function still returns a coroutine object instead of a value. Running the following script:

  1. import flask
  2. print(flask.__version__)
  3. print(hasattr(flask, 'has_websocket_context'))

I found that the Flask version is 2.0.2 and the has_websocket_context attribute is False. So I located the problem to be that the Flask version used in the project does not support asynchronous view functions. Finally, I used a different approach to solve the problem by calling the asynchronous function in a synchronous view function, mainly relying on asyncio.
I recorded the following two methods:

1.Created a new event loop loop and set it as the current event loop. Then, used the run_until_complete method to run the asynchronous function.

  1. loop = asyncio.new_event_loop()
  2. asyncio.set_event_loop(loop)
  3. lab = loop.run_until_complete(model.transform(lab, table_name))

2.Used the asyncio.run function to run the asynchronous function and returned the result of the asynchronous function using the return statement.

  1. lab = asyncio.run(model.transform(lab, table_name))

huangapple
  • 本文由 发表于 2023年2月27日 16:08:52
  • 转载请务必保留本文链接:https://go.coder-hub.com/75578052.html
匿名

发表评论

匿名网友

:?: :razz: :sad: :evil: :!: :smile: :oops: :grin: :eek: :shock: :???: :cool: :lol: :mad: :twisted: :roll: :wink: :idea: :arrow: :neutral: :cry: :mrgreen:

确定