多个Docker容器日志

huangapple go评论80阅读模式
英文:

Multiple docker container logs

问题

我正在尝试一次性获取多个Docker容器的日志(顺序无关紧要)。如果将types.ContainerLogsOption.Follow设置为false,这将按预期工作。

如果将types.ContainerLogsOption.Follow设置为true,有时日志输出会在几条日志后停止,并且没有后续日志打印到stdout。

如果输出没有停止,它将按预期工作。

此外,如果重新启动一个或所有容器,该命令不会像docker logs -f containerName那样退出。

基本上,我的实现与docker logs https://github.com/docker/docker/blob/master/cli/command/container/logs.go 没有太大的区别,因此我想知道是什么原因导致了这些问题。

英文:

I'm trying to get the logs from multiple docker containers at once (order doesn't matter). This works as expected if types.ContainerLogsOption.Follow is set to false.

If types.ContainerLogsOption.Follow is set to true sometimes the log output get stuck after a few logs and no follow up logs are printed to stdout.

If the output doesn't get stuck it works as expected.

Additionally if I restart one or all of the containers the command doesn't exit like docker logs -f containerName does.

func (w *Whatever) Logs(options LogOptions) {
	readers := []io.Reader{}

	for _, container := range options.Containers {
		responseBody, err := w.Docker.Client.ContainerLogs(context.Background(), container, types.ContainerLogsOptions{
			ShowStdout: true,
			ShowStderr: true,
			Follow:     options.Follow,
		})
		defer responseBody.Close()

		if err != nil {
			log.Fatal(err)
		}
		readers = append(readers, responseBody)
	}

	// concatenate all readers to one
	multiReader := io.MultiReader(readers...)

	_, err := stdcopy.StdCopy(os.Stdout, os.Stderr, multiReader)
	if err != nil && err != io.EOF {
		log.Fatal(err)
	}
}

Basically there is no great difference in my implementation from that of docker logs https://github.com/docker/docker/blob/master/cli/command/container/logs.go, hence I'm wondering what causes this issues.

答案1

得分: 1

正如JimB评论的那样,由于io.MultiReader的操作方式,该方法不起作用。你需要做的是从每个响应中单独读取并组合输出。由于你处理的是日志,按照换行符进行读取是有意义的。bufio.Scanner可以为单个io.Reader完成这个任务。因此,一种选择是创建一个新类型,可以同时扫描多个读取器。

你可以像这样使用它:

scanner := NewConcurrentScanner(readers...)
for scanner.Scan() {
    fmt.Println(scanner.Text())
}
if err := scanner.Err(); err != nil {
    log.Fatalln(err)
}

下面是一个并发扫描器的示例实现:

// ConcurrentScanner的工作方式类似于io.Scanner,但可以处理多个io.Reader
type ConcurrentScanner struct {
    scans  chan []byte   // 来自读取器的扫描数据
    errors chan error    // 读取器的错误
    done   chan struct{} // 所有读取器完成的信号
    cancel func()        // 取消所有读取器(在第一个错误时停止)

    data []byte // 最后一次扫描的值
    err  error
}

// NewConcurrentScanner在单独的goroutine中开始扫描每个读取器,并返回*ConcurrentScanner。
func NewConcurrentScanner(readers ...io.Reader) *ConcurrentScanner {
    ctx, cancel := context.WithCancel(context.Background())
    s := &ConcurrentScanner{
        scans:  make(chan []byte),
        errors: make(chan error),
        done:   make(chan struct{}),
        cancel: cancel,
    }

    var wg sync.WaitGroup
    wg.Add(len(readers))

    for _, reader := range readers {
        // 在自己的goroutine中为每个读取器启动一个扫描器。
        go func(reader io.Reader) {
            defer wg.Done()
            scanner := bufio.NewScanner(reader)

            for scanner.Scan() {
                select {
                case s.scans <- scanner.Bytes():
                    // 只要有数据,就将其发送到s.scans,
                    // 这将阻塞,直到调用Scan()。
                case <-ctx.Done():
                    // 当上下文被取消时触发,
                    // 表示我们现在应该退出。
                    return
                }
            }
            if err := scanner.Err(); err != nil {
                select {
                case s.errors <- err:
                    // 报告我们遇到了错误
                case <-ctx.Done():
                    // 如果上下文被取消,则立即退出,否则发送
                    // 错误和此goroutine将永远不会退出。
                    return
                }
            }
        }(reader)
    }

    go func() {
        // 发出所有扫描器已完成的信号
        wg.Wait()
        close(s.done)
    }()

    return s
}

