读取超时

huangapple go评论67阅读模式
英文:

Go timeout for Read

问题

我想做类似于Unix的tail -f的操作,但是针对通过Go的Cmd功能运行的进程产生的输出。

显然,我的谷歌搜索能力不够强,但我找到了这篇文章,它引导我编写了下面的代码,几乎可以工作,但有一个奇怪的问题,希望能得到帮助。

如果有关系的话,我是在Mac上运行这个程序的。

首先,这是一个最小化的程序,编译成slowroll可执行文件:

package main

import (
	"fmt"
	"time"
)

func main() {
	line := 1
	for {
		fmt.Println("This is line", line)
		line += 1
		time.Sleep(2 * time.Second)
	}
}

运行时,它每2秒输出一行:

> ./slowroll
This is line 1
This is line 2
This is line 3
This is line 4

下面是尝试读取这个输出并允许超时的包代码:

package timeout_io

import (
	"bufio"
	"bytes"
	"context"
	"errors"
	"time"
)

const BufferSize = 4096

var ErrTimeout = errors.New("timeout")

type TimeoutReader struct {
	b *bufio.Reader
	t time.Duration
}

func NewTimeoutReader(stdOut *bytes.Buffer) *TimeoutReader {
	return &TimeoutReader{b: bufio.NewReaderSize(stdOut, BufferSize), t: 0}
}

func (r *TimeoutReader) SetTimeout(t time.Duration) time.Duration {
	prev := r.t
	r.t = t
	return prev
}

type CallResponse struct {
	Resp string
	Err  error
}

func helper(r *bufio.Reader) <-chan *CallResponse {
	respChan := make(chan *CallResponse, 1)

	go func() {
		resp, err := r.ReadString('\n')

		if err != nil {
			respChan <- &CallResponse{resp, err}
		} else {
			respChan <- &CallResponse{resp, nil}
		}

		return
	}()

	return respChan
}

func (r *TimeoutReader) ReadLineCtx(ctx context.Context) (string, error) {
	select {
	case <-ctx.Done():
		return "", ErrTimeout
	case respChan := <-helper(r.b):
		return respChan.Resp, respChan.Err
	}
}

func (r *TimeoutReader) ReadLine() (string, error) {
	ctx, cancel := context.WithTimeout(context.Background(), r.t)
	defer cancel()

	s, err := r.ReadLineCtx(ctx)
	if err != nil {
		return "", err
	}

	return s, nil
}

最后,这是调用带有超时的ReadLinemain代码:

package main

import (
	"bytes"
	"fmt"
	"io"
	"os"
	"os/exec"
	"sync"
	"time"

	"watcher/timeout_io"
)

func main() {
	var stdOut bytes.Buffer
	var stdErr bytes.Buffer
	runCommand := &exec.Cmd{
		Path:   "./slowroll",
		Stdout: &stdOut,
		Stderr: &stdErr,
	}

	var wg sync.WaitGroup

	go func(wg *sync.WaitGroup) {
		defer wg.Done()

		err := runCommand.Run()
		if err != nil {
			fmt.Println("failed due to error:", err)
			os.Exit(1)
		}
	}(&wg)

	wg.Add(1)

	stdOutReader := timeout_io.NewTimeoutReader(&stdOut)
	stdOutReader.SetTimeout(10 * time.Millisecond)
	index := 1
	for {
		s, err := stdOutReader.ReadLine()
		if err != nil {
			if err != timeout_io.ErrTimeout && err != io.EOF {
				fmt.Println("ReadLine got error", err)
				break
			}
		} else if len(s) > 0 {
			fmt.Println("index:", index, "s:", s)
			index += 1
			s = ""
		}
	}

	wg.Wait()
	fmt.Println("Done!")
}

运行时,它产生以下输出:

> go run watcher.go
index: 1 s: This is line 1
index: 2 s: This is line 2
index: 3 s: This is line 2
index: 4 s: This is line 3
index: 5 s: This is line 2
index: 6 s: This is line 3
index: 7 s: This is line 4
index: 8 s: This is line 2
index: 9 s: This is line 3
index: 10 s: This is line 4
index: 11 s: This is line 5

