Properly passing data on stdin to a command and receiving data from stdout of that command in golang

huangapple go评论87阅读模式
英文:

Properly passing data on stdin to a command and receiving data from stdout of that command in golang

问题

我有以下的程序:

package main

import "bytes"
import "io"
import "log"
import "os"
import "os/exec"
import "time"

func main() {
    runCatFromStdinWorks(populateStdin("aaa\n"))
    runCatFromStdinWorks(populateStdin("bbb\n"))
}

func populateStdin(str string) func(io.WriteCloser) {
    return func(stdin io.WriteCloser) {
        defer stdin.Close()
        io.Copy(stdin, bytes.NewBufferString(str))
    }
}

func runCatFromStdinWorks(populate_stdin_func func(io.WriteCloser)) {
    cmd := exec.Command("cat")
    stdin, err := cmd.StdinPipe()
    if err != nil {
        log.Panic(err)
    }
    stdout, err := cmd.StdoutPipe()
    if err != nil {
        log.Panic(err)
    }
    err = cmd.Start()
    if err != nil {
        log.Panic(err)
    }
    go populate_stdin_func(stdin)
    go func() {
        // Removing the following lines allow some output
        // to be fetched from cat's stdout sometimes
        time.Sleep(5 * time.Second)
        io.Copy(os.Stdout, stdout)
    }()
    err = cmd.Wait()
    if err != nil {
        log.Panic(err)
    }
}

当在循环中运行时,我得不到结果,如下所示:

$ while true; do go run cat_thingy.go; echo ; done



^C

这个结果是在一个虚拟机上通过apt安装golang-go后得到的(go版本go1)。我无法在Macbook Air上的go安装中复制这个问题(go版本go1.0.3)。这似乎是某种竞争条件。实际上,如果我放一个sleep(1*time.Second),我就看不到这个问题,但代码中会有一个随机的睡眠。

我在代码中做错了什么,还是这是一个bug?如果是一个bug,它已经修复了吗?

更新:可能的线索

我发现Command.Wait会关闭用于与cat子进程通信的管道,即使它们仍然有未读数据。我不太确定正确的处理方式。我猜我可以创建一个通道来通知写入stdin何时完成,但我仍然需要知道cat进程是否已经结束,以确保没有其他内容将被写入其stdout管道。我知道我可以使用cmd.Process.Wait来确定进程何时结束,但是调用cmd.Wait是否安全?

更新:越来越接近

这是代码的新版本。我相信这在写入stdin和从stdout读取方面是有效的。如果我用一个流来替换stdout处理的goroutine中的io.Copy,我认为我可以使其正确地流式传输数据(而不是缓冲所有数据)。

package main

import "bytes"
import "fmt"
import "io"
import "log"
import "os/exec"
import "runtime"

const inputBufferBlockLength = 3*64*(2<<10) // enough to be bigger than 2x the pipe buffer of 64KiB
const numInputBlocks = 6

func main() {
    runtime.GOMAXPROCS(5)
    runCatFromStdin(populateStdin(numInputBlocks))
}

func populateStdin(numInputBlocks int) func(io.WriteCloser, chan bool) {
    return func(stdin io.WriteCloser) {
        defer stdin.Close()
        repeatedByteBases := []string{"a", "b", "c", "d", "e", "f"}
        for i := 0; i < numInputBlocks; i++ {
            repeatedBytes := bytes.NewBufferString(repeatedByteBases[i]).Bytes()
            fmt.Printf("%s\n", repeatedBytes)
            io.Copy(stdin, bytes.NewReader(bytes.Repeat(repeatedBytes, inputBufferBlockLength)))
        }
    }
}

func runCatFromStdin(populate_stdin_func func(io.WriteCloser)) {
    cmd := exec.Command("cat")
    stdin, err := cmd.StdinPipe()
    if err != nil {
        log.Panic(err)
    }
    stdout, err := cmd.StdoutPipe()
    if err != nil {
        log.Panic(err)
    }
    err = cmd.Start()
    if err != nil {
        log.Panic(err)
    }
    go populate_stdin_func(stdin)
    output_done_channel := make(chan bool)
    go func() {
        out_bytes := new(bytes.Buffer)
        io.Copy(out_bytes, stdout)
        fmt.Printf("%s\n", out_bytes)
        fmt.Println(out_bytes.Len())
        fmt.Println(inputBufferBlockLength*numInputBlocks)
        output_done_channel <- true
    }()
    <-output_done_channel
    err = cmd.Wait()
    if err != nil {
        log.Panic(err)
    }
}
英文:

