GoLang: Decompress bz2 in one goroutine, consume in another goroutine
Question
I am a new-grad SWE learning Go (and loving it).
I am building a parser for Wikipedia dump files - basically a huge bzip2-compressed XML file (~50GB uncompressed).
I want to do both streaming decompression and parsing, which sounds simple enough. For decompression, I do:
inputFilePath := flag.Arg(0)
inputReader := bzip2.NewReader(inputFile)
And then pass the reader to the XML parser:
decoder := xml.NewDecoder(inputReader)
However, since both decompressing and parsing are expensive operations, I would like to have them run in separate goroutines to make use of additional cores. How would I go about doing this in Go?
The only thing I can think of is wrapping the file in a chan []byte and implementing the io.Reader interface, but I presume there might be a built-in (and cleaner) way of doing it.
Has anyone ever done something like this?
Thanks!
Manuel
Answer 1
Score: 2
You can use io.Pipe, then use io.Copy to push the decompressed data into the pipe, and read it in another goroutine:
package main

import (
	"bytes"
	"encoding/json"
	"fmt"
	"io"
	"sync"
)

func main() {
	rawJson := []byte(`{
		"Foo": {
			"Bar": "Baz"
		}
	}`)
	bzip2Reader := bytes.NewReader(rawJson) // this stands in for the bzip2.NewReader

	var wg sync.WaitGroup
	wg.Add(2)

	r, w := io.Pipe()
	go func() {
		// Write everything into the pipe. Decompression happens in this goroutine.
		io.Copy(w, bzip2Reader)
		w.Close()
		wg.Done()
	}()

	decoder := json.NewDecoder(r)
	go func() {
		for {
			t, err := decoder.Token()
			if err != nil {
				break
			}
			fmt.Println(t)
		}
		wg.Done()
	}()

	wg.Wait()
}
Answer 2
Score: 0
An easy solution is to use a readahead package I created some time back: https://github.com/klauspost/readahead
inputReader := bzip2.NewReader(inputFile)
ra := readahead.NewReader(inputReader)
defer ra.Close()
And then pass the reader to the XML parser:
decoder := xml.NewDecoder(ra)
With default settings it will decode up to 4MB ahead of time in 4 buffers.