偶尔,一些slowroll的输出行根本不显示;重复出现的行是随机的。

这就是我的问题...我看不到导致行被多次输出的(明显的)循环在哪里。

非常感谢提前的帮助!

英文:

I'd like to do something like unix's tail -f, but on the output produced by a process run through Go's Cmd facility.

My google-fu is not up to par, evidently, but I did find this article which lead me to write the following code, which almost works, with a bizarre twist I'm hoping I can get help with.

If it matters, I'm running this on a Mac.

First, here's the minimal program that's compiled to be the slowroll executable:

package main

import (
	&quot;fmt&quot;
	&quot;time&quot;
)

func main() {
	line := 1
	for {
		fmt.Println(&quot;This is line&quot;, line)
		line += 1
		time.Sleep(2 * time.Second)
	}
}

When run, it produces the following output, one line every 2 seconds:

	&gt; ./slowroll
This is line 1
This is line 2
This is line 3
This is line 4

And so on.

Here's the package code that attempts to read this, but allowing timeouts so other things can be done:

package timeout_io

import (
	&quot;bufio&quot;
	&quot;bytes&quot;
	&quot;context&quot;
	&quot;errors&quot;
	&quot;time&quot;
)

const BufferSize = 4096

var ErrTimeout = errors.New(&quot;timeout&quot;)

type TimeoutReader struct {
	b *bufio.Reader
	t time.Duration
}

func NewTimeoutReader(stdOut *bytes.Buffer) *TimeoutReader {
	return &amp;TimeoutReader{b: bufio.NewReaderSize(stdOut, BufferSize), t: 0}
}

func (r *TimeoutReader) SetTimeout(t time.Duration) time.Duration {
	prev := r.t
	r.t = t
	return prev
}

type CallResponse struct {
	Resp string
	Err  error
}