I have the following program:

package main

import &quot;bytes&quot;
import &quot;io&quot;
import &quot;log&quot;
import &quot;os&quot;
import &quot;os/exec&quot;
import &quot;time&quot;

func main() {
	runCatFromStdinWorks(populateStdin(&quot;aaa\n&quot;))
	runCatFromStdinWorks(populateStdin(&quot;bbb\n&quot;))
}

func populateStdin(str string) func(io.WriteCloser) {
	return func(stdin io.WriteCloser) {
		defer stdin.Close()
		io.Copy(stdin, bytes.NewBufferString(str))
	}
}

func runCatFromStdinWorks(populate_stdin_func func(io.WriteCloser)) {
	cmd := exec.Command(&quot;cat&quot;)
	stdin, err := cmd.StdinPipe()
	if err != nil {
		log.Panic(err)
	}
	stdout, err := cmd.StdoutPipe()
	if err != nil {
		log.Panic(err)
	}
	err = cmd.Start()
	if err != nil {
		log.Panic(err)
	}
	go populate_stdin_func(stdin)
	go func() {
            // Removing the following lines allow some output
            // to be fetched from cat&#39;s stdout sometimes
            time.Sleep(5 * time.Second)
            io.Copy(os.Stdout, stdout)
    }()
	err = cmd.Wait()
	if err != nil {
		log.Panic(err)
	}
}

When running in a loop, I get no results, like so:

$ while true; do go run cat_thingy.go; echo ; done



^C

This result comes after installing golang-go on an Ubuntu 12.04 from apt in a virtual machine (go version go1). I have not been able to replicate on a go installation on a Macbook Air (go version go1.0.3). It seems to be some kind of race condition. In fact, if I put a sleep(1*time.Second), I never see the issue at the expense of a random sleep in my code.

Is there something I am doing wrong in the code, or is this a bug? If it is a bug, has it been fixed?

UPDATE: Possible Clue

I found that the Command.Wait will close the pipes for communicating to/from the cat subprocess even if they still have unread data. I am not really sure the about the proper way to handle that. I guess I could create a channel to notify when the writing to stdin is done, but I would still need to know if the cat process has ended to make sure that nothing else was going to be written to its stdout pipe. I know that I can use cmd.Process.Wait to determine when the process ends, but is it safe to then call cmd.Wait?

UPDATE: Getting Closer

Here's a new cut at the code. I believe that this works as far as writing to stdin and reading from the stdout. I think I can make it properly stream the data (instead of buffering it all) if I replace the io.Copy from the stdout handling goroutine without something that streams.

package main

import &quot;bytes&quot;
import &quot;fmt&quot;
import &quot;io&quot;
import &quot;log&quot;
import &quot;os/exec&quot;
import &quot;runtime&quot;

const inputBufferBlockLength = 3*64*(2&lt;&lt;10) // enough to be bigger than 2x the pipe buffer of 64KiB
const numInputBlocks = 6

func main() {
    runtime.GOMAXPROCS(5)
    runCatFromStdin(populateStdin(numInputBlocks))
}

func populateStdin(numInputBlocks int) func(io.WriteCloser, chan bool) {
    return func(stdin io.WriteCloser) {
        defer stdin.Close()
        repeatedByteBases := []string{&quot;a&quot;, &quot;b&quot;, &quot;c&quot;, &quot;d&quot;, &quot;e&quot;, &quot;f&quot;}
        for i := 0; i &lt; numInputBlocks; i++ {
          repeatedBytes := bytes.NewBufferString(repeatedByteBases[i]).Bytes()
          fmt.Printf(&quot;%s\n&quot;, repeatedBytes)
          io.Copy(stdin, bytes.NewReader(bytes.Repeat(repeatedBytes, inputBufferBlockLength)))
        }
    }
}

