Buffered version of Go io.Pipe
Question
Is there a buffered version of io.Pipe (either in the standard library or a third-party library) before I roll my own?
Context: I'm trying to use this solution for parsing JSON data compressed with bzip2, so that the decompression and parsing happen in parallel, but I'm finding that the speed-up is very small. Parsing uncompressed data takes ~22 sec per million records. Decompressing that much data takes about the same time. Doing them on a single thread takes ~44 seconds, as expected. Using the solution above takes ~41 seconds.
The documentation for io.Pipe says:
> Reads and Writes on the pipe are matched one to one except when
> multiple Reads are needed to consume a single Write. That is, each
> Write to the PipeWriter blocks until it has satisfied one or more
> Reads from the PipeReader that fully consume the written data. The
> data is copied directly from the Write to the corresponding Read (or
> Reads); there is no internal buffering.
I suspect this could be a problem, depending on the way the bzip2 decompressor writes data and the way the JSON parser reads it, so I'd like to try a buffered version.
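The setup I'm describing looks roughly like this (a minimal sketch; the file name and error handling are illustrative):

```go
package main

import (
	"compress/bzip2"
	"encoding/json"
	"io"
	"os"
)

func main() {
	f, err := os.Open("records.json.bz2") // illustrative file name
	if err != nil {
		panic(err)
	}
	defer f.Close()

	pr, pw := io.Pipe()

	// Decompress on a separate goroutine, writing into the pipe.
	go func() {
		_, err := io.Copy(pw, bzip2.NewReader(f))
		pw.CloseWithError(err) // a nil error surfaces as io.EOF on the read side
	}()

	// Parse on this goroutine, reading from the pipe.
	dec := json.NewDecoder(pr)
	for {
		var rec map[string]interface{}
		if err := dec.Decode(&rec); err == io.EOF {
			break
		} else if err != nil {
			panic(err)
		}
	}
}
```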
Answer 1
Score: 7
That's what the `bufio` package is for. It lets you turn any `io.Reader` into a buffered reader with `NewReader`, or any `io.Writer` into a buffered writer with `NewWriter`.
(Whether buffered IO will actually help with your specific problem, I have no idea...)
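Applied to the pipe from the question, that might look like the following sketch (the 1 MB buffer sizes are arbitrary; note the writer must be flushed before the pipe is closed):

```go
pr, pw := io.Pipe()

go func() {
	// Batch the decompressor's small writes into larger chunks
	// before they hit the pipe.
	bw := bufio.NewWriterSize(pw, 1<<20)
	_, err := io.Copy(bw, bzip2.NewReader(f))
	if err == nil {
		err = bw.Flush() // push out any remaining buffered bytes
	}
	pw.CloseWithError(err)
}()

// Buffer the reading side as well.
dec := json.NewDecoder(bufio.NewReaderSize(pr, 1<<20))
```

One caveat: `bufio` only batches writes into larger ones; each large write still blocks on the pipe until it is consumed, so this reduces per-write synchronization overhead rather than truly decoupling the two stages.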
Answer 2
Score: 1
If you want async operation, where reads are done on a separate goroutine, you can also use the `readahead` package I made a few years ago:

https://github.com/klauspost/readahead

Example:

```go
ra := readahead.NewReader(input)
defer ra.Close()
```

With default settings it will read ahead up to four 1 MB buffers and hand them over as the reader is ready for them.
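Wired into the bzip2 case from the question, that could look like this sketch (the decompressor's output is read ahead on a separate goroutine, so no explicit pipe is needed):

```go
// readahead pulls from the bzip2 reader on its own goroutine
// and buffers the results for the JSON decoder.
ra := readahead.NewReader(bzip2.NewReader(f))
defer ra.Close()

dec := json.NewDecoder(ra)
```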
Answer 3
Score: 0
You can use a buffered pipe from https://github.com/djherbis/nio like this:

```go
import (
	"github.com/djherbis/buffer"
	"github.com/djherbis/nio"
)

...

b := buffer.New(32 * 1024)
pr, pw := nio.Pipe(b)

...
```
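In the bzip2 pipeline from the question, this buffered pipe can stand in for io.Pipe directly; a sketch, with the buffer size a guess and error handling omitted:

```go
// A large in-memory buffer lets the decompressor run ahead
// of the JSON decoder; 32 MB here is a guess.
b := buffer.New(32 * 1024 * 1024)
pr, pw := nio.Pipe(b)

go func() {
	io.Copy(pw, bzip2.NewReader(f))
	pw.Close() // error handling omitted for brevity
}()

dec := json.NewDecoder(pr)
```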
Answer 4
Score: 0
My https://github.com/acomagu/bufpipe has a simple interface and works well.

```go
r, w := bufpipe.New(nil)
io.WriteString(w, "abc") // does not block
io.WriteString(w, "def") // does not block either
w.Close()
io.Copy(os.Stdout, r)
// Output: abcdef
```
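Like the nio pipe above, this can replace io.Pipe in the decompression pipeline; a sketch (error handling omitted):

```go
// Writes into a bufpipe do not block (the internal buffer grows
// as needed), so the decompressor is not throttled by the decoder.
pr, pw := bufpipe.New(nil)

go func() {
	io.Copy(pw, bzip2.NewReader(f))
	pw.Close()
}()

dec := json.NewDecoder(pr)
```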