英文:
Go timeout for Read
问题
我想做类似于Unix的tail -f
的操作,但是针对通过Go的Cmd
功能运行的进程产生的输出。
显然,我的谷歌搜索能力不够强,但我找到了这篇文章,它引导我编写了下面的代码,几乎可以工作,但有一个奇怪的问题,希望能得到帮助。
如果有关系的话,我是在Mac上运行这个程序的。
首先,这是一个最小化的程序,编译成slowroll
可执行文件:
package main
import (
"fmt"
"time"
)
// main prints an ever-increasing "This is line N" message to stdout, pausing
// two seconds after each one. It never returns.
func main() {
	for line := 1; ; line++ {
		fmt.Println("This is line", line)
		time.Sleep(2 * time.Second)
	}
}
运行时,它每2秒输出一行:
> ./slowroll
This is line 1
This is line 2
This is line 3
This is line 4
下面是尝试读取这个输出并允许超时的包代码:
package timeout_io
import (
"bufio"
"bytes"
"context"
"errors"
"time"
)
// BufferSize is the buffer size NewTimeoutReader passes to bufio.NewReaderSize.
const BufferSize = 4096
// ErrTimeout is the error ReadLineCtx returns when it selects the context's Done channel.
var ErrTimeout = errors.New("timeout")
// TimeoutReader pairs a buffered reader with the timeout that ReadLine applies to each call.
type TimeoutReader struct {
// b is the buffered reader lines are read from.
b *bufio.Reader
// t is the duration ReadLine passes to context.WithTimeout.
t time.Duration
}
// NewTimeoutReader wraps stdOut in a bufio.Reader of BufferSize bytes and
// returns a TimeoutReader whose timeout is zero until SetTimeout is called.
func NewTimeoutReader(stdOut *bytes.Buffer) *TimeoutReader {
	buffered := bufio.NewReaderSize(stdOut, BufferSize)
	return &TimeoutReader{b: buffered} // t is left at its zero value
}
// SetTimeout replaces the reader's timeout with t and returns the timeout
// that was in effect before the call.
func (r *TimeoutReader) SetTimeout(t time.Duration) time.Duration {
	old := r.t
	r.t = t
	return old
}
// CallResponse carries the result of the single ReadString call made by helper.
type CallResponse struct {
// Resp is the string ReadString returned.
Resp string
// Err is the error ReadString returned, or nil.
Err error
}
// helper starts a goroutine that performs one r.ReadString('\n') and delivers
// the string and error, wrapped in a CallResponse, on the returned channel.
// The channel has capacity 1, so the goroutine can finish even when nobody
// receives the result.
func helper(r *bufio.Reader) <-chan *CallResponse {
	out := make(chan *CallResponse, 1)
	go func() {
		line, err := r.ReadString('\n')
		out <- &CallResponse{Resp: line, Err: err}
	}()
	return out
}
// ReadLineCtx starts a read of one line via helper and returns its string and
// error, or "" and ErrTimeout if ctx's Done channel is selected instead.
// The read is always started, even when ctx is already done.
func (r *TimeoutReader) ReadLineCtx(ctx context.Context) (string, error) {
	pending := helper(r.b)
	select {
	case res := <-pending:
		return res.Resp, res.Err
	case <-ctx.Done():
		return "", ErrTimeout
	}
}
// ReadLine calls ReadLineCtx with a context that expires after the reader's
// configured timeout. On any error the returned string is empty, so text
// that came back alongside the error is discarded.
func (r *TimeoutReader) ReadLine() (string, error) {
	ctx, cancel := context.WithTimeout(context.Background(), r.t)
	defer cancel()

	line, err := r.ReadLineCtx(ctx)
	if err == nil {
		return line, nil
	}
	return "", err
}
最后,这是调用带有超时的ReadLine
的main
代码:
package main
import (
"bytes"
"fmt"
"io"
"os"
"os/exec"
"sync"
"time"
"watcher/timeout_io"
)
// main runs ./slowroll with its stdout and stderr captured in bytes.Buffers
// and polls the stdout buffer for lines through a timeout_io.TimeoutReader.
func main() {
var stdOut bytes.Buffer
var stdErr bytes.Buffer
runCommand := &exec.Cmd{
Path: "./slowroll",
Stdout: &stdOut,
Stderr: &stdErr,
}
var wg sync.WaitGroup
// Run executes in its own goroutine; the whole program exits with status 1
// if it returns an error.
go func(wg *sync.WaitGroup) {
defer wg.Done()
err := runCommand.Run()
if err != nil {
fmt.Println("failed due to error:", err)
os.Exit(1)
}
}(&wg)
// NOTE(review): Add is called after the goroutine has been started; the
// sync.WaitGroup documentation asks for Add to happen before the "go" statement.
wg.Add(1)
// NOTE(review): stdOut is read here while Run is still in progress and no
// synchronization is visible in this block — confirm bytes.Buffer may be
// shared like this.
stdOutReader := timeout_io.NewTimeoutReader(&stdOut)
stdOutReader.SetTimeout(10 * time.Millisecond)
index := 1
for {
s, err := stdOutReader.ReadLine()
if err != nil {
// ErrTimeout and io.EOF are ignored and the loop simply retries; any other
// error is printed and ends the loop.
if err != timeout_io.ErrTimeout && err != io.EOF {
fmt.Println("ReadLine got error", err)
break
}
} else if len(s) > 0 {
fmt.Println("index:", index, "s:", s)
index += 1
// This assignment has no lasting effect: s is declared anew by := on every iteration.
s = ""
}
}
// Reached only after the loop breaks on an unexpected error.
wg.Wait()
fmt.Println("Done!")
}
运行时,它产生以下输出:
> go run watcher.go
index: 1 s: This is line 1
index: 2 s: This is line 2
index: 3 s: This is line 2
index: 4 s: This is line 3
index: 5 s: This is line 2
index: 6 s: This is line 3
index: 7 s: This is line 4
index: 8 s: This is line 2
index: 9 s: This is line 3
index: 10 s: This is line 4
index: 11 s: This is line 5
偶尔,一些slowroll
的输出行根本不显示;重复出现的行是随机的。
这就是我的问题...我看不到导致行被多次输出的(明显的)循环在哪里。
非常感谢提前的帮助!
英文:
I'd like to do something like unix's tail -f
, but on the output produced by a process run through Go's Cmd
facility.
My google-fu is not up to par, evidently, but I did find this article, which led me to write the following code. It almost works, but with a bizarre twist I'm hoping I can get help with.
If it matters, I'm running this on a Mac.
First, here's the minimal program that's compiled to be the slowroll
executable:
package main
import (
"fmt"
"time"
)
// main prints an ever-increasing "This is line N" message to stdout, pausing
// two seconds after each one. It never returns.
func main() {
	for line := 1; ; line++ {
		fmt.Println("This is line", line)
		time.Sleep(2 * time.Second)
	}
}
When run, it produces the following output, one line every 2 seconds:
> ./slowroll
This is line 1
This is line 2
This is line 3
This is line 4
And so on.
Here's the package code that attempts to read this, but allowing timeouts so other things can be done:
package timeout_io
import (
"bufio"
"bytes"
"context"
"errors"
"time"
)
// BufferSize is the buffer size NewTimeoutReader passes to bufio.NewReaderSize.
const BufferSize = 4096
// ErrTimeout is the error ReadLineCtx returns when it selects the context's Done channel.
var ErrTimeout = errors.New("timeout")
// TimeoutReader pairs a buffered reader with the timeout that ReadLine applies to each call.
type TimeoutReader struct {
// b is the buffered reader lines are read from.
b *bufio.Reader
// t is the duration ReadLine passes to context.WithTimeout.
t time.Duration
}
// NewTimeoutReader wraps stdOut in a bufio.Reader of BufferSize bytes and
// returns a TimeoutReader whose timeout is zero until SetTimeout is called.
func NewTimeoutReader(stdOut *bytes.Buffer) *TimeoutReader {
	buffered := bufio.NewReaderSize(stdOut, BufferSize)
	return &TimeoutReader{b: buffered} // t is left at its zero value
}
// SetTimeout replaces the reader's timeout with t and returns the timeout
// that was in effect before the call.
func (r *TimeoutReader) SetTimeout(t time.Duration) time.Duration {
	old := r.t
	r.t = t
	return old
}
// CallResponse carries the result of the single ReadString call made by helper.
type CallResponse struct {
// Resp is the string ReadString returned.
Resp string
// Err is the error ReadString returned, or nil.
Err error
}
// helper starts a goroutine that performs one r.ReadString('\n') and delivers
// the string and error, wrapped in a CallResponse, on the returned channel.
// The channel has capacity 1, so the goroutine can finish even when nobody
// receives the result.
func helper(r *bufio.Reader) <-chan *CallResponse {
	out := make(chan *CallResponse, 1)
	go func() {
		line, err := r.ReadString('\n')
		out <- &CallResponse{Resp: line, Err: err}
	}()
	return out
}
// ReadLineCtx starts a read of one line via helper and returns its string and
// error, or "" and ErrTimeout if ctx's Done channel is selected instead.
// The read is always started, even when ctx is already done.
func (r *TimeoutReader) ReadLineCtx(ctx context.Context) (string, error) {
	pending := helper(r.b)
	select {
	case res := <-pending:
		return res.Resp, res.Err
	case <-ctx.Done():
		return "", ErrTimeout
	}
}
// ReadLine calls ReadLineCtx with a context that expires after the reader's
// configured timeout. On any error the returned string is empty, so text
// that came back alongside the error is discarded.
func (r *TimeoutReader) ReadLine() (string, error) {
	ctx, cancel := context.WithTimeout(context.Background(), r.t)
	defer cancel()

	line, err := r.ReadLineCtx(ctx)
	if err == nil {
		return line, nil
	}
	return "", err
}
Finally, here's the main
code that calls ReadLine
with timeout:
package main
import (
"bytes"
"fmt"
"io"
"os"
"os/exec"
"sync"
"time"
"watcher/timeout_io"
)
// main runs ./slowroll with its stdout and stderr captured in bytes.Buffers
// and polls the stdout buffer for lines through a timeout_io.TimeoutReader.
func main() {
var stdOut bytes.Buffer
var stdErr bytes.Buffer
runCommand := &exec.Cmd{
Path: "./slowroll",
Stdout: &stdOut,
Stderr: &stdErr,
}
var wg sync.WaitGroup
// Run executes in its own goroutine; the whole program exits with status 1
// if it returns an error.
go func(wg *sync.WaitGroup) {
defer wg.Done()
err := runCommand.Run()
if err != nil {
fmt.Println("failed due to error:", err)
os.Exit(1)
}
}(&wg)
// NOTE(review): Add is called after the goroutine has been started; the
// sync.WaitGroup documentation asks for Add to happen before the "go" statement.
wg.Add(1)
// NOTE(review): stdOut is read here while Run is still in progress and no
// synchronization is visible in this block — confirm bytes.Buffer may be
// shared like this.
stdOutReader := timeout_io.NewTimeoutReader(&stdOut)
stdOutReader.SetTimeout(10 * time.Millisecond)
index := 1
for {
s, err := stdOutReader.ReadLine()
if err != nil {
// ErrTimeout and io.EOF are ignored and the loop simply retries; any other
// error is printed and ends the loop.
if err != timeout_io.ErrTimeout && err != io.EOF {
fmt.Println("ReadLine got error", err)
break
}
} else if len(s) > 0 {
fmt.Println("index: ", index, " s: ", s)
index += 1
// This assignment has no lasting effect: s is declared anew by := on every iteration.
s = ""
}
}
// Reached only after the loop breaks on an unexpected error.
wg.Wait()
fmt.Println("Done!")
}
When run, it produces the following output:
> go run watcher.go
index: 1 s: This is line 1
index: 2 s: This is line 2
index: 3 s: This is line 2
index: 4 s: This is line 3
index: 5 s: This is line 2
index: 6 s: This is line 3
index: 7 s: This is line 4
index: 8 s: This is line 2
index: 9 s: This is line 3
index: 10 s: This is line 4
index: 11 s: This is line 5
And so on.
Occasionally, some slowroll
output lines don't show up at all; which lines get repeated is random.
So that's my mystery... I don't see where the (apparent) loop is happening that causes the lines to be produced multiple times.
Thanks very much in advance for any help!
答案1
得分: 1
通过创建一个管道并从该管道读取来简化代码:
// Run ./slowroll and echo each line of its stdout as soon as it arrives.
cmd := exec.Command("./slowroll")
stdout, err := cmd.StdoutPipe()
if err != nil {
	log.Fatal(err)
}
if err := cmd.Start(); err != nil {
	log.Fatal(err)
}
s := bufio.NewScanner(stdout)
for s.Scan() {
	fmt.Println(s.Text())
}
// Scan returns false on EOF and on a read error alike; only Err tells them apart.
if err := s.Err(); err != nil {
	log.Fatal(err)
}
// Wait closes the pipe, so it is called only after every read has completed.
// It also reaps the child and reports a non-zero exit status.
if err := cmd.Wait(); err != nil {
	log.Fatal(err)
}
如果你的目标是监视stderr和stdout的组合输出,则可以同时使用同一个管道:
// Run ./slowroll and echo its stdout and stderr, merged into one stream,
// line by line as output arrives.
cmd := exec.Command("./slowroll")
combined, err := cmd.StdoutPipe()
if err != nil {
	log.Fatal(err)
}
cmd.Stderr = cmd.Stdout // <-- use stdout pipe for stderr
if err := cmd.Start(); err != nil {
	log.Fatal(err)
}
s := bufio.NewScanner(combined)
for s.Scan() {
	fmt.Println(s.Text())
}
// Scan returns false on EOF and on a read error alike; only Err tells them apart.
if err := s.Err(); err != nil {
	log.Fatal(err)
}
// Wait closes the pipe, so it is called only after every read has completed.
// It also reaps the child and reports a non-zero exit status.
if err := cmd.Wait(); err != nil {
	log.Fatal(err)
}
问题中的代码在stdOut bytes.Buffer上存在数据竞争。
英文:
Simplify the code by creating a pipe and reading from that pipe:
// Run ./slowroll and echo each line of its stdout as soon as it arrives.
cmd := exec.Command("./slowroll")
stdout, err := cmd.StdoutPipe()
if err != nil {
	log.Fatal(err)
}
if err := cmd.Start(); err != nil {
	log.Fatal(err)
}
s := bufio.NewScanner(stdout)
for s.Scan() {
	fmt.Println(s.Text())
}
// Scan returns false on EOF and on a read error alike; only Err tells them apart.
if err := s.Err(); err != nil {
	log.Fatal(err)
}
// Wait closes the pipe, so it is called only after every read has completed.
// It also reaps the child and reports a non-zero exit status.
if err := cmd.Wait(); err != nil {
	log.Fatal(err)
}
If your goal is to monitor the combined output of stderr and stdout, then use the same pipe for both:
// Run ./slowroll and echo its stdout and stderr, merged into one stream,
// line by line as output arrives.
cmd := exec.Command("./slowroll")
combined, err := cmd.StdoutPipe()
if err != nil {
	log.Fatal(err)
}
cmd.Stderr = cmd.Stdout // <-- use stdout pipe for stderr
if err := cmd.Start(); err != nil {
	log.Fatal(err)
}
s := bufio.NewScanner(combined)
for s.Scan() {
	fmt.Println(s.Text())
}
// Scan returns false on EOF and on a read error alike; only Err tells them apart.
if err := s.Err(); err != nil {
	log.Fatal(err)
}
// Wait closes the pipe, so it is called only after every read has completed.
// It also reaps the child and reports a non-zero exit status.
if err := cmd.Wait(); err != nil {
	log.Fatal(err)
}
The code in the question has a data race on the stdOut bytes.Buffer.
答案2
得分: 0
如果出现这样的条件,ErrTimeout
将被静默忽略,并且不会中断你的读取循环。
还要注意,达到 io.EOF
会使你的程序进入无限循环(尝试使用 echo "Hello"
而不是 ./slowroll
作为命令)。
你可能想在 if 块之后放置 break
指令:
// Only unexpected errors are printed; ErrTimeout and io.EOF stay silent.
if err != timeout_io.ErrTimeout && err != io.EOF {
fmt.Println("ReadLine got error", err)
}
// The break sits outside the if, so it runs whether or not the error was printed.
break
英文:
if err != timeout_io.ErrTimeout && err != io.EOF { ...; break; }
With such a condition, an ErrTimeout
will be silently ignored and will not interrupt your reading loop.
Also note that reaching io.EOF
would send your program in an endless loop (try using echo "Hello"
instead of ./slowroll
as a command).
You probably want to place the break
instruction after the if block :
// Only unexpected errors are printed; ErrTimeout and io.EOF stay silent.
if err != timeout_io.ErrTimeout && err != io.EOF {
fmt.Println("ReadLine got error", err)
}
// The break sits outside the if, so it runs whether or not the error was printed.
break
答案3
得分: 0
昨晚意识到我在与Go的标准行为作斗争。
应该解释一下目标是能够同时监视标准输出和标准错误输出。
根据@Zombo的建议,我切换到了cmd.StdoutPipe
和cmd.StderrPipe
。
主要思路是创建读取管道并将找到的内容放入通道的goroutine,然后在通道之间使用select
语句。
所以为了展示EOF不会导致无限循环,slowroll.go
不会产生无限输出:
package main
import (
"fmt"
"os"
"time"
)
// main prints "This is line N" for N = 1..10 on stdout, sleeping two seconds
// after each line. After the sleep, whenever the already-incremented counter
// is a multiple of three, it also writes "This is error N" to stderr.
func main() {
	for line := 1; line <= 10; {
		fmt.Println("This is line", line)
		line++
		time.Sleep(2 * time.Second)
		if line%3 == 0 {
			fmt.Fprintf(os.Stderr, "This is error %d\n", line)
		}
	}
}
而更简单、可行的watcher.go
现在是这样的:
package main
import (
"bufio"
"fmt"
"os"
"os/exec"
"sync"
)
// main runs ./slowroll and prints the lines it writes to stdout and stderr as
// they arrive, numbering all of them with one shared running index.
//
// One goroutine per pipe scans lines into a channel; main selects between the
// two channels until both are closed, then reaps the child process.
func main() {
	runCommand := &exec.Cmd{
		Path: "./slowroll",
	}
	stdOut, err := runCommand.StdoutPipe()
	if err != nil {
		fmt.Println("无法创建StdoutPipe:", err)
		os.Exit(1)
	}
	stdErr, err := runCommand.StderrPipe()
	if err != nil {
		fmt.Println("无法创建StderrPipe:", err)
		os.Exit(1)
	}
	// Start, not Run: Run includes Wait, and Wait closes both pipes as soon as
	// the child exits, which can discard output that has not been read yet.
	if err := runCommand.Start(); err != nil {
		fmt.Println("由于错误而失败:", err)
		os.Exit(1)
	}

	var wg sync.WaitGroup

	stdOutChan := make(chan string, 1)
	wg.Add(1) // Add before "go" so Done can never run first.
	go func(lines chan<- string) {
		defer wg.Done()
		defer close(lines)
		scanner := bufio.NewScanner(stdOut)
		for scanner.Scan() {
			lines <- scanner.Text()
		}
		// Scan stops on EOF and on read errors alike; report the latter.
		if err := scanner.Err(); err != nil {
			fmt.Println("读取stdout时出错:", err)
		}
		fmt.Println("stdout输入已用完,读取线程退出。")
	}(stdOutChan)

	stdErrChan := make(chan string, 1)
	wg.Add(1)
	go func(lines chan<- string) {
		defer wg.Done()
		defer close(lines)
		scanner := bufio.NewScanner(stdErr)
		for scanner.Scan() {
			lines <- scanner.Text()
		}
		if err := scanner.Err(); err != nil {
			fmt.Println("读取stderr时出错:", err)
		}
		fmt.Println("stderr输入已用完,读取线程退出。")
	}(stdErrChan)

	index := 1
	// Keep selecting until BOTH streams are drained. Stopping at the first
	// closed channel would leave the other reader blocked on its send, and
	// wg.Wait below would never return. A nil channel blocks forever in a
	// select, so setting a closed channel to nil retires its case.
	for stdOutChan != nil || stdErrChan != nil {
		select {
		case res, isOpen := <-stdOutChan:
			if !isOpen {
				fmt.Println("stdOutChan已关闭,主线程退出。")
				stdOutChan = nil
				continue
			}
			fmt.Println(index, "s:", res)
			index++
		case res, isOpen := <-stdErrChan:
			if !isOpen {
				fmt.Println("stdErrChan已关闭,主线程退出。")
				stdErrChan = nil
				continue
			}
			fmt.Println(index, "error s:", res)
			index++
		}
	}
	wg.Wait()
	// Every read from the pipes has completed, so Wait may now close them and
	// collect the child's exit status.
	if err := runCommand.Wait(); err != nil {
		fmt.Println("由于错误而失败:", err)
		os.Exit(1)
	}
	fmt.Println("完成!")
}
输出:
> go run watcher.go
1 s: This is line 1
2 s: This is line 2
3 error s: This is error 3
4 s: This is line 3
5 s: This is line 4
6 s: This is line 5
7 s: This is line 6
8 error s: This is error 6
9 s: This is line 7
10 s: This is line 8
11 s: This is line 9
12 error s: This is error 9
13 s: This is line 10
stdout输入已用完,读取线程退出。
stdOutChan已关闭,主线程退出。
stderr输入已用完,读取线程退出。
完成!
显然,它可以进行一些重构,但它能够正常工作,这就是目标。
谢谢!
英文:
Realized late last night that I was kind of fighting go's standard behavior.
Should have explained that the goal was to be able to watch stdout and stderr at the same time.
Taking @Zombo's advice above, I switched to cmd.StdoutPipe
and cmd.StderrPipe
.
The main idea is to just have goroutines that read the pipes and put content found into channels, and then select
between the channels.
So slowroll.go
no longer produces infinite output, to show that EOF doesn't cause an infinite loop:
package main
import (
"fmt"
"os"
"time"
)
// main prints "This is line N" for N = 1..10 on stdout, sleeping two seconds
// after each line. After the sleep, whenever the already-incremented counter
// is a multiple of three, it also writes "This is error N" to stderr.
func main() {
	for line := 1; line <= 10; {
		fmt.Println("This is line", line)
		line++
		time.Sleep(2 * time.Second)
		if line%3 == 0 {
			fmt.Fprintf(os.Stderr, "This is error %d\n", line)
		}
	}
}
And the simpler, working watcher.go
is now:
package main
import (
"bufio"
"fmt"
"os"
"os/exec"
"sync"
)
// main runs ./slowroll and prints the lines it writes to stdout and stderr as
// they arrive, numbering all of them with one shared running index.
//
// One goroutine per pipe scans lines into a channel; main selects between the
// two channels until both are closed, then reaps the child process.
func main() {
	runCommand := &exec.Cmd{
		Path: "./slowroll",
	}
	stdOut, err := runCommand.StdoutPipe()
	if err != nil {
		fmt.Println("Can't create StdoutPipe:", err)
		os.Exit(1)
	}
	stdErr, err := runCommand.StderrPipe()
	if err != nil {
		fmt.Println("Can't create StderrPipe:", err)
		os.Exit(1)
	}
	// Start, not Run: Run includes Wait, and Wait closes both pipes as soon as
	// the child exits, which can discard output that has not been read yet.
	if err := runCommand.Start(); err != nil {
		fmt.Println("failed due to error:", err)
		os.Exit(1)
	}

	var wg sync.WaitGroup

	stdOutChan := make(chan string, 1)
	wg.Add(1) // Add before "go" so Done can never run first.
	go func(lines chan<- string) {
		defer wg.Done()
		defer close(lines)
		scanner := bufio.NewScanner(stdOut)
		for scanner.Scan() {
			lines <- scanner.Text()
		}
		// Scan stops on EOF and on read errors alike; report the latter.
		if err := scanner.Err(); err != nil {
			fmt.Println("stdout read error:", err)
		}
		fmt.Println("Ran out of stdout input, read thread bailing.")
	}(stdOutChan)

	stdErrChan := make(chan string, 1)
	wg.Add(1)
	go func(lines chan<- string) {
		defer wg.Done()
		defer close(lines)
		scanner := bufio.NewScanner(stdErr)
		for scanner.Scan() {
			lines <- scanner.Text()
		}
		if err := scanner.Err(); err != nil {
			fmt.Println("stderr read error:", err)
		}
		fmt.Println("Ran out of stderr input, read thread bailing.")
	}(stdErrChan)

	index := 1
	// Keep selecting until BOTH streams are drained. Stopping at the first
	// closed channel would leave the other reader blocked on its send, and
	// wg.Wait below would never return. A nil channel blocks forever in a
	// select, so setting a closed channel to nil retires its case.
	for stdOutChan != nil || stdErrChan != nil {
		select {
		case res, isOpen := <-stdOutChan:
			if !isOpen {
				fmt.Println("stdOutChan is no longer open, main bailing.")
				stdOutChan = nil
				continue
			}
			fmt.Println(index, "s:", res)
			index++
		case res, isOpen := <-stdErrChan:
			if !isOpen {
				fmt.Println("stdErrChan is no longer open, main bailing.")
				stdErrChan = nil
				continue
			}
			fmt.Println(index, "error s:", res)
			index++
		}
	}
	wg.Wait()
	// Every read from the pipes has completed, so Wait may now close them and
	// collect the child's exit status.
	if err := runCommand.Wait(); err != nil {
		fmt.Println("failed due to error:", err)
		os.Exit(1)
	}
	fmt.Println("Done!")
}
Output:
> go run watcher.go
1 s: This is line 1
2 s: This is line 2
3 error s: This is error 3
4 s: This is line 3
5 s: This is line 4
6 s: This is line 5
7 s: This is line 6
8 error s: This is error 6
9 s: This is line 7
10 s: This is line 8
11 s: This is line 9
12 error s: This is error 9
13 s: This is line 10
Ran out of stdout input, read thread bailing.
stdOutChan is no longer open, main bailing.
Ran out of stderr input, read thread bailing.
Done!
Could stand with some refactoring, obviously, but it works, which is the goal.
Thanks!
通过集体智慧和协作来改善编程学习和解决问题的方式。致力于成为全球开发者共同参与的知识库,让每个人都能够通过互相帮助和分享经验来进步。
评论