Streaming the output of a subprocess to 2 or more clients

Question

I have a basic example here:

    import subprocess
    from flask import (
        Flask,
        Response,
    )
    app = Flask(__name__)

    stream = None

    @app.route("/play", methods=["GET"])
    def channel():
        def createStream():
            global stream
            print("create stream")
            stream = subprocess.Popen(
                    ffmpegcmd,
                    stdin=subprocess.DEVNULL,
                    stdout=subprocess.PIPE,
                    stderr=subprocess.DEVNULL,
                )

        def streamData():
            print("stream data")
            try:
                while True:
                    chunk = stream.stdout.read(1024)
                    if len(chunk) == 0:
                        break
                    yield chunk
            except:
                pass

        if not stream:
            link = "https://cph-p2p-msl.akamaized.net/hls/live/2000341/test/master.m3u8"

            ffmpegcmd = [
                "ffmpeg",
                "-re",
                "-i",
                link,
                "-map",
                "0",
                "-codec",
                "copy",
                "-f",
                "mpegts",
                "pipe:"
            ]

            createStream()

            return Response(streamData(), mimetype="application/octet-stream")

        else:

            return Response(streamData(), mimetype="application/octet-stream")

    if __name__ == "__main__":
        app.run(host="0.0.0.0", port=8001, debug=True)

If two or more clients try to stream at the same time, all streams freeze. Closing all streams and requesting /play again picks up the existing subprocess (sp), and it then plays fine.

Does anyone understand what is happening and why it doesn't work?
Is this a bug or limitation of subprocesses?

Answer 1

Score: 1

  1. The first request to /play creates the subprocess, but the stream fails to play.
    This happens when sp is assigned inside createStream() without a global declaration: the name is then local to createStream(), and streamData() cannot see it. (In the code as posted, the handle is called stream and is already declared global, so this applies only to a variant that uses sp.) To fix it, make sp a module-level global, similar to stream, so that both helper functions can reach it:

    import subprocess
    from flask import Flask, Response

    app = Flask(__name__)

    sp = None  # Handle of the shared ffmpeg subprocess

    @app.route("/play", methods=["GET"])
    def channel():
        def createStream():
            global sp  # Make sp a global variable
            print("create stream")
            sp = subprocess.Popen(
                ffmpegcmd,
                stdin=subprocess.DEVNULL,
                stdout=subprocess.PIPE,
                stderr=subprocess.DEVNULL,
            )

        def streamData():
            print("stream data")
            try:
                while True:
                    chunk = sp.stdout.read(1024)
                    if len(chunk) == 0:
                        break
                    yield chunk
            except Exception:
                pass

        # Start ffmpeg only if it has not been started yet
        if sp is None:
            link = "https://cph-p2p-msl.akamaized.net/hls/live/2000341/test/master.m3u8"

            ffmpegcmd = [
                "ffmpeg",
                "-re",
                "-i",
                link,
                "-map",
                "0",
                "-codec",
                "copy",
                "-f",
                "mpegts",
                "pipe:",
            ]

            createStream()

        return Response(streamData(), mimetype="application/octet-stream")

    if __name__ == "__main__":
        app.run(host="0.0.0.0", port=8001, debug=True)
  2. If two or more clients try to stream at the same time, all streams freeze.
    This is most likely because every client reads from the same sp subprocess. A pipe is a single byte stream, and each chunk that streamData() reads from sp.stdout is removed from it, so with several clients reading at once each one receives only part of the output. No client gets a complete MPEG-TS stream, and playback stalls.
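To illustrate the point about shared pipes, here is a small sketch that does not need ffmpeg or Flask. As an assumption for the demo, a short Python child process stands in for ffmpeg and writes 64 KiB to stdout, while two threads run the same read loop as streamData(). The names child, reader and received are made up for this example.

```python
import subprocess
import sys
import threading

# Stand-in for ffmpeg (assumption): a child process that writes 64 KiB to stdout.
child = subprocess.Popen(
    [sys.executable, "-c", "import sys; sys.stdout.buffer.write(bytes(65536))"],
    stdout=subprocess.PIPE,
)

# Bytes seen by each of the two simulated clients.
received = {"a": 0, "b": 0}

def reader(name):
    # Same loop as streamData(): every read() removes bytes from the shared pipe.
    while True:
        chunk = child.stdout.read(1024)
        if not chunk:
            break
        received[name] += len(chunk)

threads = [threading.Thread(target=reader, args=(name,)) for name in received]
for t in threads:
    t.start()
for t in threads:
    t.join()
child.wait()

# The two readers together saw the data exactly once, not once each.
total = received["a"] + received["b"]
print(received, total)
```

How the 64 KiB are divided between the two readers varies from run to run, but the sum is always the size of the output, which means neither reader can count on seeing the whole stream.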

One way to fix this is to start a separate ffmpeg process for each request, so that no two clients share a pipe:

    import subprocess
    from flask import Flask, Response
    
    app = Flask(__name__)
    
    @app.route("/play", methods=["GET"])
    def channel():
        link = "https://cph-p2p-msl.akamaized.net/hls/live/2000341/test/master.m3u8"
    
        ffmpegcmd = [
            "ffmpeg",
            "-re",
            "-i",
            link,
            "-map",
            "0",
            "-codec",
            "copy",
            "-f",
            "mpegts",
            "pipe:",
        ]
    
        sp = subprocess.Popen(
            ffmpegcmd,
            stdin=subprocess.DEVNULL,
            stdout=subprocess.PIPE,
            stderr=subprocess.DEVNULL,
        )
    
        def streamData():
            print("stream data")
            try:
                while True:
                    chunk = sp.stdout.read(1024)
                    if len(chunk) == 0:
                        break
                    yield chunk
            finally:
                # Stop this request's ffmpeg once streaming ends or the generator is closed
                sp.kill()
                sp.wait()
    
        return Response(streamData(), mimetype="application/octet-stream")
    
    if __name__ == "__main__":
        app.run(host="0.0.0.0", port=8001, debug=True)

To keep a single ffmpeg process, store the Popen handle in a global and start a new process only when none is running. Note that all clients still read from the same pipe here, so on its own this works for one client at a time only:

    import subprocess
    from flask import Flask, Response

    app = Flask(__name__)
    ffmpeg_process = None  # Global variable to store the FFmpeg subprocess

    @app.route("/play", methods=["GET"])
    def channel():
        global ffmpeg_process

        # Check if the FFmpeg subprocess is already running
        # (poll() returns None while the process is still alive)
        if ffmpeg_process is None or ffmpeg_process.poll() is not None:
            link = "https://cph-p2p-msl.akamaized.net/hls/live/2000341/test/master.m3u8"

            ffmpegcmd = [
                "ffmpeg",
                "-re",
                "-i",
                link,
                "-map",
                "0",
                "-codec",
                "copy",
                "-f",
                "mpegts",
                "pipe:",
            ]

            ffmpeg_process = subprocess.Popen(
                ffmpegcmd,
                stdin=subprocess.DEVNULL,
                stdout=subprocess.PIPE,
                stderr=subprocess.DEVNULL,
            )

        def streamData():
            print("stream data")
            try:
                while True:
                    chunk = ffmpeg_process.stdout.read(1024)
                    if len(chunk) == 0:
                        break
                    yield chunk
            except Exception:
                pass

        return Response(streamData(), mimetype="application/octet-stream")

    if __name__ == "__main__":
        app.run(host="0.0.0.0", port=8001, debug=True)
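If the goal is one ffmpeg process feeding several clients at once, one possible approach is to let a single thread read the pipe and copy every chunk into a queue per client. The sketch below is only an illustration under assumptions: the Broadcaster class and its method names are hypothetical, and a short Python child process stands in for ffmpeg so the example runs anywhere.

```python
import queue
import subprocess
import sys
import threading

class Broadcaster:
    """Reads one pipe in a single thread and copies each chunk to every subscriber."""

    def __init__(self, pipe, chunk_size=1024):
        self._pipe = pipe
        self._chunk_size = chunk_size
        self._subscribers = []
        self._closed = False
        self._lock = threading.Lock()

    def start(self):
        threading.Thread(target=self._pump, daemon=True).start()

    def _pump(self):
        # The only place that reads the pipe, so no data is split between clients.
        while True:
            chunk = self._pipe.read(self._chunk_size)
            with self._lock:
                if not chunk:
                    self._closed = True
                for q in self._subscribers:
                    q.put(chunk)  # b"" is forwarded too and marks end of stream
            if not chunk:
                break

    def subscribe(self):
        q = queue.Queue()
        with self._lock:
            if self._closed:
                q.put(b"")  # source already finished: end immediately
            else:
                self._subscribers.append(q)
        return q

    def unsubscribe(self, q):
        with self._lock:
            if q in self._subscribers:
                self._subscribers.remove(q)

    def stream(self):
        """Subscribe right away and return a generator of chunks for one client."""
        q = self.subscribe()

        def generate():
            try:
                while True:
                    chunk = q.get()
                    if not chunk:
                        break
                    yield chunk
            finally:
                self.unsubscribe(q)

        return generate()

# Stand-in for ffmpeg (assumption): a child that writes 10000 bytes and exits.
child = subprocess.Popen(
    [sys.executable, "-c", "import sys; sys.stdout.buffer.write(b'x' * 10000)"],
    stdout=subprocess.PIPE,
)
hub = Broadcaster(child.stdout)
client_a = hub.stream()
client_b = hub.stream()
hub.start()

data_a = b"".join(client_a)
data_b = b"".join(client_b)
child.wait()
print(len(data_a), len(data_b))
```

In the Flask app, the idea would be to create one such object around ffmpeg's stdout and return Response(hub.stream(), mimetype="application/octet-stream") from /play. A client that connects later starts from the current position of the live stream. The queues here are unbounded, so a slow client makes its queue grow; a real server would probably cap the queue size and drop or disconnect slow clients.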

  • Posted by huangapple on 2023-07-11 08:38:26
  • Link to this article: https://go.coder-hub.com/76658073.html