GoLang:在一个 goroutine 中解压 bz2 文件,在另一个 goroutine 中进行消费。

huangapple go评论118阅读模式
英文:

GoLang: Decompress bz2 in on goroutine, consume in other goroutine

问题

我是一名新毕业的软件工程师,正在学习Go语言(并且非常喜欢它)。

我正在构建一个用于解析维基百科转储文件的解析器,这个文件基本上是一个巨大的bzip2压缩的XML文件(未压缩时约50GB)。

我想要同时进行流式解压缩和解析,听起来很简单。对于解压缩,我这样做:

inputFilePath := flag.Arg(0) inputReader := bzip2.NewReader(inputFile)

然后将读取器传递给XML解析器:

decoder := xml.NewDecoder(inputFile)

然而,由于解压缩和解析都是耗时的操作,我希望它们在单独的Go协程中运行,以利用额外的核心。在Go语言中,我该如何做到这一点呢?

我唯一能想到的方法是将文件包装在一个chan []byte中,并实现io.Reader接口,但我想可能有一种内置的(更简洁)方法来实现这一点。

有人曾经做过类似的事情吗?

谢谢!
Manuel

英文:

I am a new-grad SWE learning Go (and loving it).

I am building a parser for Wikipedia dump files - basically a huge bzip2-compressed XML file (~50GB uncompressed).

I want to do both streaming decompression and parsing, which sounds simple enough. For decompression, I do:

inputFilePath := flag.Arg(0)
inputReader := bzip2.NewReader(inputFile)

And then pass the reader to the XML parser:

decoder := xml.NewDecoder(inputFile)

However, since both decompressing and parsing are expensive operations, I would like to have them run on separate Go routines to make use of additional cores. How would I go about doing this in Go?

The only thing I can think of is wrapping the file in a chan []byte, and implementing the io.Reader interface, but I presume there might be a built way (and cleaner) way of doing it.

Has anyone ever done something like this?

Thanks!
Manuel

答案1

得分: 2

你可以使用 io.Pipe 来创建一个管道,然后使用 io.Copy 将解压后的数据推送到管道中,并在另一个 goroutine 中读取它:

  1. package main
  2. import (
  3. "bytes"
  4. "encoding/json"
  5. "fmt"
  6. "io"
  7. "sync"
  8. )
  9. func main() {
  10. rawJson := []byte(`{
  11. "Foo": {
  12. "Bar": "Baz"
  13. }
  14. }`)
  15. bzip2Reader := bytes.NewReader(rawJson) // 这里代替了 bzip2.NewReader
  16. var wg sync.WaitGroup
  17. wg.Add(2)
  18. r, w := io.Pipe()
  19. go func() {
  20. // 将所有内容写入管道中,解压缩在这个 goroutine 中进行
  21. io.Copy(w, bzip2Reader)
  22. w.Close()
  23. wg.Done()
  24. }()
  25. decoder := json.NewDecoder(r)
  26. go func() {
  27. for {
  28. t, err := decoder.Token()
  29. if err != nil {
  30. break
  31. }
  32. fmt.Println(t)
  33. }
  34. wg.Done()
  35. }()
  36. wg.Wait()
  37. }

你可以在 这里 运行这段代码。

英文:

You can use io.Pipe, then use io.Copy to push the decompressed data into the pipe, and read it in another goroutine:

  1. package main
  2. import (
  3. "bytes"
  4. "encoding/json"
  5. "fmt"
  6. "io"
  7. "sync"
  8. )
  9. func main() {
  10. rawJson := []byte(`{
  11. "Foo": {
  12. "Bar": "Baz"
  13. }
  14. }`)
  15. bzip2Reader := bytes.NewReader(rawJson) // this stands in for the bzip2.NewReader
  16. var wg sync.WaitGroup
  17. wg.Add(2)
  18. r, w := io.Pipe()
  19. go func() {
  20. // write everything into the pipe. Decompression happens in this goroutine.
  21. io.Copy(w, bzip2Reader)
  22. w.Close()
  23. wg.Done()
  24. }()
  25. decoder := json.NewDecoder(r)
  26. go func() {
  27. for {
  28. t, err := decoder.Token()
  29. if err != nil {
  30. break
  31. }
  32. fmt.Println(t)
  33. }
  34. wg.Done()
  35. }()
  36. wg.Wait()
  37. }

http://play.golang.org/p/fXLnfnaWYA

答案2

得分: 0

一个简单的解决方案是使用我一段时间前创建的一个readahead包:https://github.com/klauspost/readahead

  1. inputReader := bzip2.NewReader(inputFile)
  2. ra := readahead.NewReader(input)
  3. defer ra.Close()

然后将读取器传递给XML解析器:

  1. decoder := xml.NewDecoder(ra)

使用默认设置,它将提前解码4个缓冲区中的4MB数据。

英文:

An easy solution is to use a readahead package I created some time back: https://github.com/klauspost/readahead

  1. inputReader := bzip2.NewReader(inputFile)
  2. ra := readahead.NewReader(input)
  3. defer ra.Close()

And then pass the reader to the XML parser:

  1. decoder := xml.NewDecoder(ra)

With default settings it will decode up to 4MB ahead of time in 4 buffers.

huangapple
  • 本文由 发表于 2016年3月26日 05:53:36
  • 转载请务必保留本文链接:https://go.coder-hub.com/36228655.html
匿名

发表评论

匿名网友

:?: :razz: :sad: :evil: :!: :smile: :oops: :grin: :eek: :shock: :???: :cool: :lol: :mad: :twisted: :roll: :wink: :idea: :arrow: :neutral: :cry: :mrgreen:

确定