func helper(r *bufio.Reader) &lt;-chan *CallResponse {
	respChan := make(chan *CallResponse, 1)

	go func() {
		resp, err := r.ReadString(&#39;\n&#39;)

		if err != nil {
			respChan &lt;- &amp;CallResponse{resp, err}
		} else {
			respChan &lt;- &amp;CallResponse{resp, nil}
		}

		return
	}()

	return respChan
}

func (r *TimeoutReader) ReadLineCtx(ctx context.Context) (string, error) {
	select {
	case &lt;-ctx.Done():
		return &quot;&quot;, ErrTimeout
	case respChan := &lt;-helper(r.b):
		return respChan.Resp, respChan.Err
	}
}

func (r *TimeoutReader) ReadLine() (string, error) {
	ctx, cancel := context.WithTimeout(context.Background(), r.t)
	defer cancel()

	s, err := r.ReadLineCtx(ctx)
	if err != nil {
		return &quot;&quot;, err
	}

	return s, nil
}

Finally, here's the main code that calls ReadLine with timeout:

package main

import (
	&quot;bytes&quot;
	&quot;fmt&quot;
	&quot;io&quot;
	&quot;os&quot;
	&quot;os/exec&quot;
	&quot;sync&quot;
	&quot;time&quot;

	&quot;watcher/timeout_io&quot;
)

func main() {
	var stdOut bytes.Buffer
	var stdErr bytes.Buffer
	runCommand := &amp;exec.Cmd{
		Path:   &quot;./slowroll&quot;,
		Stdout: &amp;stdOut,
		Stderr: &amp;stdErr,
	}

	var wg sync.WaitGroup

	go func(wg *sync.WaitGroup) {
		defer wg.Done()

		err := runCommand.Run()
		if err != nil {
			fmt.Println(&quot;failed due to error:&quot;, err)
			os.Exit(1)
		}
	}(&amp;wg)

	wg.Add(1)

	stdOutReader := timeout_io.NewTimeoutReader(&amp;stdOut)
	stdOutReader.SetTimeout(10 * time.Millisecond)
	index := 1
	for {
		s, err := stdOutReader.ReadLine()
		if err != nil {
			if err != timeout_io.ErrTimeout &amp;&amp; err != io.EOF {
				fmt.Println(&quot;ReadLine got error&quot;, err)
				break
			}
		} else if len(s) &gt; 0 {
			fmt.Println(&quot;index: &quot;, index, &quot; s: &quot;, s)
			index += 1
			s = &quot;&quot;
		}
	}

	wg.Wait()
	fmt.Println(&quot;Done!&quot;)
}

When run, it produces the following output:

	&gt; go run watcher.go
index:  1  s:  This is line 1
index:  2  s:  This is line 2
index:  3  s:  This is line 2
index:  4  s:  This is line 3
index:  5  s:  This is line 2
index:  6  s:  This is line 3
index:  7  s:  This is line 4
index:  8  s:  This is line 2
index:  9  s:  This is line 3
index:  10  s:  This is line 4
index:  11  s:  This is line 5

And so on.

Occasionally, some slowroll output lines don't show up at all; which lines get repeated is random.

So that's my mystery... I don't see where the (apparent) loop is happening that causes the lines to be produced multiple times.

Thanks very much in advance for any help!

答案1

得分: 1

通过创建一个管道并从该管道读取来简化代码:

cmd := exec.Command("./slowroll")
stdout, _ := cmd.StdoutPipe()
if err := cmd.Start(); err != nil {
    log.Fatal(err)
}

s := bufio.NewScanner(stdout)
for s.Scan() {
    fmt.Printf("%s\n", s.Bytes())
}

如果你的目标是监视stderr和stdin的组合输出,则可以同时使用同一个管道:

cmd := exec.Command("./slowroll")
combined, _ := cmd.StdoutPipe()
cmd.Stderr = cmd.Stdout // <-- 使用stdout管道作为stderr
if err := cmd.Start(); err != nil {
    log.Fatal(err)
}

s := bufio.NewScanner(combined)
for s.Scan() {
    fmt.Printf("%s\n", s.Bytes())
}

问题中的代码在stdOut bytes.Buffer上存在数据竞争。

英文:

Simplify the code by creating a pipe and reading from that pipe:

cmd := exec.Command(&quot;./slowroll&quot;)
stdout, _ := cmd.StdoutPipe()
if err := cmd.Start(); err != nil {
log.Fatal(err)
}
s := bufio.NewScanner(stdout)
for s.Scan() {
fmt.Printf(&quot;%s\n&quot;, s.Bytes())
}

If your goal is to monitor the combined output of stderr and stdin, then use the same pipe for both:

cmd := exec.Command(&quot;./slowroll&quot;)
combined, _ := cmd.StdoutPipe()
cmd.Stderr = cmd.Stdout // &lt;-- use stdout pipe for stderr
if err := cmd.Start(); err != nil {
log.Fatal(err)
}
s := bufio.NewScanner(combined)
for s.Scan() {
fmt.Printf(&quot;%s\n&quot;, s.Bytes())
}

The code in the question has a data race on the stdOut bytes.Buffer.

答案2

得分: 0

如果出现这样的条件,ErrTimeout 将被静默忽略,并且不会中断你的读取循环。

还要注意,达到 io.EOF 会使你的程序进入无限循环(尝试使用 echo "Hello" 而不是 ./slowroll 作为命令)。


你可能想在 if 块之后放置 break 指令:

if err != timeout_io.ErrTimeout && err != io.EOF {
    fmt.Println("ReadLine got error", err)
}
break
英文:

if err != timeout_io.ErrTimeout &amp;&amp; err != io.EOF { ...; break; }

With such a condition, an ErrTimeout will be silently ignored and will not interrupt your reading loop.

Also note that reaching io.EOF would send your program in an endless loop (try using echo &quot;Hello&quot; instead of ./slowroll as a command).


You probably want to place the break instruction after the if block :

if err != timeout_io.ErrTimeout &amp;&amp; err != io.EOF {
fmt.Println(&quot;ReadLine got error&quot;, err)
}
break

答案3

得分: 0

昨晚意识到我在与Go的标准行为作斗争。

应该解释一下目标是能够同时监视标准输出和标准错误输出。

根据@Zombo的建议,我切换到了cmd.StdoutPipecmd.StderrPipe

主要思路是创建读取管道并将找到的内容放入通道的goroutine,然后在通道之间使用select语句。

所以为了展示EOF不会导致无限循环,slowroll.go不会产生无限输出:

package main

import (
	"fmt"
	"os"
	"time"
)

func main() {
	line := 1
	for {
		fmt.Println("This is line", line)
		line += 1
		time.Sleep(2 * time.Second)
		if line%3 == 0 {
			fmt.Fprintf(os.Stderr, "This is error %d\n", line)
		}
		if line > 10 {
			break
		}
	}
}

而更简单、可行的watcher.go现在是这样的:

package main

import (
	"bufio"
	"fmt"
	"os"
	"os/exec"
	"sync"
)

func main() {
	runCommand := &exec.Cmd{
		Path: "./slowroll",
	}
	stdOut, err := runCommand.StdoutPipe()
	if err != nil {
		fmt.Println("无法创建StdoutPipe:", err)
		os.Exit(1)
	}
	stdErr, err := runCommand.StderrPipe()
	if err != nil {
		fmt.Println("无法创建StderrPipe:", err)
		os.Exit(1)
	}

	var wg sync.WaitGroup

	go func(wg *sync.WaitGroup) {
		defer wg.Done()

		err := runCommand.Run()
		if err != nil {
			fmt.Println("由于错误而失败:", err)
			os.Exit(1)
		}
	}(&wg)

	wg.Add(1)

	stdOutChan := make(chan string, 1)
	go func(wg *sync.WaitGroup) {
		defer wg.Done()

		scanner := bufio.NewScanner(stdOut)
		for scanner.Scan() {
			stdOutChan <- string(scanner.Bytes())
		}
		fmt.Println("stdout输入已用完,读取线程退出。")
		close(stdOutChan)
	}(&wg)

	wg.Add(1)

	stdErrChan := make(chan string, 1)
	go func(wg *sync.WaitGroup) {
		defer wg.Done()

		scanner := bufio.NewScanner(stdErr)
		for scanner.Scan() {
			stdErrChan <- string(scanner.Bytes())
		}
		fmt.Println("stderr输入已用完,读取线程退出。")
		close(stdErrChan)
	}(&wg)

	wg.Add(1)

	index := 1
	keepGoing := true
	for keepGoing {
		select {
		case res, isOpen := <-stdOutChan:
			if !isOpen {
				fmt.Println("stdOutChan已关闭,主线程退出。")
				keepGoing = false
			} else {
				fmt.Println(index, "s:", res)
				index += 1
			}

		case res, isOpen := <-stdErrChan:
			if !isOpen {
				fmt.Println("stdErrChan已关闭,主线程退出。")
				keepGoing = false
			} else {
				fmt.Println(index, "error s:", res)
				index += 1
			}
		}
	}

	wg.Wait()
	fmt.Println("完成!")
}

输出:

> go run watcher.go
1 s: This is line 1
2 s: This is line 2
3 error s: This is error 3
4 s: This is line 3
5 s: This is line 4
6 s: This is line 5
7 s: This is line 6
8 error s: This is error 6
9 s: This is line 7
10 s: This is line 8
11 s: This is line 9
12 error s: This is error 9
13 s: This is line 10
stdout输入已用完,读取线程退出。
stdOutChan已关闭,主线程退出。
stderr输入已用完,读取线程退出。
完成!

显然,它可以进行一些重构,但它能够正常工作,这就是目标。

谢谢!

英文:

Realized late last night that I was kind of fighting go's standard behavior.

Should have explained that the goal was to be able to watch stdout and stderr at the same time.

Taking @Zombo's advice above, I switched to cmd.StdoutPipe and cmd.StderrPipe.

The main idea is to just have goroutines that read the pipes and put content found into channels, and then select between the channels.

So slowroll.go does not produce infinite output, to show that EOF doesn't cause a infinite loop:

package main

import (
	&quot;fmt&quot;
	&quot;os&quot;
	&quot;time&quot;
)

func main() {
	line := 1
	for {
		fmt.Println(&quot;This is line&quot;, line)
		line += 1
		time.Sleep(2 * time.Second)
		if line%3 == 0 {
			fmt.Fprintf(os.Stderr, &quot;This is error %d\n&quot;, line)
		}
		if line &gt; 10 {
			break
		}
	}
}

And the simpler, working watcher.go is now:

package main

import (
	&quot;bufio&quot;
	&quot;fmt&quot;
	&quot;os&quot;
	&quot;os/exec&quot;
	&quot;sync&quot;
)

func main() {
	runCommand := &amp;exec.Cmd{
		Path: &quot;./slowroll&quot;,
	}
	stdOut, err := runCommand.StdoutPipe()
	if err != nil {
		fmt.Println(&quot;Can&#39;t create StdoutPipe:&quot;, err)
		os.Exit(1)
	}
	stdErr, err := runCommand.StderrPipe()
	if err != nil {
		fmt.Println(&quot;Can&#39;t create StderrPipe:&quot;, err)
		os.Exit(1)
	}

	var wg sync.WaitGroup

	go func(wg *sync.WaitGroup) {
		defer wg.Done()

		err := runCommand.Run()
		if err != nil {
			fmt.Println(&quot;failed due to error:&quot;, err)
			os.Exit(1)
		}
	}(&amp;wg)

	wg.Add(1)

	stdOutChan := make(chan string, 1)
	go func(wg *sync.WaitGroup) {
		defer wg.Done()

		scanner := bufio.NewScanner(stdOut)
		for scanner.Scan() {
			stdOutChan &lt;- string(scanner.Bytes())
		}
		fmt.Println(&quot;Ran out of stdout input, read thread bailing.&quot;)
		close(stdOutChan)
	}(&amp;wg)

	wg.Add(1)

	stdErrChan := make(chan string, 1)
	go func(wg *sync.WaitGroup) {
		defer wg.Done()

		scanner := bufio.NewScanner(stdErr)
		for scanner.Scan() {
			stdErrChan &lt;- string(scanner.Bytes())
		}
		fmt.Println(&quot;Ran out of stderr input, read thread bailing.&quot;)
		close(stdErrChan)
	}(&amp;wg)

	wg.Add(1)

	index := 1
	keepGoing := true
	for keepGoing {
		select {
		case res, isOpen := &lt;-stdOutChan:
			if !isOpen {
				fmt.Println(&quot;stdOutChan is no longer open, main bailing.&quot;)
				keepGoing = false
			} else {
				fmt.Println(index, &quot;s:&quot;, res)
				index += 1
			}

		case res, isOpen := &lt;-stdErrChan:
			if !isOpen {
				fmt.Println(&quot;stdErrChan is no longer open, main bailing.&quot;)
				keepGoing = false
			} else {
				fmt.Println(index, &quot;error s:&quot;, res)
				index += 1
			}
		}
	}

	wg.Wait()
	fmt.Println(&quot;Done!&quot;)
}

Output:

	&gt; go run watcher.go
1 s: This is line 1
2 s: This is line 2
3 error s: This is error 3
4 s: This is line 3
5 s: This is line 4
6 s: This is line 5
7 s: This is line 6
8 error s: This is error 6
9 s: This is line 7
10 s: This is line 8
11 s: This is line 9
12 error s: This is error 9
13 s: This is line 10
Ran out of stdout input, read thread bailing.
stdOutChan is no longer open, main bailing.
Ran out of stderr input, read thread bailing.
Done!

Could stand with some refactoring, obviously, but it works, which is the goal.

Thanks!

huangapple
  • 本文由 发表于 2022年2月9日 06:30:39
  • 转载请务必保留本文链接:https://go.coder-hub.com/71041707.html
匿名

发表评论

匿名网友

:?: :razz: :sad: :evil: :!: :smile: :oops: :grin: :eek: :shock: :???: :cool: :lol: :mad: :twisted: :roll: :wink: :idea: :arrow: :neutral: :cry: :mrgreen:

确定