How to implement WSGI while using multiprocessing in Flask?

Question



Suppose I have a video-processing handler in a function. I want to implement true parallel processing using the `multiprocessing` module instead of `threading`.

So my code looks like this in general:

```python
def start_subprocess(pid, progress_dict):
    '''
    Suppose this is video processing starter...
    '''
    from time import sleep

    # Simulating a subprocess with variable progress
    progress = 0
    while progress < 100:
        sleep(1)
        progress += 10
        progress_dict[pid] = progress


def get_current_progress_of_subprocess(pid, progress_dict):
    '''
    Suppose this is video current progress by pid, in this context current progress are all current frames has been processed...
    '''
    # Retrieve current progress of a subprocess
    if pid in progress_dict:
        return progress_dict[pid]
    else:
        return None


def flask_service(progress_dict):
    from flask import Flask, request, jsonify
    from multiprocessing import Process

    app = Flask(__name__)

    @app.route('/start_process')
    def start_process():
        pid = request.args.get('pid')
        if pid is not None:
            try:
                pid = int(pid)
            except ValueError:
                return jsonify({'message': f'Invalid pid.'}), 400

            # Start a new subprocess
            if pid not in progress_dict:
                process = Process(target=start_subprocess, args=(pid, progress_dict))
                process.start()
                progress_dict[pid] = 0
            else:
                return jsonify({'message': f'Process with pid {pid} already started.'}), 400

            return jsonify({'message': f'Process started with pid: {pid}'}), 200
        else:
            return jsonify({'message': 'No pid provided.'}), 400

    @app.route('/get_progress')
    def get_progress():
        pid = request.args.get('pid')
        if pid is not None:
            try:
                pid = int(pid)
            except ValueError:
                return jsonify({'message': f'Invalid pid.'}), 400

            # Retrieve current progress of the subprocess
            current_progress = get_current_progress_of_subprocess(pid, progress_dict)
            if current_progress is not None:
                return jsonify({'message': f'Current progress of pid: {pid} is {current_progress}.'}), 200
            else:
                return jsonify({'message': f'Process with pid {pid} not found.'}), 404
        else:
            return jsonify({'message': 'No pid provided.'}), 400

    app.run(debug=False, threaded=True)


if __name__ == '__main__':
    from multiprocessing import Process, Manager

    with Manager() as manager:
        progress_dict = manager.dict()
        p1 = Process(target=flask_service, args=(progress_dict,))
        p1.start()
        try:
            p1.join()
        except KeyboardInterrupt:
            p1.terminate()
            p1.join()
        finally:
            print('Ending up!')
```

I have achieved what I want, but how do I deploy this with WSGI? As far as I know, the `Flask` class (`from flask import Flask`) creates a WSGI-compatible application instance. So what does that look like in deployment?

Also, am I actually implementing true parallel processing? I just want to make sure that I really am. By true parallelism I mean using the hardware's capabilities to solve a parallel problem such as video processing.

Answer 1

Score: 1


I think your code is more complicated than it needs to be. You should create a function, `create_app`, which creates the Flask application and the `progress_dict` managed dictionary and then returns the Flask application. Function `get_current_progress_of_subprocess` remains unchanged, and I added handling of a `KeyboardInterrupt` in `start_subprocess`.

```python
def start_subprocess(pid, progress_dict):
    '''
    Suppose this is video processing starter...
    '''
    from time import sleep

    # Simulating a subprocess with variable progress
    try:
        progress = 0
        while progress < 100:
            sleep(1)
            progress += 10
            progress_dict[pid] = progress
    except KeyboardInterrupt:
        pass


def get_current_progress_of_subprocess(pid, progress_dict):
    '''
    Suppose this is video current progress by pid, in this context current progress are all current frames has been processed...
    '''
    # Retrieve current progress of a subprocess
    if pid in progress_dict:
        return progress_dict[pid]
    else:
        return None


def create_app():
    from flask import Flask, request, jsonify
    from multiprocessing import Process, Manager

    app = Flask(__name__)

    # Create the managed dictionary:
    progress_dict = Manager().dict()

    @app.route('/start_process')
    def start_process():
        pid = request.args.get('pid')
        if pid is not None:
            try:
                pid = int(pid)
            except ValueError:
                return jsonify({'message': f'Invalid pid.'}), 400

            # Start a new subprocess
            if pid not in progress_dict:
                process = Process(target=start_subprocess, args=(pid, progress_dict))
                process.start()
                progress_dict[pid] = 0
            else:
                return jsonify({'message': f'Process with pid {pid} already started.'}), 400

            return jsonify({'message': f'Process started with pid: {pid}'}), 200
        else:
            return jsonify({'message': 'No pid provided.'}), 400

    @app.route('/get_progress')
    def get_progress():
        pid = request.args.get('pid')
        if pid is not None:
            try:
                pid = int(pid)
            except ValueError:
                return jsonify({'message': f'Invalid pid.'}), 400

            # Retrieve current progress of the subprocess
            current_progress = get_current_progress_of_subprocess(pid, progress_dict)
            if current_progress is not None:
                return jsonify({'message': f'Current progress of pid: {pid} is {current_progress}.'}), 200
            else:
                return jsonify({'message': f'Process with pid {pid} not found.'}), 404
        else:
            return jsonify({'message': 'No pid provided.'}), 400

    return app


if __name__ == '__main__':
    create_app().run(debug=False, threaded=True)
```
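
On the deployment question: a WSGI server does not call `app.run()`. It imports a module-level callable and invokes it as `app(environ, start_response)` for each request, and a Flask application object is such a callable. With the `create_app` factory, one option is a small module (the name `wsgi.py` is only an assumption here) containing `app = create_app()`, started with something like `gunicorn wsgi:app`. The sketch below uses only the standard library to illustrate that contract. Its `create_app` is a hypothetical stand-in that returns a plain WSGI callable rather than a Flask app, and `call_wsgi` is a made-up helper that plays the role of the server.

```python
from wsgiref.util import setup_testing_defaults


def create_app():
    """Hypothetical stand-in for the Flask factory: returns a plain WSGI callable."""
    def app(environ, start_response):
        body = b'{"message": "ok"}'
        headers = [
            ("Content-Type", "application/json"),
            ("Content-Length", str(len(body))),
        ]
        start_response("200 OK", headers)
        return [body]

    return app


# A WSGI server imports this module-level name, e.g. `gunicorn wsgi:app`
# (assuming this file is saved as wsgi.py).
app = create_app()


def call_wsgi(application, path="/"):
    """Invoke a WSGI application the way a server would; return (status, body)."""
    environ = {}
    setup_testing_defaults(environ)  # fill in the required WSGI environ keys
    environ["PATH_INFO"] = path
    captured = {}

    def start_response(status, headers, exc_info=None):
        captured["status"] = status
        captured["headers"] = headers

    body = b"".join(application(environ, start_response))
    return captured["status"], body


if __name__ == "__main__":
    print(call_wsgi(app, "/get_progress"))
```

One design point to keep in mind: by default, every server worker process that imports the module runs `create_app()` and therefore gets its own `Manager` and its own `progress_dict`. If progress has to be visible across requests, a single worker process with several threads (for example `gunicorn --workers 1 --threads 4 wsgi:app`) is probably the simplest setup to start from; with several workers the progress data would need to live somewhere all of them can reach.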

huangapple
  • Published on May 21, 2023, 03:13:13
  • When reposting, please keep the link to this article: https://go.coder-hub.com/76296957.html