func runCatFromStdin(populate_stdin_func func(io.WriteCloser)) {
    cmd := exec.Command(&quot;cat&quot;)
    stdin, err := cmd.StdinPipe()
    if err != nil {
        log.Panic(err)
    }
    stdout, err := cmd.StdoutPipe()
    if err != nil {
        log.Panic(err)
    }
    err = cmd.Start()
    if err != nil {
        log.Panic(err)
    }
    go populate_stdin_func(stdin)
    output_done_channel := make(chan bool)
    go func() {
        out_bytes := new(bytes.Buffer)
        io.Copy(out_bytes, stdout)
        fmt.Printf(&quot;%s\n&quot;, out_bytes)
        fmt.Println(out_bytes.Len())
        fmt.Println(inputBufferBlockLength*numInputBlocks)
        output_done_channel &lt;- true
    }()
    &lt;-output_done_channel
    err = cmd.Wait()
    if err != nil {
        log.Panic(err)
    }
}

答案1

得分: 5

这是你的第一个代码的一个可以工作的版本。请注意添加了sync.WaitGroup,以确保在关闭命令之前完成发送和接收的go例程。

package main

import (
	"bytes"
	"io"
	"log"
	"os"
	"os/exec"
	"sync"
	"time"
)

func main() {
	runCatFromStdinWorks(populateStdin("aaa\n"))
	runCatFromStdinWorks(populateStdin("bbb\n"))
}

func populateStdin(str string) func(io.WriteCloser) {
	return func(stdin io.WriteCloser) {
		defer stdin.Close()
		io.Copy(stdin, bytes.NewBufferString(str))
	}
}

func runCatFromStdinWorks(populate_stdin_func func(io.WriteCloser)) {
	cmd := exec.Command("cat")
	stdin, err := cmd.StdinPipe()
	if err != nil {
		log.Panic(err)
	}
	stdout, err := cmd.StdoutPipe()
	if err != nil {
		log.Panic(err)
	}
	err = cmd.Start()
	if err != nil {
		log.Panic(err)
	}
	var wg sync.WaitGroup
	wg.Add(2)
	go func() {
		defer wg.Done()
		populate_stdin_func(stdin)
	}()
	go func() {
		defer wg.Done()
		time.Sleep(5 * time.Second)
		io.Copy(os.Stdout, stdout)
	}()
	wg.Wait()
	err = cmd.Wait()
	if err != nil {
		log.Panic(err)
	}
}

(这只是另一种说法,与@peterSO所说的一样;-))

英文:

Here is a version of your first code which works. Note the addition of the sync.WaitGroup to make sure you finish with the sending and receiving go routines before closing the command.

package main

import (
	&quot;bytes&quot;
	&quot;io&quot;
	&quot;log&quot;
	&quot;os&quot;
	&quot;os/exec&quot;
	&quot;sync&quot;
	&quot;time&quot;
)

func main() {
	runCatFromStdinWorks(populateStdin(&quot;aaa\n&quot;))
	runCatFromStdinWorks(populateStdin(&quot;bbb\n&quot;))
}

func populateStdin(str string) func(io.WriteCloser) {
	return func(stdin io.WriteCloser) {
		defer stdin.Close()
		io.Copy(stdin, bytes.NewBufferString(str))
	}
}

func runCatFromStdinWorks(populate_stdin_func func(io.WriteCloser)) {
	cmd := exec.Command(&quot;cat&quot;)
	stdin, err := cmd.StdinPipe()
	if err != nil {
		log.Panic(err)
	}
	stdout, err := cmd.StdoutPipe()
	if err != nil {
		log.Panic(err)
	}
	err = cmd.Start()
	if err != nil {
		log.Panic(err)
	}
	var wg sync.WaitGroup
	wg.Add(2)
	go func() {
		defer wg.Done()
		populate_stdin_func(stdin)
	}()
	go func() {
		defer wg.Done()
		time.Sleep(5 * time.Second)
		io.Copy(os.Stdout, stdout)
	}()
	wg.Wait()
	err = cmd.Wait()
	if err != nil {
		log.Panic(err)
	}
}

