Golang read from pipe reads tons of data

Question

I'm trying to read an archive that tar is streaming to my program's stdin, but I'm somehow reading far more data from the pipe than tar is sending.

I run my command like this:

tar -cf - somefolder | ./my-go-binary

The source code is like this:

package main

import (
    "bufio"
    "io"
    "log"
    "os"
)

// Read from standard input
func main() {
    reader := bufio.NewReader(os.Stdin)
    // Read all data from stdin, processing subsequent reads as chunks.
    parts := 0
    for {
        parts++
        data := make([]byte, 4<<20) // Read 4MB at a time
        _, err := reader.Read(data)
        if err == io.EOF {
            break
        } else if err != nil {
            log.Fatalf("Problems reading from input: %s", err)
        }
    }
    log.Printf("Total parts processed: %d\n", parts)
}

For a 100MB tarred folder, I'm getting 1468 chunks of 4MB (that's 6.15GB)! Further, it doesn't seem to matter how large the data []byte array is: if I set the chunk size to 40MB, I still get ~1400 chunks of 40MB data, which makes no sense at all.

Is there something I need to do to read data from os.Stdin properly with Go?

Answer 1

Score: 42

Your code is inefficient. It allocates and initializes a new 4MB data slice on every pass through the loop:

for {
    data := make([]byte, 4<<20) // Read 4MB at a time
}

Your use of the reader as an io.Reader is also wrong. For example, you ignore the number of bytes read by _, err := reader.Read(data), and you don't handle errors properly.
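A minimal sketch of those two fixes (allocate once, use the returned count), intended to replace the loop above rather than stand on its own; the complete model program further down does the same thing more carefully:

buf := make([]byte, 4<<20) // allocate the 4MB buffer once, outside the loop
for {
    n, err := reader.Read(buf)
    if n > 0 {
        // process buf[:n]; n is frequently much smaller than len(buf)
    }
    if err == io.EOF {
        break
    }
    if err != nil {
        log.Fatal(err)
    }
}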

> Package io
>
> import "io"
>
> type Reader
>
> type Reader interface {
>         Read(p []byte) (n int, err error)
> }
>
> Reader is the interface that wraps the basic Read method.
>
> Read reads up to len(p) bytes into p. It returns the number of bytes
> read (0 <= n <= len(p)) and any error encountered. Even if Read
> returns n < len(p), it may use all of p as scratch space during the
> call. If some data is available but not len(p) bytes, Read
> conventionally returns what is available instead of waiting for more.
>
> When Read encounters an error or end-of-file condition after
> successfully reading n > 0 bytes, it returns the number of bytes read.
> It may return the (non-nil) error from the same call or return the
> error (and n == 0) from a subsequent call. An instance of this general
> case is that a Reader returning a non-zero number of bytes at the end
> of the input stream may return either err == EOF or err == nil. The
> next Read should return 0, EOF regardless.
>
> Callers should always process the n > 0 bytes returned before
> considering the error err. Doing so correctly handles I/O errors that
> happen after reading some bytes and also both of the allowed EOF
> behaviors.
>
> Implementations of Read are discouraged from returning a zero byte
> count with a nil error, except when len(p) == 0. Callers should treat
> a return of 0 and nil as indicating that nothing happened; in
> particular it does not indicate EOF.
>
> Implementations must not retain p.

Here's a model read program that uses the io.Reader contract correctly:

package main

import (
    "bufio"
    "io"
    "log"
    "os"
)

func main() {
    nBytes, nChunks := int64(0), int64(0)
    r := bufio.NewReader(os.Stdin)
    buf := make([]byte, 0, 4*1024)
    for {
        n, err := r.Read(buf[:cap(buf)])
        buf = buf[:n]
        if n == 0 {
            if err == nil {
                continue
            }
            if err == io.EOF {
                break
            }
            log.Fatal(err)
        }
        nChunks++
        nBytes += int64(len(buf))
        // process buf
        if err != nil && err != io.EOF {
            log.Fatal(err)
        }
    }
    log.Println("Bytes:", nBytes, "Chunks:", nChunks)
}

Output:

2014/11/29 10:00:05 Bytes: 5589891 Chunks: 1365
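Run it behind the same pipeline as in the question, tar -cf - somefolder | ./my-go-binary, and the byte total it prints should match what tar actually sends down the pipe.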

Answer 2

Score: 8

Read the documentation for bufio.Reader's Read:

> Read reads data into p. It returns the number of bytes read into p. It
> calls Read at most once on the underlying Reader, hence n may be less
> than len(p). At EOF, the count will be zero and err will be io.EOF.

You are not reading 4MB at a time. You are providing 4MB of buffer space and discarding the integer that would have told you how much Read actually read. The buffer size is only an upper bound; on my system, usually about 128k gets read per call. Try it out yourself:

// Read from standard input
func main() {
    reader := bufio.NewReader(os.Stdin)
    // Read all data from stdin, processing the data in parts.
    parts := 0
    for {
        parts++
        data := make([]byte, 4<<20) // Read 4MB at a time
        amount, err := reader.Read(data)
        // WILL NOT BE 4MB!
        log.Printf("Read: %v\n", amount)
        if err == io.EOF {
            break
        } else if err != nil {
            log.Fatalf("Problems reading from input: %s", err)
        }
    }
    log.Printf("Total parts processed: %d\n", parts)
}

You have to implement the logic for handling the varying read amounts.
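If you really do need fixed-size chunks, one option (not part of the original answer) is io.ReadFull from the standard library, which keeps calling Read until the buffer is full or the input ends. A minimal sketch, keeping the 4MB chunk size from the question:

package main

import (
    "bufio"
    "io"
    "log"
    "os"
)

func main() {
    reader := bufio.NewReader(os.Stdin)
    buf := make([]byte, 4<<20) // one reusable 4MB buffer
    parts := 0
    for {
        // io.ReadFull keeps reading until buf is full or the stream ends.
        n, err := io.ReadFull(reader, buf)
        if n > 0 {
            parts++
            // process buf[:n]
        }
        if err == io.EOF || err == io.ErrUnexpectedEOF {
            // io.ErrUnexpectedEOF means the final chunk was shorter than 4MB.
            break
        }
        if err != nil {
            log.Fatalf("Problems reading from input: %s", err)
        }
    }
    log.Printf("Total parts processed: %d\n", parts)
}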