func (s *ConcurrentScanner) Scan() bool {
    select {
    case s.data = <-s.scans:
        // 从扫描器获取数据
        return true
    case <-s.done:
        // 所有扫描器都完成,无事可做。
    case s.err = <-s.errors:
        // 其中一个扫描器出错,我们完成了。
    }
    s.cancel() // 无论如何都取消上下文。
    return false
}

func (s *ConcurrentScanner) Bytes() []byte {
    return s.data
}

func (s *ConcurrentScanner) Text() string {
    return string(s.data)
}

func (s *ConcurrentScanner) Err() error {
    return s.err
}

这是它在Go Playground中工作的示例:https://play.golang.org/p/EUB0K2V7iT

你可以看到并发扫描器的输出是交错的。与io.MultiReader一次读取一个读取器的所有内容不同。

英文:

As JimB commented, that method won't work due to the operation of io.MultiReader. What you need to do is read from each from each response individually and combine the output. Since you're dealing with logs, it would make sense to break up the reads on newlines. bufio.Scanner does this for a single io.Reader. So one option would be to create a new type that scans multiple readers concurrently.

You could use it like this:

scanner := NewConcurrentScanner(readers...)
for scanner.Scan() {
fmt.Println(scanner.Text())
}
if err := scanner.Err(); err != nil {
log.Fatalln(err)
}

Example implementation of a concurrent scanner:

// ConcurrentScanner works like io.Scanner, but with multiple io.Readers
type ConcurrentScanner struct {
scans  chan []byte   // Scanned data from readers
errors chan error    // Errors from readers
done   chan struct{} // Signal that all readers have completed
cancel func()        // Cancel all readers (stop on first error)
data []byte // Last scanned value
err  error
}
// NewConcurrentScanner starts scanning each reader in a separate goroutine
// and returns a *ConcurrentScanner.
func NewConcurrentScanner(readers ...io.Reader) *ConcurrentScanner {
ctx, cancel := context.WithCancel(context.Background())
s := &amp;ConcurrentScanner{
scans:  make(chan []byte),
errors: make(chan error),
done:   make(chan struct{}),
cancel: cancel,
}
var wg sync.WaitGroup
wg.Add(len(readers))
for _, reader := range readers {
// Start a scanner for each reader in it&#39;s own goroutine.
go func(reader io.Reader) {
defer wg.Done()
scanner := bufio.NewScanner(reader)
for scanner.Scan() {
select {
case s.scans &lt;- scanner.Bytes():
// While there is data, send it to s.scans,
// this will block until Scan() is called.
case &lt;-ctx.Done():
// This fires when context is cancelled,
// indicating that we should exit now.
return
}
}
if err := scanner.Err(); err != nil {
select {
case s.errors &lt;- err:
// Reprort we got an error
case &lt;-ctx.Done():
// Exit now if context was cancelled, otherwise sending
// the error and this goroutine will never exit.
return
}
}
}(reader)
}
go func() {
// Signal that all scanners have completed
wg.Wait()
close(s.done)
}()
return s
}
func (s *ConcurrentScanner) Scan() bool {
select {
case s.data = &lt;-s.scans:
// Got data from a scanner
return true
case &lt;-s.done:
// All scanners are done, nothing to do.
case s.err = &lt;-s.errors:
// One of the scanners error&#39;d, were done.
}
s.cancel() // Cancel context regardless of how we exited.
return false
}
func (s *ConcurrentScanner) Bytes() []byte {
return s.data
}
func (s *ConcurrentScanner) Text() string {
return string(s.data)
}
func (s *ConcurrentScanner) Err() error {
return s.err
}

Here's an example of it working in the Go Playground: https://play.golang.org/p/EUB0K2V7iT

You can see that the concurrent scanner output is interleaved. Rather than reading all of one reader, then moving on to the next, as is seen with io.MultiReader.

huangapple
  • 本文由 发表于 2016年11月3日 04:24:37
  • 转载请务必保留本文链接:https://go.coder-hub.com/40389137.html
匿名

发表评论

匿名网友

:?: :razz: :sad: :evil: :!: :smile: :oops: :grin: :eek: :shock: :???: :cool: :lol: :mad: :twisted: :roll: :wink: :idea: :arrow: :neutral: :cry: :mrgreen:

确定