Golang:同时处理5个巨大文件

huangapple go评论91阅读模式
英文:

Golang: Processing 5 huge files concurrently

问题

我有5个巨大的(每个有400万行)日志文件,目前我在Perl中处理它们,但我想尝试在Go语言中实现相同的功能,利用其并发特性。因此,作为一个对Go语言非常不熟悉的人,我考虑以下的做法。对于这种方法的任何评论将不胜感激。

一些粗略的伪代码:

var wg1 sync.WaitGroup
var wg2 sync.WaitGroup

func processRow(r Row) {
    wg2.Add(1)
    defer wg2.Done()
    res := <处理 r>
    return res
}

func processFile(f File) {
    wg1.Add(1)
    open(newfile File)
    defer wg1.Done()
    line := < f 中读取行>
    result := go processRow(line)
    newFile.Println(result) // 将新处理的行写入 newFile
    wg2.Wait()
    newFile.Close()
}

func main() {
    for each f in logfile {
        go processFile(f)
    }
    wg1.Wait()
}

所以,我的想法是并发处理这5个文件,然后每个文件的所有行也将被并发处理。

这样会起作用吗?

英文:

I have 5 huge (4 million rows each) logfiles that I process in Perl currently and I thought I may try to implement the same in Go and its concurrent features. So, being very inexperienced in Go, I was thinking of doing as below. Any comments on the approach will be greatly appreciated.
Some rough pseudocode:

var wg1 sync.WaitGroup
var wg2 sync.WaitGroup

func processRow (r Row) {
    wg2.Add(1)
    defer wg2.Done()
    res = &lt;process r&gt;
    return res
}

func processFile(f File) {
    wg1.Add(1)
    open(newfile File)
    defer wg1.Done()
    line = &lt;row from f&gt;
    result = go processRow(line)
    newFile.Println(result) // Write new processed line to newFile
    wg2.Wait()
    newFile.Close()

}

func main() {
    
    for each f logfile {
        go processFile(f)
    }
    wg1.Wait()
}

So, idea is that I process these 5 files concurrently and then all rows of each file will in turn also be processed concurrently.

Will that work?

答案1

得分: 8

你应该使用通道来管理处理过的行。或者你也可以编写另一个 goroutine 来处理输出。

var numGoWriters = 10

func processRow(r Row, ch chan<- string) {
    res := process(r)
    ch <- res
}

func writeRow(f File, ch <-chan string) {
    w := bufio.NewWriter(f)
    for s := range ch {
        _, err := w.WriteString(s + "\n")
    }
}

func processFile(f File) {
    outFile, err := os.Create("/path/to/file.out")
    if err != nil {
        // 处理错误
    }
    defer outFile.Close()
    var wg sync.WaitGroup
    ch := make(chan string, 10)  // 根据性能调整这个数字
    defer close(ch) // 一旦我们处理完所有行,关闭通道,这样我们的工作线程就会退出
    fScanner := bufio.NewScanner(f)
    for fScanner.Scan() {
        wg.Add(1)
        go func() {
            processRow(fScanner.Text(), ch)
            wg.Done()
        }()
    }
    for i := 0; i < numGoWriters; i++ {
        go writeRow(outFile, ch)
    }
    wg.Wait()  
}

func main() {
    var wg sync.WaitGroup

    filenames := [...]string{"here", "are", "some", "log", "paths"}
    for _, fname := range filenames {
        inFile, err := os.Open(fname)
        if err != nil {
            // 处理错误
        }
        defer inFile.Close()
        wg.Add(1)
        go processFile(inFile)
    }
    wg.Wait()
}

在这里,processRow 执行所有的处理(我假设是 string 类型),writeRow 执行所有的输出 I/O,processFile 将每个文件连接在一起。然后,main 只需要传递文件,生成 goroutine,et voila

英文:

You should definitely use channels to manage your processed rows. Alternatively you could also write another goroutine to handle your output.

var numGoWriters = 10
func processRow(r Row, ch chan&lt;- string) {
res := process(r)
ch &lt;- res
}
func writeRow(f File, ch &lt;-chan string) {
w := bufio.NewWriter(f)
for s := range ch {
_, err := w.WriteString(s + &quot;\n&quot;)
}
func processFile(f File) {
outFile, err := os.Create(&quot;/path/to/file.out&quot;)
if err != nil {
// handle it
}
defer outFile.Close()
var wg sync.WaitGroup
ch := make(chan string, 10)  // play with this number for performance
defer close(ch) // once we&#39;re done processing rows, we close the channel
// so our worker threads exit
fScanner := bufio.NewScanner(f)
for fScanner.Scan() {
wg.Add(1)
go func() {
processRow(fScanner.Text(), ch)
wg.Done()
}()
}
for i := 0; i &lt; numGoWriters; i++ {
go writeRow(outFile, ch)
}
wg.Wait()  
}

Here we have processRow doing all the processing (I assumed to string), writeRow doing all the out I/O, and processFile tying each file together. Then all main has to do is hand off the files, spawn the goroutines, et voila.

func main() {
var wg sync.WaitGroup
filenames := [...]string{&quot;here&quot;, &quot;are&quot;, &quot;some&quot;, &quot;log&quot;, &quot;paths&quot;}
for fname := range filenames {
inFile, err := os.Open(fname)
if err != nil {
// handle it
}
defer inFile.Close()
wg.Add(1)
go processFile(inFile)
}
wg.Wait()

huangapple
  • 本文由 发表于 2015年12月20日 16:52:11
  • 转载请务必保留本文链接:https://go.coder-hub.com/34379341.html
匿名

发表评论

匿名网友

:?: :razz: :sad: :evil: :!: :smile: :oops: :grin: :eek: :shock: :???: :cool: :lol: :mad: :twisted: :roll: :wink: :idea: :arrow: :neutral: :cry: :mrgreen:

确定