英文:
How to implement WSGI while using multiprocessing in flask?
问题
以下是您要翻译的代码部分:
假设我有一个在函数中的视频处理程序,我想使用`multiprocessing`模块而不是`threading`来实现真正的并行处理。
因此,我的代码大致如下:
```python
def start_subprocess(pid, progress_dict):
'''
假设这是视频处理的启动器...
'''
from time import sleep
# 模拟具有可变进度的子进程
progress = 0
while progress < 100:
sleep(1)
progress += 10
progress_dict[pid] = progress
def get_current_progress_of_subprocess(pid, progress_dict):
'''
假设这是根据pid获取当前进度的视频当前进度,在这个上下文中,当前进度是所有已处理的当前帧...
'''
# 检索子进程的当前进度
if pid in progress_dict:
return progress_dict[pid]
else:
return None
def flask_service(progress_dict):
from flask import Flask, request, jsonify
from multiprocessing import Process
app = Flask(__name__)
@app.route('/start_process')
def start_process():
pid = request.args.get('pid')
if pid is not None:
try:
pid = int(pid)
except ValueError:
return jsonify({'message': '无效的pid。'}), 400
# 启动新的子进程
if pid not in progress_dict:
process = Process(target=start_subprocess, args=(pid, progress_dict))
process.start()
progress_dict[pid] = 0
else:
return jsonify({'message': f'已启动pid {pid} 的进程。'}), 400
return jsonify({'message': f'已启动pid:{pid} 的进程。'}), 200
else:
return jsonify({'message': '未提供pid。'}), 400
@app.route('/get_progress')
def get_progress():
pid = request.args.get('pid')
if pid is not None:
try:
pid = int(pid)
except ValueError:
return jsonify({'message': '无效的pid。'}), 400
# 检索子进程的当前进度
current_progress = get_current_progress_of_subprocess(pid, progress_dict)
if current_progress is not None:
return jsonify({'message': f'pid {pid} 的当前进度为{current_progress}。'}), 200
else:
return jsonify({'message': f'未找到pid {pid} 的进程。'}), 404
else:
return jsonify({'message': '未提供pid。'}), 400
app.run(debug=False, threaded=True)
if __name__ == '__main__':
from multiprocessing import Process, Manager
with Manager() as manager:
progress_dict = manager.dict()
p1 = Process(target=flask_service, args=(progress_dict,))
p1.start()
try:
p1.join()
except KeyboardInterrupt:
p1.terminate()
p1.join()
finally:
print('结束!')
我已经翻译了您的代码,以下是关于部署和并行处理的问题的回答:
您提到from flask import Flask
创建了与WSGI兼容的实例。在部署时,您可以使用WSGI服务器(如Gunicorn、uWSGI等)来托管您的Flask应用程序。例如,使用Gunicorn,您可以运行以下命令来启动应用:
gunicorn your_module:app
这将启动一个WSGI服务器,并使用app
变量来处理Flask应用程序。
关于真正的并行处理,您使用了multiprocessing
模块创建了多个进程,每个进程都可以并行处理视频处理任务。这是一种有效的并行处理方法,可以充分利用硬件资源,因此您确实实现了真正的并行处理,以解决视频处理等并行问题。
英文:
Suppose I have video process handler in a function, I want implement true parallel processing using multiprocessing
module instead of threading
.
So my code looks like this in general:
def start_subprocess(pid, progress_dict):
'''
Suppose this is video processing starter...
'''
from time import sleep
# Simulating a subprocess with variable progress
progress = 0
while progress < 100:
sleep(1)
progress += 10
progress_dict[pid] = progress
def get_current_progress_of_subprocess(pid, progress_dict):
'''
Suppose this is video current progress by pid, in this context current progress are all current frames has been processed...
'''
# Retrieve current progress of a subprocess
if pid in progress_dict:
return progress_dict[pid]
else:
return None
def flask_service(progress_dict):
from flask import Flask, request, jsonify
from multiprocessing import Process
app = Flask(__name__)
@app.route('/start_process')
def start_process():
pid = request.args.get('pid')
if pid is not None:
try:
pid = int(pid)
except ValueError:
return jsonify({'message': f'Invalid pid.'}), 400
# Start a new subprocess
if pid not in progress_dict:
process = Process(target=start_subprocess, args=(pid, progress_dict))
process.start()
progress_dict[pid] = 0
else:
return jsonify({'message': f'Process with pid {pid} already started.'}), 400
return jsonify({'message': f'Process started with pid: {pid}'}), 200
else:
return jsonify({'message': 'No pid provided.'}), 400
@app.route('/get_progress')
def get_progress():
pid = request.args.get('pid')
if pid is not None:
try:
pid = int(pid)
except ValueError:
return jsonify({'message': f'Invalid pid.'}), 400
# Retrieve current progress of the subprocess
current_progress = get_current_progress_of_subprocess(pid, progress_dict)
if current_progress is not None:
return jsonify({'message': f'Current progress of pid: {pid} is {current_progress}.'}), 200
else:
return jsonify({'message': f'Process with pid {pid} not found.'}), 404
else:
return jsonify({'message': 'No pid provided.'}), 400
app.run(debug=False, threaded=True)
if __name__ == '__main__':
from multiprocessing import Process, Manager
with Manager() as manager:
progress_dict = manager.dict()
p1 = Process(target=flask_service, args=(progress_dict,))
p1.start()
try:
p1.join()
except KeyboardInterrupt:
p1.terminate()
p1.join()
finally:
print('Ending up!')
I have achieved what I want, but the problem is how do I deploy this with WSGI? As far As I know from flask import Flask
class is creating instance of compatible WSGI. So what it looks like in deployment?
Also, am I actually implement true parallel processing? I just want make sure if I really do it. true parallel I mean is using hardware capabilities to solve parallel issue such as video processing.
答案1
得分: 1
def create_app():
from flask import Flask, request, jsonify
from multiprocessing import Process, Manager
app = Flask(__name__)
# 创建受管理的字典:
progress_dict = Manager().dict()
@app.route('/start_process')
def start_process():
pid = request.args.get('pid')
if pid is not None:
try:
pid = int(pid)
except ValueError:
return jsonify({'message': f'无效的pid。'}), 400
# 启动新的子进程
if pid not in progress_dict:
process = Process(target=start_subprocess, args=(pid, progress_dict))
process.start()
progress_dict[pid] = 0
else:
return jsonify({'message': f'已经启动了pid为{pid}的进程。'}), 400
return jsonify({'message': f'已启动进程,pid为:{pid}。'}), 200
else:
return jsonify({'message': '未提供pid。'}), 400
@app.route('/get_progress')
def get_progress():
pid = request.args.get('pid')
if pid is not None:
try:
pid = int(pid)
except ValueError:
return jsonify({'message': f'无效的pid。'}), 400
# 检索子进程的当前进度
current_progress = get_current_progress_of_subprocess(pid, progress_dict)
if current_progress is not None:
return jsonify({'message': f'pid为{pid}的当前进度为{current_progress}。'}), 200
else:
return jsonify({'message': f'未找到pid为{pid}的进程。'}), 404
else:
return jsonify({'message': '未提供pid。'}), 400
return app
英文:
I think your code is more complicated than it needs to be. You should create a function, create_app
, which creates the Flask application and the process_dict
managed dictionary and then returns the Flask application. Functionget_current_progress_of_subprocess
remains unchanged and I added handling of a KeyboardInterrupt in start_subprocess
.
def start_subprocess(pid, progress_dict):
'''
Suppose this is video processing starter...
'''
from time import sleep
# Simulating a subprocess with variable progress
try:
progress = 0
while progress < 100:
sleep(1)
progress += 10
progress_dict[pid] = progress
except KeyboardInterrupt:
pass
def get_current_progress_of_subprocess(pid, progress_dict):
'''
Suppose this is video current progress by pid, in this context current progress are all current frames has been processed...
'''
# Retrieve current progress of a subprocess
if pid in progress_dict:
return progress_dict[pid]
else:
return None
def create_app():
from flask import Flask, request, jsonify
from multiprocessing import Process, Manager
app = Flask(__name__)
# Create the managed dictionary:
progress_dict = Manager().dict()
@app.route('/start_process')
def start_process():
pid = request.args.get('pid')
if pid is not None:
try:
pid = int(pid)
except ValueError:
return jsonify({'message': f'Invalid pid.'}), 400
# Start a new subprocess
if pid not in progress_dict:
process = Process(target=start_subprocess, args=(pid, progress_dict))
process.start()
progress_dict[pid] = 0
else:
return jsonify({'message': f'Process with pid {pid} already started.'}), 400
return jsonify({'message': f'Process started with pid: {pid}'}), 200
else:
return jsonify({'message': 'No pid provided.'}), 400
@app.route('/get_progress')
def get_progress():
pid = request.args.get('pid')
if pid is not None:
try:
pid = int(pid)
except ValueError:
return jsonify({'message': f'Invalid pid.'}), 400
# Retrieve current progress of the subprocess
current_progress = get_current_progress_of_subprocess(pid, progress_dict)
if current_progress is not None:
return jsonify({'message': f'Current progress of pid: {pid} is {current_progress}.'}), 200
else:
return jsonify({'message': f'Process with pid {pid} not found.'}), 404
else:
return jsonify({'message': 'No pid provided.'}), 400
return app
if __name__ == '__main__':
create_app().run(debug=False, threaded=True)
通过集体智慧和协作来改善编程学习和解决问题的方式。致力于成为全球开发者共同参与的知识库,让每个人都能够通过互相帮助和分享经验来进步。
评论