
GoLang: Decompress bz2 in one goroutine, consume in another goroutine

Question

I am a new-grad SWE learning Go (and loving it).

I am building a parser for Wikipedia dump files - basically a huge bzip2-compressed XML file (~50GB uncompressed).

I want to do both streaming decompression and parsing, which sounds simple enough. For decompression, I do:

inputFilePath := flag.Arg(0)
inputFile, _ := os.Open(inputFilePath) // error handling elided
inputReader := bzip2.NewReader(inputFile)

And then pass the reader to the XML parser:

decoder := xml.NewDecoder(inputReader)

However, since both decompressing and parsing are expensive operations, I would like to have them run on separate goroutines to make use of additional cores. How would I go about doing this in Go?

The only thing I can think of is wrapping the file in a chan []byte and implementing the io.Reader interface, but I presume there is a built-in (and cleaner) way of doing it.

Has anyone ever done something like this?

Thanks!
Manuel

Answer 1 (score: 2)


You can use io.Pipe, then use io.Copy to push the decompressed data into the pipe, and read it in another goroutine:

package main

import (
	"bytes"
	"encoding/json"
	"fmt"
	"io"
	"sync"
)

func main() {

	rawJson := []byte(`{
		"Foo": {
			"Bar": "Baz"
		}
	}`)

	bzip2Reader := bytes.NewReader(rawJson) // this stands in for the bzip2.NewReader

	var wg sync.WaitGroup
	wg.Add(2)

	r, w := io.Pipe()

	go func() {
		// write everything into the pipe. Decompression happens in this goroutine.
		io.Copy(w, bzip2Reader)
		w.Close()
		wg.Done()
	}()

	decoder := json.NewDecoder(r)

	go func() {
		for {
			t, err := decoder.Token()
			if err != nil {
				break
			}
			fmt.Println(t)
		}
		wg.Done()
	}()

	wg.Wait()
}

http://play.golang.org/p/fXLnfnaWYA

Answer 2 (score: 0)


An easy solution is to use a readahead package I created some time back: https://github.com/klauspost/readahead

inputReader := bzip2.NewReader(inputFile)
ra := readahead.NewReader(inputReader)
defer ra.Close()

And then pass the reader to the XML parser:

decoder := xml.NewDecoder(ra)

With default settings it will decode up to 4MB ahead of time in 4 buffers.

huangapple · Published 2016-03-26 05:53:36 · Original link: https://go.coder-hub.com/36228655.html