(This is just another way of saying what @peterSO said though Properly passing data on stdin to a command and receiving data from stdout of that command in golang

答案2

得分: 0

> Go语句
>
> 一个"go"语句启动一个函数或方法调用的执行,作为一个独立的并发线程控制,或者称为goroutine,在同一个地址空间内。
>
> GoStmt = "go" Expression .
>
> 表达式必须是一个调用。函数值和参数在调用的goroutine中按照通常的方式进行评估,但是与常规调用不同,程序执行不会等待被调用的函数完成。相反,该函数在一个新的goroutine中开始独立执行。当函数终止时,它的goroutine也终止。如果函数有任何返回值,在函数完成时它们将被丢弃。

将多余的goroutines转换为函数调用。

package main

import (
	"bytes"
	"io"
	"log"
	"os"
	"os/exec"
)

func main() {
	runCatFromStdinWorks(populateStdin("aaa\n"))
	runCatFromStdinWorks(populateStdin("bbb\n"))
}

func populateStdin(str string) func(io.WriteCloser) {
	return func(stdin io.WriteCloser) {
		defer stdin.Close()
		io.Copy(stdin, bytes.NewBufferString(str))
	}
}

func runCatFromStdinWorks(populate_stdin_func func(io.WriteCloser)) {
	cmd := exec.Command("cat")
	stdin, err := cmd.StdinPipe()
	if err != nil {
		log.Panic(err)
	}
	stdout, err := cmd.StdoutPipe()
	if err != nil {
		log.Panic(err)
	}
	err = cmd.Start()
	if err != nil {
		log.Panic(err)
	}
	populate_stdin_func(stdin)
	io.Copy(os.Stdout, stdout)
	err = cmd.Wait()
	if err != nil {
		log.Panic(err)
	}
}
英文:

> Go statements
>
> A "go" statement starts the execution of a function or method call as
> an independent concurrent thread of control, or goroutine, within the
> same address space.
>
> GoStmt = "go" Expression .
>
> The expression must be a call. The function value and parameters are
> evaluated as usual in the calling goroutine, but unlike with a regular
> call, program execution does not wait for the invoked function to
> complete. Instead, the function begins executing independently in a
> new goroutine. When the function terminates, its goroutine also
> terminates. If the function has any return values, they are discarded
> when the function completes.

Convert the gratuitous goroutines to function calls.

package main

import (
	&quot;bytes&quot;
	&quot;io&quot;
	&quot;log&quot;
	&quot;os&quot;
	&quot;os/exec&quot;
)

func main() {
	runCatFromStdinWorks(populateStdin(&quot;aaa\n&quot;))
	runCatFromStdinWorks(populateStdin(&quot;bbb\n&quot;))
}

func populateStdin(str string) func(io.WriteCloser) {
	return func(stdin io.WriteCloser) {
		defer stdin.Close()
		io.Copy(stdin, bytes.NewBufferString(str))
	}
}

func runCatFromStdinWorks(populate_stdin_func func(io.WriteCloser)) {
	cmd := exec.Command(&quot;cat&quot;)
	stdin, err := cmd.StdinPipe()
	if err != nil {
		log.Panic(err)
	}
	stdout, err := cmd.StdoutPipe()
	if err != nil {
		log.Panic(err)
	}
	err = cmd.Start()
	if err != nil {
		log.Panic(err)
	}
	populate_stdin_func(stdin)
	io.Copy(os.Stdout, stdout)
	err = cmd.Wait()
	if err != nil {
		log.Panic(err)
	}
}

huangapple
  • 本文由 发表于 2013年2月11日 06:37:56
  • 转载请务必保留本文链接:https://go.coder-hub.com/14803425.html
匿名

发表评论

匿名网友

:?: :razz: :sad: :evil: :!: :smile: :oops: :grin: :eek: :shock: :???: :cool: :lol: :mad: :twisted: :roll: :wink: :idea: :arrow: :neutral: :cry: :mrgreen:

确定