Buffered version of Go io.Pipe

Question


Is there a buffered version of io.Pipe (either in the standard library or a third-party library) before I roll my own?

Context: I'm trying to use this solution for parsing of JSON data compressed with bzip2, so that the decompression and parsing happen in parallel, but finding that the speed-up is very small. Parsing uncompressed data takes ~22 sec per million records. Decompressing that much data takes about the same time. Doing them on a single thread takes ~44 seconds, as expected. Using the solution above takes ~41 seconds.
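
Roughly, the setup looks like the sketch below (the file name and record type are placeholders; the real code follows the linked solution):

    package main

    import (
        "compress/bzip2"
        "encoding/json"
        "io"
        "log"
        "os"
    )

    // record stands in for the real JSON structure being parsed.
    type record struct {
        ID int `json:"id"`
    }

    func main() {
        f, err := os.Open("data.json.bz2") // placeholder file name
        if err != nil {
            log.Fatal(err)
        }
        defer f.Close()

        pr, pw := io.Pipe()

        // Decompress on its own goroutine, writing into the pipe.
        go func() {
            _, err := io.Copy(pw, bzip2.NewReader(f))
            pw.CloseWithError(err) // propagates the error (or EOF) to the reader side
        }()

        // Parse on the main goroutine, reading from the pipe.
        dec := json.NewDecoder(pr)
        for {
            var rec record
            if err := dec.Decode(&rec); err == io.EOF {
                break
            } else if err != nil {
                log.Fatal(err)
            }
            // ... use rec ...
        }
    }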

The documentation for io.Pipe says:

> Reads and Writes on the pipe are matched one to one except when
> multiple Reads are needed to consume a single Write. That is, each
> Write to the PipeWriter blocks until it has satisfied one or more
> Reads from the PipeReader that fully consume the written data. The
> data is copied directly from the Write to the corresponding Read (or
> Reads); there is no internal buffering.

I suspect this could be a problem, depending on the way the bzip2 decompressor writes data and the way the JSON parser reads it, so I'd like to try a buffered version.

Answer 1

Score: 7


That's what the bufio package is for. It lets you turn any io.Reader into a buffered reader with NewReader, or any io.Writer into a buffered writer with NewWriter.

(Whether buffered IO will actually help with your specific problem, I have no idea...)
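
For instance, a minimal sketch that wraps both ends of an io.Pipe (the 1 MiB buffer size is arbitrary):

    package main

    import (
        "bufio"
        "fmt"
        "io"
        "os"
    )

    func main() {
        pr, pw := io.Pipe()

        go func() {
            // Buffer the write side so many small writes become one large pipe write.
            bw := bufio.NewWriterSize(pw, 1<<20)
            for i := 0; i < 3; i++ {
                fmt.Fprintln(bw, "line", i)
            }
            bw.Flush() // flush the bufio buffer before closing the pipe
            pw.Close()
        }()

        // Buffer the read side as well.
        br := bufio.NewReaderSize(pr, 1<<20)
        io.Copy(os.Stdout, br)
    }

One caveat: bufio only batches the calls. Each flushed write still blocks until the reader has consumed it, so this reduces per-call overhead rather than letting the two goroutines run much further apart.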

Answer 2

Score: 1


If you want async operation, where reads are done on a separate goroutine, you can also use readahead, a package I made a few years ago:

https://github.com/klauspost/readahead

Example:

    ra := readahead.NewReader(input)
    defer ra.Close()
    // ra satisfies io.Reader; hand it to whatever consumes the stream.

With default settings it will read ahead up to four 1 MB buffers, handing them to the reader as it is ready for them.
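
Applied to the question's bzip2/JSON case it would look something like this (a sketch, assuming the same imports as the question's pipeline plus this package; error handling trimmed):

    f, _ := os.Open("data.json.bz2") // placeholder name; check the error in real code

    // The readahead goroutine calls Read on the bzip2 reader, so decompression
    // runs ahead of, and concurrently with, the JSON parsing below.
    ra := readahead.NewReader(bzip2.NewReader(f))
    defer ra.Close()

    dec := json.NewDecoder(ra)
    for {
        var rec map[string]interface{}
        if err := dec.Decode(&rec); err == io.EOF {
            break
        } else if err != nil {
            log.Fatal(err)
        }
        // ... use rec ...
    }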

Answer 3

Score: 0


You can use a buffered pipe from https://github.com/djherbis/nio like this:

    import (
        "github.com/djherbis/buffer"
        "github.com/djherbis/nio"
    )

    ...

    b := buffer.New(32*1024)
    pr, pw := nio.Pipe(b)

    ...
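
A slightly fuller sketch of how the buffered pipe behaves (writes go through the 32 KiB buffer, so the writer is not forced to wait for a matching read):

    package main

    import (
        "fmt"
        "io"

        "github.com/djherbis/buffer"
        "github.com/djherbis/nio"
    )

    func main() {
        b := buffer.New(32 * 1024) // 32 KiB in-memory buffer
        pr, pw := nio.Pipe(b)

        go func() {
            // Unlike io.Pipe, this write completes as soon as it fits in the buffer.
            fmt.Fprintln(pw, "hello from the writer")
            pw.Close()
        }()

        out, _ := io.ReadAll(pr)
        fmt.Print(string(out))
    }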

Answer 4

Score: 0


My https://github.com/acomagu/bufpipe has a simple interface and works well.

    r, w := bufpipe.New(nil)
    io.WriteString(w, "abc") // No blocking.
    io.WriteString(w, "def") // No blocking, too.
    w.Close()
    io.Copy(os.Stdout, r)
    // Output: abcdef

Playground
