How to implement WSGI while using multiprocessing in Flask?

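Question

Suppose I have a video processing handler in a function, and I want to implement true parallel processing using the `multiprocessing` module instead of `threading`.

So my code looks roughly like this: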
```python
def start_subprocess(pid, progress_dict):
    '''
    Suppose this is the video processing starter...
    '''
    from time import sleep
    # Simulate a subprocess with variable progress
    progress = 0
    while progress < 100:
        sleep(1)
        progress += 10
        progress_dict[pid] = progress


def get_current_progress_of_subprocess(pid, progress_dict):
    '''
    Suppose this returns the video's current progress by pid; in this context,
    the current progress is all frames that have been processed so far...
    '''
    # Retrieve the current progress of a subprocess
    if pid in progress_dict:
        return progress_dict[pid]
    else:
        return None


def flask_service(progress_dict):
    from flask import Flask, request, jsonify
    from multiprocessing import Process
    app = Flask(__name__)

    @app.route('/start_process')
    def start_process():
        pid = request.args.get('pid')
        if pid is not None:
            try:
                pid = int(pid)
            except ValueError:
                return jsonify({'message': 'Invalid pid.'}), 400

            # Start a new subprocess
            if pid not in progress_dict:
                process = Process(target=start_subprocess, args=(pid, progress_dict))
                process.start()
                progress_dict[pid] = 0
            else:
                return jsonify({'message': f'Process with pid {pid} already started.'}), 400

            return jsonify({'message': f'Process started with pid: {pid}'}), 200
        else:
            return jsonify({'message': 'No pid provided.'}), 400

    @app.route('/get_progress')
    def get_progress():
        pid = request.args.get('pid')
        if pid is not None:
            try:
                pid = int(pid)
            except ValueError:
                return jsonify({'message': 'Invalid pid.'}), 400

            # Retrieve the current progress of the subprocess
            current_progress = get_current_progress_of_subprocess(pid, progress_dict)

            if current_progress is not None:
                return jsonify({'message': f'Current progress of pid: {pid} is {current_progress}.'}), 200
            else:
                return jsonify({'message': f'Process with pid {pid} not found.'}), 404
        else:
            return jsonify({'message': 'No pid provided.'}), 400

    app.run(debug=False, threaded=True)


if __name__ == '__main__':
    from multiprocessing import Process, Manager
    with Manager() as manager:
        progress_dict = manager.dict()
        p1 = Process(target=flask_service, args=(progress_dict,))
        p1.start()
        try:
            p1.join()
        except KeyboardInterrupt:
            p1.terminate()
            p1.join()
        finally:
            print('Ending up!')
```

I have achieved what I want, but how do I deploy this with WSGI? As far as I know, the `Flask` class imported with `from flask import Flask` creates a WSGI-compatible instance. So what does deployment look like?

Also, am I actually implementing true parallel processing? I just want to make sure I really am. By "true parallel" I mean using the hardware's capabilities to solve a parallel problem such as video processing.
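
On the parallelism question, one way to check it empirically: `multiprocessing` starts separate operating-system processes, each with its own interpreter, so CPU-bound work in different processes can be scheduled on different cores at the same time. The sketch below is only an illustration under assumptions. `burn_cpu` and `run_in_processes` are hypothetical helper names, the loop is a stand-in for real frame processing, and a `Pool` is used instead of bare `Process` objects purely to collect return values.

```python
import os
import time
from multiprocessing import Pool


def burn_cpu(n):
    """CPU-bound stand-in for video work (hypothetical workload).

    Returns the PID of the process that did the work and the computed sum.
    """
    total = 0
    for i in range(n):
        total += i * i
    return os.getpid(), total


def run_in_processes(n, jobs=4):
    """Run burn_cpu(n) `jobs` times in a pool of `jobs` worker processes.

    Returns (set of worker PIDs, list of sums, elapsed seconds).
    """
    start = time.perf_counter()
    with Pool(processes=jobs) as pool:
        results = pool.map(burn_cpu, [n] * jobs)
    elapsed = time.perf_counter() - start
    worker_pids = {pid for pid, _ in results}
    sums = [total for _, total in results]
    return worker_pids, sums, elapsed
```

Calling `run_in_processes(5_000_000)` from under an `if __name__ == '__main__':` guard and printing the result should show worker PIDs that differ from the parent's `os.getpid()`. Comparing the elapsed time against running `burn_cpu` the same number of times in a plain loop gives a rough idea of how much the extra cores help on a given machine.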

Answer 1

Score: 1

I think your code is more complicated than it needs to be. You should create a function, `create_app`, which creates the Flask application and the `progress_dict` managed dictionary and then returns the Flask application. The function `get_current_progress_of_subprocess` remains unchanged, and I added handling of a `KeyboardInterrupt` in `start_subprocess`.

```python
def start_subprocess(pid, progress_dict):
    '''
    Suppose this is video processing starter...
    '''
    from time import sleep
    # Simulating a subprocess with variable progress
    try:
        progress = 0
        while progress < 100:
            sleep(1)
            progress += 10
            progress_dict[pid] = progress
    except KeyboardInterrupt:
        pass


def get_current_progress_of_subprocess(pid, progress_dict):
    '''
    Suppose this is video current progress by pid, in this context current progress are all current frames has been processed...
    '''
    # Retrieve current progress of a subprocess
    if pid in progress_dict:
        return progress_dict[pid]
    else:
        return None


def create_app():
    from flask import Flask, request, jsonify
    from multiprocessing import Process, Manager

    app = Flask(__name__)

    # Create the managed dictionary:
    progress_dict = Manager().dict()

    @app.route('/start_process')
    def start_process():
        pid = request.args.get('pid')
        if pid is not None:
            try:
                pid = int(pid)
            except ValueError:
                return jsonify({'message': 'Invalid pid.'}), 400

            # Start a new subprocess
            if pid not in progress_dict:
                process = Process(target=start_subprocess, args=(pid, progress_dict))
                process.start()
                progress_dict[pid] = 0
            else:
                return jsonify({'message': f'Process with pid {pid} already started.'}), 400

            return jsonify({'message': f'Process started with pid: {pid}'}), 200
        else:
            return jsonify({'message': 'No pid provided.'}), 400

    @app.route('/get_progress')
    def get_progress():
        pid = request.args.get('pid')
        if pid is not None:
            try:
                pid = int(pid)
            except ValueError:
                return jsonify({'message': 'Invalid pid.'}), 400

            # Retrieve current progress of the subprocess
            current_progress = get_current_progress_of_subprocess(pid, progress_dict)

            if current_progress is not None:
                return jsonify({'message': f'Current progress of pid: {pid} is {current_progress}.'}), 200
            else:
                return jsonify({'message': f'Process with pid {pid} not found.'}), 404
        else:
            return jsonify({'message': 'No pid provided.'}), 400

    return app


if __name__ == '__main__':
    create_app().run(debug=False, threaded=True)
```
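
For the deployment part of the question, a WSGI server imports the application object instead of calling `app.run()`, so the `if __name__ == '__main__':` branch is not used. As a rough sketch, assuming Gunicorn 20.1 or later (where the `wsgi_app` setting exists) and assuming the code above is saved as `video_service.py` (a hypothetical module name), a configuration file could look like this. The values are illustrative, not recommendations from the original answer.

```python
# gunicorn.conf.py -- a sketch; 'video_service' is a hypothetical module name
# for the file that defines create_app().

# Application factory syntax: Gunicorn imports the module and calls
# create_app() to obtain the WSGI application object.
wsgi_app = 'video_service:create_app()'

# Address and port to listen on (assumed values).
bind = '127.0.0.1:8000'

# A single worker process: create_app() builds its own Manager().dict(), so
# every additional worker would hold a separate progress_dict, and a
# /get_progress request could reach a worker that never saw the pid.
workers = 1

# Several threads inside that worker, so progress requests can be served
# while other requests are in flight (similar in spirit to threaded=True).
threads = 4
```

The server would then be started with `gunicorn -c gunicorn.conf.py`. The video processes themselves are still launched with `multiprocessing.Process` from inside the request handler, so the single web worker does not limit how many processing jobs run in parallel.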

huangapple
  • Posted on May 21, 2023, 03:13:13
  • When reposting, please keep the link to this article: https://go.coder-hub.com/76296957.html