英文:
closing goroutine spawned by fiber endpoint
问题
我有一个程序,使用ffmpeg将rtsp摄像头转换为hls格式进行流媒体传输。
为每个rtsp链接创建goroutine,因为ffmpeg在后台运行。
以下是添加流的代码。
func StreamProcess(data <-chan StreamData, ctx context.Context) {
for v := range data {
ctx, _ := context.WithCancel(ctx)
go func() {
if !getStreams(v.camera_id) {
var stream StreamState
stream.camera_id = v.camera_id
stream.state = true
go Stream(v, ctx)
wg.Wait()
} else {
return
}
}()
}
}
以下是运行ffmpeg命令的流媒体函数。
func Stream(meta StreamData, ctx context.Context) error {
log.Println("Started Streaming")
ffmpegCmd := exec.Command("ffmpeg", "-i", meta.rtsp, "-pix_fmt", "yuv420p", "-c:v", "libx264", "-preset", "ultrafast", "-b:v", "600k", "-c:a", "aac", "-b:a", "160k", "-f", "rtsp", fmt.Sprintf("rtsp://localhost:8554/%s", meta.camera_id))
output, _ := ffmpegCmd.CombinedOutput()
log.Println(string(output))
for {
select {
case <-ctx.Done():
log.Println("killing process")
ffmpegCmd.Process.Kill()
return nil
}
}
}
我的目标是停止每个os.exec进程(ffmpeg命令),或者至少关闭所有在ffmpeg命令下的goroutine,而不关闭fiber服务器。
需要帮助,对golang不熟悉
英文:
I have a program that is rtsp cameras to hls format using ffmpeg for streaming.
creating goroutines for each rtsp link as ffmpeg runs in background
Streams are added by following code.
func StreamProcess(data <-chan StreamData, ctx context.Context) {
for v := range data {
ctx, _ := context.WithCancel(ctx)
go func() {
if !getStreams(v.camera_id) {
var stream StreamState
stream.camera_id = v.camera_id
stream.state = true
go Stream(v, ctx)
wg.Wait()
} else {
return
}
}()
}
}
Streaming function which runs ffmpeg command.
func Stream(meta StreamData, ctx context.Context) error {
log.Println("Started Streaming")
ffmpegCmd := exec.Command("ffmpeg", "-i", meta.rtsp, "-pix_fmt", "yuv420p", "-c:v", "libx264", "-preset", "ultrafast", "-b:v", "600k", "-c:a", "aac", "-b:a", "160k", "-f", "rtsp", fmt.Sprintf("rtsp://localhost:8554/%s", meta.camera_id))
output, _ := ffmpegCmd.CombinedOutput()
log.Println(string(output))
for {
select {
case <-ctx.Done():
log.Println("killing process")
ffmpegCmd.Process.Kill()
return nil
}
}}
my Goal is to stop each os.exec process (ffmpeg command) or at least close all goroutines that are under ffmpeg commands without closing fiber server.
** help required new to golang **
答案1
得分: 0
这是可工作的代码:
func StreamProcess(data <-chan StreamData, ctx context.Context) {
ctx, cancel := context.WithCancel(ctx)
defer cancel()
for {
select {
case v, ok := <-data:
if ok {
go func() {
if !getStreams(v.camera_id) {
var stream StreamState
stream.camera_id = v.camera_id
stream.state = true
go Stream(v, ctx)
}
}()
} else if !ok {
cancel()
return
}
case <-ctx.Done():
log.Println("closed ctx")
cancel()
}
}
}
and started streaming with:
func Stream(meta StreamData, ctx context.Context) error {
log.Println("Started Streaming")
err := exec.CommandContext(ctx, "ffmpeg", "-i", meta.rtsp, "-pix_fmt", "yuv420p", "-c:v", "libx264", "-preset", "ultrafast", "-b:v", "600k", "-c:a", "aac", "-b:a", "160k", "-f", "rtsp", fmt.Sprintf("rtsp://localhost:8554/%s", meta.camera_id)).Run()
if err != nil {
log.Println("error in streaming", err)
return err
}
log.Println(string("waiting for closure"))
for {
select {
case <-ctx.Done():
log.Println("killing process")
return nil
case <-time.After(2 * time.Second):
log.Println("started default context")
return nil
}
}
}
这对我来说现在可以工作,我没有找到更好的方法。如果有更好的方法,请留言。
英文:
This is working code:
func StreamProcess(data <-chan StreamData, ctx context.Context) {
ctx, cancel := context.WithCancel(ctx)
defer cancel()
for {
select {
case v, ok := <-data:
if ok {
go func() {
if !getStreams(v.camera_id) {
var stream StreamState
stream.camera_id = v.camera_id
stream.state = true
go Stream(v, ctx)
}
}()
} else if !ok {
cancel()
return
}
case <-ctx.Done():
log.Println("closed ctx")
cancel()
}
}
and started streaming with:
func Stream(meta StreamData, ctx context.Context) error {
log.Println("Started Streaming")
err := exec.CommandContext(ctx, "ffmpeg", "-i", meta.rtsp, "-pix_fmt", "yuv420p", "-c:v", "libx264", "-preset", "ultrafast", "-b:v", "600k", "-c:a", "aac", "-b:a", "160k", "-f", "rtsp", fmt.Sprintf("rtsp://localhost:8554/%s", meta.camera_id)).Run()
if err != nil {
log.Println("error in streaming", err)
return err
}
log.Println(string("waiting for closure"))
for {
select {
case <-ctx.Done():
log.Println("killing process")
return nil
case <-time.After(2* time.second):
log.Println("started default context")
return nil
}
}
.
This works for me now i didn't found any better way. Please comment if anyone is having better way.
通过集体智慧和协作来改善编程学习和解决问题的方式。致力于成为全球开发者共同参与的知识库,让每个人都能够通过互相帮助和分享经验来进步。
评论