Capture stdout from exec.Command line by line and also pipe to os.Stdout

Question

Can anyone help?

I have an application that I run via exec.CommandContext (so I can cancel it via ctx). It normally does not stop unless it errors out.

I currently have it relaying its output to os.Stdout, which is working great. But I also want to receive each line via a channel. The idea is that I will match a regular expression against each line and, if it matches, set an internal state of, for example, "ERROR".

However, I can't get it to work. I tried bufio.NewScanner; here is my code.

As I said, it does output to os.Stdout, which is great, but I would like to receive each line in the channel I set up as soon as it is printed.

Any ideas?

Thanks in advance.

func (d *Daemon) Start() {
	ctx, cancel := context.WithCancel(context.Background())
	d.cancel = cancel

	go func() {
		args := "-x f -a 1"
		cmd := exec.CommandContext(ctx, "mydaemon", strings.Split(args, " ")...)

		var stdoutBuf, stderrBuf bytes.Buffer

		cmd.Stdout = io.MultiWriter(os.Stdout, &stdoutBuf)
		cmd.Stderr = io.MultiWriter(os.Stderr, &stderrBuf)

		lines := make(chan string)

		go func() {
			scanner := bufio.NewScanner(os.Stdin)
			for scanner.Scan() {
				fmt.Println("I am reading a line!")
				lines <- scanner.Text()
			}
		}()

		err := cmd.Start()
		if err != nil {
			log.Fatal(err)
		}

		select {
		case outputx := <-lines:
			// I will do something with this!
			fmt.Println("Hello!!", outputx)

		case <-ctx.Done():
			log.Println("I am done!, probably cancelled!")
		}
	}()
}

I also tried this:

		go func() {
			scanner := bufio.NewScanner(&stdoutBuf)
			for scanner.Scan() {
				fmt.Println("I am reading a line!")
				lines <- scanner.Text()
			}
		}()

Even with that, "I am reading a line!" is never printed. I also stepped through it in a debugger, and it never enters the for scanner.Scan() loop.

I also tried scanning &stderrBuf: same result, the loop is never entered.

Answer 1

Score: 4

cmd.Start() does not wait for the command to finish. You also need to call cmd.Wait() to be informed when the process ends; Wait also releases any resources associated with the Cmd.
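The first point can be observed directly through cmd.ProcessState, which os/exec populates only once Wait (or Run) has seen the process exit. A minimal sketch; startThenWait is a hypothetical helper, and the example command assumes a POSIX sh on PATH.

```go
package main

import (
	"fmt"
	"os/exec"
)

// startThenWait reports whether cmd.ProcessState is populated right after
// Start and after Wait. ProcessState stays nil until Wait (or Run) has
// observed the process exit, which shows that Start does not wait.
func startThenWait(name string, args ...string) (knownAfterStart, knownAfterWait bool, err error) {
	cmd := exec.Command(name, args...)
	if startErr := cmd.Start(); startErr != nil {
		return false, false, startErr
	}
	knownAfterStart = cmd.ProcessState != nil // Start has already returned
	err = cmd.Wait()                           // blocks until the process exits
	knownAfterWait = cmd.ProcessState != nil   // populated by Wait
	return knownAfterStart, knownAfterWait, err
}

func main() {
	// Assumes a POSIX sh on PATH.
	afterStart, afterWait, err := startThenWait("sh", "-c", "exit 0")
	fmt.Println(afterStart, afterWait, err)
}
```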

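package main

import (
	"bufio"
	"context"
	"fmt"
	"io"
	"os/exec"
)

func main() {
	reader, writer := io.Pipe()

	cmdCtx, cmdDone := context.WithCancel(context.Background())

	// Scan the pipe's read end: Scan blocks until a line arrives and
	// returns false once the write end has been closed.
	scannerStopped := make(chan struct{})
	go func() {
		defer close(scannerStopped)

		scanner := bufio.NewScanner(reader)
		for scanner.Scan() {
			fmt.Println(scanner.Text())
		}
	}()

	cmd := exec.Command("ls")
	cmd.Stdout = writer
	_ = cmd.Start()
	go func() {
		_ = cmd.Wait()  // blocks until ls exits and its output has been copied
		cmdDone()       // signal that the command has finished
		writer.Close()  // lets the scanner see EOF
	}()
	<-cmdCtx.Done()

	<-scannerStopped
}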

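scannerStopped is added to demonstrate that the scanner goroutine now stops. A shorter variant uses cmd.Run(), which starts the command and waits for it to finish, so the write end can be closed right after it returns: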

reader, writer := io.Pipe()

scannerStopped := make(chan struct{})
go func() {
	defer close(scannerStopped)

	scanner := bufio.NewScanner(reader)
	for scanner.Scan() {
		fmt.Println(scanner.Text())
	}
}()

cmd := exec.Command("ls")
cmd.Stdout = writer
// Run = Start + Wait, so calling cmd.Wait() again afterwards would only
// return an "exec: Wait was already called" error.
_ = cmd.Run()
writer.Close() // the command has finished; let the scanner see EOF

<-scannerStopped

Then handle the lines however you need.

Note: I wrote this in a bit of a hurry. Let me know if anything is unclear or incorrect.
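Answer 1's pattern can be packaged as a reusable function that passes each line to a callback. This is a sketch under assumptions: runLines and the handle callback are hypothetical names, and the example command assumes a POSIX sh on PATH. The pipe's write end is closed only after cmd.Wait() returns, so the scanner sees EOF exactly when the command is done.

```go
package main

import (
	"bufio"
	"fmt"
	"io"
	"os/exec"
)

// runLines runs name with args, calls handle for every stdout line as it
// arrives, and returns the command's exit error once all lines are handled.
func runLines(name string, args []string, handle func(string)) error {
	reader, writer := io.Pipe()

	scannerStopped := make(chan struct{})
	go func() {
		defer close(scannerStopped)
		scanner := bufio.NewScanner(reader)
		for scanner.Scan() {
			handle(scanner.Text())
		}
		// If Scan stopped early, keep draining so the command never
		// blocks writing to a pipe that nobody reads.
		_, _ = io.Copy(io.Discard, reader)
	}()

	cmd := exec.Command(name, args...)
	cmd.Stdout = writer
	if err := cmd.Start(); err != nil {
		writer.Close()
		<-scannerStopped
		return err
	}
	err := cmd.Wait() // waits for exit and for exec's copy into writer
	writer.Close()    // scanner sees EOF
	<-scannerStopped  // every line has been handled
	return err
}

func main() {
	// Assumes a POSIX sh on PATH.
	err := runLines("sh", []string{"-c", "echo one; echo two"}, func(line string) {
		fmt.Println("got:", line)
	})
	fmt.Println("exit error:", err)
}
```

The io.Copy(io.Discard, reader) after the loop guards against bufio.Scanner stopping early (for example on a line longer than its default 64 KiB limit); without it, the command would block writing to the pipe and cmd.Wait() would never return.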

Answer 2

Score: 2

For a correct program using concurrency and goroutines, we should try to show there are no data races, the program can't deadlock, and goroutines don't leak. Let's try to achieve this.

Full code

Playground: https://play.golang.org/p/Xv1hJXYQoZq. I recommend copying it and running it locally, because as far as I know the playground doesn't stream output, and it has timeouts.

Note that I've changed the test command to find /usr/local, typically a long-running command (more than 3 seconds) with many output lines, which makes it better suited to the scenarios we want to test.

Walkthrough

Let's look at the Daemon.Start method. The beginning is mostly unchanged. The most noticeable difference is that the new code doesn't wrap a large part of the method in a goroutine. Even without it, Daemon.Start remains non-blocking and returns "immediately".


The first noteworthy fix is in these lines:

	outR, outW := io.Pipe()
	cmd.Stdout = io.MultiWriter(outW, os.Stdout)

Instead of constructing a bytes.Buffer variable, we call io.Pipe. If we kept the bytes.Buffer, scanner.Scan() would return false as soon as there is no more data to read. That can happen whenever the command writes to stdout only occasionally (even a millisecond apart, for that matter). Once scanner.Scan() returns false, the goroutine exits and we miss all future output.

Instead, with the read end of io.Pipe, scanner.Scan() blocks waiting for input until the pipe's write end is closed.

This fixes the race issue between the scanner and the command output.
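The difference between the two readers can be shown without a real command. In this sketch (the function names are hypothetical), the scanner over an empty bytes.Buffer gives up immediately and never sees a line written afterwards, while the scanner over an io.Pipe waits until the line arrives. Separately, the question's code also has a data race: exec copies the command's output into the bytes.Buffer from its own goroutine while the scanner reads from it, and bytes.Buffer is not safe for concurrent use.

```go
package main

import (
	"bufio"
	"bytes"
	"fmt"
	"io"
)

// scanBufferEarly starts scanning a bytes.Buffer before anything has been
// written to it, as in the question's second attempt.
func scanBufferEarly() []string {
	var buf bytes.Buffer
	var lines []string
	scanner := bufio.NewScanner(&buf)
	for scanner.Scan() { // false at once: an empty Buffer reads as io.EOF
		lines = append(lines, scanner.Text())
	}
	buf.WriteString("late line\n") // too late, the scanner already gave up
	return lines
}

// scanPipeEarly does the same with io.Pipe: Scan blocks until data arrives
// and only stops once the write end is closed.
func scanPipeEarly() []string {
	r, w := io.Pipe()
	go func() {
		_, _ = io.WriteString(w, "late line\n")
		w.Close()
	}()
	var lines []string
	scanner := bufio.NewScanner(r)
	for scanner.Scan() {
		lines = append(lines, scanner.Text())
	}
	return lines
}

func main() {
	fmt.Println(len(scanBufferEarly()), len(scanPipeEarly()))
}
```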


Next, we construct two closely related goroutines: the first to consume from <-lines, and the second to produce into lines<-.

	go func() {
		for line := range lines {
			fmt.Println("output line from channel:", line)
			...
		}
	}()
	go func() {
		defer close(lines)
		scanner := bufio.NewScanner(outR)
		for scanner.Scan() {
			lines <- scanner.Text()
		}
		...
	}()

The consumer goroutine will exit when the lines channel is closed, as the closing of the channel would naturally cause the range loop to terminate; the producer goroutine closes lines upon exit.

The producer goroutine will exit when scanner.Scan() returns false, which happens when the write end of the io.Pipe is closed. This closing happens in upcoming code.

Note from the two paragraphs above that the two goroutines are guaranteed to exit (i.e. will not leak).
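The exit guarantees described above can be checked with a small, self-contained version of the two goroutines. This is a sketch: pipeline is a hypothetical name, a strings.Reader stands in for the command's output, and a sync.WaitGroup (not part of the answer's code) is used only to observe that both goroutines return once outW is closed.

```go
package main

import (
	"bufio"
	"fmt"
	"io"
	"strings"
	"sync"
)

// pipeline wires a producer and a consumer goroutine as the answer describes
// and returns only after both have exited.
func pipeline(src string) (consumed []string) {
	outR, outW := io.Pipe()
	lines := make(chan string)

	var wg sync.WaitGroup
	wg.Add(2)

	// Consumer: the range loop ends when lines is closed.
	go func() {
		defer wg.Done()
		for line := range lines {
			consumed = append(consumed, line)
		}
	}()

	// Producer: exits when Scan returns false, closing lines on the way out.
	go func() {
		defer wg.Done()
		defer close(lines)
		scanner := bufio.NewScanner(outR)
		for scanner.Scan() {
			lines <- scanner.Text()
		}
	}()

	// Stand-in for the command: write output, then close the write end
	// (the answer does this after cmd.Wait returns).
	_, _ = io.Copy(outW, strings.NewReader(src))
	outW.Close()

	wg.Wait() // returns because both goroutines are guaranteed to exit
	return consumed
}

func main() {
	fmt.Println(pipeline("a\nb\nc\n"))
}
```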


Next, we start the command. This is standard: cmd.Start is a non-blocking call and returns immediately.

// Start the command.
if err := cmd.Start(); err != nil {
	log.Fatal(err)
}

Moving on to the final piece of code in Daemon.Start: this goroutine waits for the command to exit via cmd.Wait(). Handling this is important because the command may exit for reasons other than Context cancellation.

In particular, we want to close the write end of the io.Pipe (which, in turn, makes the output-line producer goroutine exit, as described earlier).

	go func() {
		err := cmd.Wait()
		fmt.Println("command exited; error is:", err)
		outW.Close()
		...
	}()

As a side note, by waiting on cmd.Wait(), we don't have to wait on ctx.Done() separately: cmd.Wait() returns both for exits with natural causes (the command finished successfully, ran into an internal error, etc.) and for exits caused by Context cancellation.

This goroutine is also guaranteed to exit: it exits when cmd.Wait() returns, whether the command exited normally with success, exited with failure due to a command error, or exited with failure due to Context cancellation.


That's it! We should have no data races, no deadlocks, and no leaked goroutines.

The lines elided ("...") in the snippets above are geared towards the Done(), CmdErr(), and Cancel() methods of the Daemon type. These methods are fairly well-documented in the code, so these elided lines are hopefully self-explanatory.
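The full code lives behind the playground link and is not reproduced here. As a rough idea of what such methods can look like, here is a hypothetical sketch, not the answer's actual code: the field names, the Start(name, args...) signature, and returning an error from Start instead of calling log.Fatal are all assumptions, and line scanning is left out. The key idea is that cmdErr is written before done is closed, so any goroutine that has received from Done() can read it without a data race.

```go
package main

import (
	"context"
	"fmt"
	"os/exec"
)

// Daemon is a hypothetical, trimmed-down version without line scanning.
type Daemon struct {
	cancel context.CancelFunc
	done   chan struct{} // closed once the command has exited
	cmdErr error         // written once, before done is closed
}

// Start launches the command and returns immediately.
func (d *Daemon) Start(name string, args ...string) error {
	ctx, cancel := context.WithCancel(context.Background())
	d.cancel = cancel
	d.done = make(chan struct{})

	cmd := exec.CommandContext(ctx, name, args...)
	if err := cmd.Start(); err != nil {
		cancel()
		d.cmdErr = err
		close(d.done)
		return err
	}
	go func() {
		d.cmdErr = cmd.Wait()
		cancel() // release the context's resources
		close(d.done)
	}()
	return nil
}

// Done returns a channel that is closed once the command has exited.
func (d *Daemon) Done() <-chan struct{} { return d.done }

// CmdErr blocks until the command has exited and returns cmd.Wait's error.
func (d *Daemon) CmdErr() error {
	<-d.done
	return d.cmdErr
}

// Cancel kills the command via its context. Call it only after Start;
// calling it more than once is harmless.
func (d *Daemon) Cancel() { d.cancel() }

func main() {
	var d Daemon
	// Assumes a POSIX sh on PATH.
	if err := d.Start("sh", "-c", "exit 1"); err != nil {
		fmt.Println("start:", err)
		return
	}
	<-d.Done()
	fmt.Println("d.CmdErr():", d.CmdErr())
}
```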

Besides that, look for the TODO comments for error handling you may want to do based on your needs!

Test it!

Use this driver program to test the code.

func main() {
	var d Daemon
	d.Start()

	// Enable this code to test Context cancellation:
	// time.AfterFunc(100*time.Millisecond, d.Cancel)

	<-d.Done()
	fmt.Println("d.CmdErr():", d.CmdErr())
}

Answer 3

Score: 0

You have to scan stdoutBuf instead of os.Stdin:

scanner := bufio.NewScanner(&stdoutBuf)

Answer 4

Score: 0

The command is terminated when the context is canceled. If it's OK to read all output from the command until the command is terminated, use this code:

func (d *Daemon) Start() {
	ctx, cancel := context.WithCancel(context.Background())
	d.cancel = cancel

	args := "-x f -a 1"
	cmd := exec.CommandContext(ctx, "mydaemon", strings.Split(args, " ")...)
	stdout, err := cmd.StdoutPipe()
	if err != nil {
		log.Fatal(err)
	}
	err = cmd.Start()
	if err != nil {
		log.Fatal(err)
	}

	go func() {
		// Wait must only be called after all reads from stdout are done,
		// so it is deferred until the scan loop has finished.
		defer cmd.Wait()
		scanner := bufio.NewScanner(stdout)
		for scanner.Scan() {
			s := scanner.Text()
			fmt.Println(s) // echo to stdout
			// Do something with s
		}
		if err := scanner.Err(); err != nil {
			log.Println("reading stdout:", err)
		}
	}()
}

The command is killed when the context is canceled. Once it has exited, reads from stdout return io.EOF, so scanner.Scan() returns false and the goroutine leaves the scan loop (Scan also returns false on any other read error). The deferred cmd.Wait() then reaps the process.
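A self-contained variant of this approach, returning the lines instead of processing them in a goroutine, can be handy for testing. This is a sketch: collectStdout is a hypothetical name, it uses exec.Command without a context for brevity, and the example command assumes a POSIX sh on PATH.

```go
package main

import (
	"bufio"
	"fmt"
	"io"
	"os/exec"
)

// collectStdout reads a command's stdout line by line via cmd.StdoutPipe and
// calls cmd.Wait only after all reads are finished, as the os/exec
// documentation requires for StdoutPipe.
func collectStdout(name string, args ...string) ([]string, error) {
	cmd := exec.Command(name, args...)
	stdout, err := cmd.StdoutPipe()
	if err != nil {
		return nil, err
	}
	if err := cmd.Start(); err != nil {
		return nil, err
	}

	var lines []string
	scanner := bufio.NewScanner(stdout)
	for scanner.Scan() {
		lines = append(lines, scanner.Text())
	}
	scanErr := scanner.Err() // nil when the loop ended at a normal EOF
	if scanErr != nil {
		// Drain the rest so the command cannot block on a full pipe.
		_, _ = io.Copy(io.Discard, stdout)
	}

	waitErr := cmd.Wait()
	if scanErr != nil {
		return lines, scanErr
	}
	return lines, waitErr
}

func main() {
	lines, err := collectStdout("sh", "-c", "echo first; echo second")
	fmt.Println(lines, err)
}
```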

huangapple
  • Published on 2021-11-13 21:59:06
  • When reposting, please keep the link to this article: https://go.coder-hub.com/69954944.html