由Fiber端点生成的goroutine正在关闭。

huangapple go评论70阅读模式
英文:

closing goroutine spawned by fiber endpoint

问题

我有一个程序,使用ffmpeg将rtsp摄像头转换为hls格式进行流媒体传输。
为每个rtsp链接创建goroutine,因为ffmpeg在后台运行。

以下是添加流的代码。

func StreamProcess(data <-chan StreamData, ctx context.Context) {
	for v := range data {
		ctx, _ := context.WithCancel(ctx)
		go func() {
			if !getStreams(v.camera_id) {
				var stream StreamState
				stream.camera_id = v.camera_id
				stream.state = true
				go Stream(v, ctx)
				wg.Wait()
			} else {
				return
			}
		}()
	}	
}

以下是运行ffmpeg命令的流媒体函数。

func Stream(meta StreamData, ctx context.Context) error {
	log.Println("Started Streaming")
	ffmpegCmd := exec.Command("ffmpeg", "-i", meta.rtsp, "-pix_fmt", "yuv420p", "-c:v", "libx264", "-preset", "ultrafast", "-b:v", "600k", "-c:a", "aac", "-b:a", "160k", "-f", "rtsp", fmt.Sprintf("rtsp://localhost:8554/%s", meta.camera_id))
	output, _ := ffmpegCmd.CombinedOutput()

	log.Println(string(output))

	for {
		select {
		case <-ctx.Done():
			log.Println("killing process")
			ffmpegCmd.Process.Kill()
			return nil
		}
	}
}

我的目标是停止每个os.exec进程(ffmpeg命令),或者至少关闭所有在ffmpeg命令下的goroutine,而不关闭fiber服务器。

需要帮助,对golang不熟悉

英文:

I have a program that is rtsp cameras to hls format using ffmpeg for streaming.
creating goroutines for each rtsp link as ffmpeg runs in background

Streams are added by following code.

func StreamProcess(data &lt;-chan StreamData, ctx context.Context) {
for v := range data {
	ctx, _ := context.WithCancel(ctx)
	go func() {
		if !getStreams(v.camera_id) {
			var stream StreamState
			stream.camera_id = v.camera_id
			stream.state = true
			go Stream(v, ctx)
			wg.Wait()
		} else {
			return
		}
	}()
}	

}

Streaming function which runs ffmpeg command.

func Stream(meta StreamData, ctx context.Context) error {
    log.Println(&quot;Started Streaming&quot;)
    ffmpegCmd := exec.Command(&quot;ffmpeg&quot;, &quot;-i&quot;, meta.rtsp, &quot;-pix_fmt&quot;, &quot;yuv420p&quot;, &quot;-c:v&quot;, &quot;libx264&quot;, &quot;-preset&quot;, &quot;ultrafast&quot;, &quot;-b:v&quot;, &quot;600k&quot;, &quot;-c:a&quot;, &quot;aac&quot;, &quot;-b:a&quot;, &quot;160k&quot;, &quot;-f&quot;, &quot;rtsp&quot;, fmt.Sprintf(&quot;rtsp://localhost:8554/%s&quot;, meta.camera_id))
    output, _ := ffmpegCmd.CombinedOutput()

    log.Println(string(output))

    for {
	    select {
	    case &lt;-ctx.Done():
		   log.Println(&quot;killing process&quot;)
		   ffmpegCmd.Process.Kill()
		   return nil
	    }
    }}

my Goal is to stop each os.exec process (ffmpeg command) or at least close all goroutines that are under ffmpeg commands without closing fiber server.

** help required new to golang **

答案1

得分: 0

这是可工作的代码:

func StreamProcess(data <-chan StreamData, ctx context.Context) {
	ctx, cancel := context.WithCancel(ctx)
	defer cancel()
	for {
		select {
		case v, ok := <-data:
			if ok {
				go func() {
					if !getStreams(v.camera_id) {
						var stream StreamState
						stream.camera_id = v.camera_id
						stream.state = true
						go Stream(v, ctx)
					}
				}()
			} else if !ok {
				cancel()
				return
			}
		case <-ctx.Done():
			log.Println("closed ctx")
			cancel()
		}

	}
}

and started streaming with:

func Stream(meta StreamData, ctx context.Context) error {
	log.Println("Started Streaming")
	err := exec.CommandContext(ctx, "ffmpeg", "-i", meta.rtsp, "-pix_fmt", "yuv420p", "-c:v", "libx264", "-preset", "ultrafast", "-b:v", "600k", "-c:a", "aac", "-b:a", "160k", "-f", "rtsp", fmt.Sprintf("rtsp://localhost:8554/%s", meta.camera_id)).Run()

	if err != nil {
		log.Println("error in streaming", err)
		return err
	}

	log.Println(string("waiting for closure"))
	for {
		select {
		case <-ctx.Done():
			log.Println("killing process")			
			return nil
		case <-time.After(2 * time.Second):
			log.Println("started default context")
			return nil
		}

	}
}

这对我来说现在可以工作,我没有找到更好的方法。如果有更好的方法,请留言。

英文:

This is working code:

func StreamProcess(data &lt;-chan StreamData, ctx context.Context) {
ctx, cancel := context.WithCancel(ctx)
defer cancel()
for {
select {
case v, ok := &lt;-data:
if ok {
go func() {
if !getStreams(v.camera_id) {
var stream StreamState
stream.camera_id = v.camera_id
stream.state = true
go Stream(v, ctx)
}
}()
} else if !ok {
cancel()
return
}
case &lt;-ctx.Done():
log.Println(&quot;closed ctx&quot;)
cancel()
}
}

and started streaming with:

func Stream(meta StreamData, ctx context.Context) error {
log.Println(&quot;Started Streaming&quot;)
err := exec.CommandContext(ctx, &quot;ffmpeg&quot;, &quot;-i&quot;, meta.rtsp, &quot;-pix_fmt&quot;, &quot;yuv420p&quot;, &quot;-c:v&quot;, &quot;libx264&quot;, &quot;-preset&quot;, &quot;ultrafast&quot;, &quot;-b:v&quot;, &quot;600k&quot;, &quot;-c:a&quot;, &quot;aac&quot;, &quot;-b:a&quot;, &quot;160k&quot;, &quot;-f&quot;, &quot;rtsp&quot;, fmt.Sprintf(&quot;rtsp://localhost:8554/%s&quot;, meta.camera_id)).Run()
if err != nil {
log.Println(&quot;error in streaming&quot;, err)
return err
}
log.Println(string(&quot;waiting for closure&quot;))
for {
select {
case &lt;-ctx.Done():
log.Println(&quot;killing process&quot;)			
return nil
case &lt;-time.After(2* time.second):
log.Println(&quot;started default context&quot;)
return nil
}
}

.

This works for me now i didn't found any better way. Please comment if anyone is having better way.

huangapple
  • 本文由 发表于 2023年5月24日 18:56:45
  • 转载请务必保留本文链接:https://go.coder-hub.com/76322770.html
匿名

发表评论

匿名网友

:?: :razz: :sad: :evil: :!: :smile: :oops: :grin: :eek: :shock: :???: :cool: :lol: :mad: :twisted: :roll: :wink: :idea: :arrow: :neutral: :cry: :mrgreen:

确定