如何使用Goroutine批量处理文件?

huangapple go评论94阅读模式
英文:

how to batch dealing with files using Goroutine?

问题

假设我有一堆要处理的文件(比如1000个或更多),首先它们应该通过函数A()进行处理,函数A()会生成一个文件,然后这个文件将被函数B()处理。

如果我们一个一个地处理,速度太慢了,所以我想使用goroutine一次处理5个文件(因为CPU无法承受太多的处理)。

我是Go语言的新手,不确定我的想法是否正确,我认为函数A()是一个生产者,函数B()是一个消费者,函数B()将处理函数A()生成的文件。我写了一些代码如下,请原谅我,我真的不知道如何编写代码,有人可以帮助我吗?提前谢谢!

package main

import "fmt"

var Box = make(chan string, 1024)

func A(file string) {
	fmt.Println(file, "正在函数A()中处理...")
	fileGenByA := "/path/to/fileGenByA1"
	Box <- fileGenByA
}

func B(file string) {
	fmt.Println(file, "正在函数B()中处理...")
}

func main() {
    // 假设这是从目录中读取的文件列表
	fileList := []string{
		"/path/to/file1",
		"/path/to/file2",
		"/path/to/file3",
	}

	// 似乎我不能这样做,因为fileList可能有1000个或更多的文件
	for _, v := range fileList {
		go A(v)
	}

	// 我可以这样做吗?
	for file := range Box {
		go B(file)
	}
}

更新:

抱歉,可能我没有表达清楚,实际上函数A()生成的文件存储在硬盘上(通过命令行工具生成的,我只是使用exec.Command()简单地执行它),而不是存储在变量(内存)中,所以它不需要立即传递给函数B()

我认为有两种方法:

方法1
如何使用Goroutine批量处理文件?

方法2
如何使用Goroutine批量处理文件?

实际上,我更喜欢方法2,正如你所看到的,第一个B()不需要处理file1GenByA,对于B()来说,处理盒子中的任何文件都是一样的,因为file1GenByA可能在file2GenByA之后生成(可能文件更大,所以需要更多时间)。

英文:

Assuming I have a bunch of files to deal with(say 1000 or more), first they should be processed by function A(), function A() will generate a file, then this file will be processed by B().

If we do it one by one, that's too slow, so I'm thinking process 5 files at a time using goroutine(we can not process too much at a time cause the CPU cannot bear).

I'm a newbie in golang, I'm not sure if my thought is correct, I think the function A() is a producer and the function B() is a consumer, function B() will deal with the file that produced by function A(), and I wrote some code below, forgive me, I really don't know how to write the code, can anyone give me a help? Thank you in advance!

package main

import &quot;fmt&quot;

var Box = make(chan string, 1024)

func A(file string) {
	fmt.Println(file, &quot;is processing in func A()...&quot;)
	fileGenByA := &quot;/path/to/fileGenByA1&quot;
	Box &lt;- fileGenByA
}

func B(file string) {
	fmt.Println(file, &quot;is processing in func B()...&quot;)
}

func main() {
    // assuming that this is the file list read from a directory
	fileList := []string{
		&quot;/path/to/file1&quot;,
		&quot;/path/to/file2&quot;,
		&quot;/path/to/file3&quot;,
	}

	// it seems I can&#39;t do this, because fileList may have 1000 or more file
	for _, v := range fileList {
		go A(v)
	}

	// can I do this?
	for file := range Box {
		go B(file)
	}
}

Update:

sorry, maybe I haven’t made myself clear, actually the file generated by function A() is stored in the hard disk(generated by a command line tool, I just simple execute it using exec.Command()), not in a variable(the memory), so it doesn't have to be passed to function B() immediately.

I think there are 2 approach:

approach1
如何使用Goroutine批量处理文件?

approach2
如何使用Goroutine批量处理文件?

Actually I prefer approach2, as you can see, the first B() doesn't have to process the file1GenByA, it's the same for B() to process any file in the box, because file1GenByA may generated after file2GenByA(maybe the file is larger so it takes more time).

答案1

得分: 4

你可以创建5个从工作通道读取的goroutine。这样,你就可以始终保持5个goroutine在运行,而不需要将它们分批处理,以便等待5个完成后再启动下一个5个。

func main() {
	stack := []string{"a", "b", "c", "d", "e", "f", "g", "h"}

	work := make(chan string)
	results := make(chan string)

	// 创建5个worker goroutine
	wg := sync.WaitGroup{}
	for i := 0; i < 5; i++ {
		wg.Add(1)
		go func() {
			defer wg.Done()
			for s := range work {
				results <- B(A(s))
			}
		}()
	}

	// 将工作发送给worker
	// 这在一个goroutine中进行,以便不阻塞主函数,一旦所有5个worker都忙碌
	go func() {
		for _, s := range stack {
		   // 可以在这里从磁盘读取文件,并传递文件的指针
			work <- s
		}
		// 在所有工作都发送完毕后关闭工作通道
		close(work)

		// 等待worker完成,然后关闭结果通道
		wg.Wait()
		close(results)
	}()

	// 收集结果
	// 如果结果通道关闭且已接收到最后一个值,则停止迭代
	for result := range results {
	    // 可以将文件写入磁盘
		fmt.Println(result)
	}
}

https://play.golang.com/p/K-KVX4LEEoK

英文:

You could spawn 5 goroutines that read from a work channel. That way you have at all times 5 goroutines running and don't need to batch them so that you have to wait until 5 are finished to start the next 5.

func main() {
	stack := []string{&quot;a&quot;, &quot;b&quot;, &quot;c&quot;, &quot;d&quot;, &quot;e&quot;, &quot;f&quot;, &quot;g&quot;, &quot;h&quot;}

	work := make(chan string)
	results := make(chan string)

	// create worker 5 goroutines
	wg := sync.WaitGroup{}
	for i := 0; i &lt; 5; i++ {
		wg.Add(1)
		go func() {
			defer wg.Done()
			for s := range work {
				results &lt;- B(A(s))
			}
		}()
	}

	// send the work to the workers
	// this happens in a goroutine in order
	// to not block the main function, once
	// all 5 workers are busy
	go func() {
		for _, s := range stack {
		   // could read the file from disk
		   // here and pass a pointer to the file
			work &lt;- s
		}
		// close the work channel after
		// all the work has been send
		close(work)

		// wait for the workers to finish
		// then close the results channel
		wg.Wait()
		close(results)
	}()

	// collect the results
	// the iteration stops if the results
	// channel is closed and the last value
	// has been received
	for result := range results {
	    // could write the file to disk
		fmt.Println(result)
	}
}

https://play.golang.com/p/K-KVX4LEEoK

答案2

得分: 2

你已经完成了一半的工作,有几个问题需要解决:

  1. 你的程序会发生死锁,因为没有关闭Box,所以主函数无法完成对它的range操作。
  2. 你没有等待goroutine完成,而且goroutine的数量超过了5个。(这两个问题的解决方法太复杂,无法单独描述)

1. 死锁

当你对一个通道进行range操作时,你会从通道中读取每个值,直到通道关闭且为空为止。由于你从未close通道,对该通道的range操作永远无法完成,程序也无法结束。

在你的情况下,解决这个问题相当简单:我们只需要在不再向通道写入数据时关闭通道。

    for _, v := range fileList {
        go A(v)
    }
    close(Box)

请记住,close一个通道并不会阻止它被读取,只会阻止它被写入。现在消费者可以区分将来可能接收更多数据的空通道和永远不会接收更多数据的空通道。

一旦添加了close(Box),程序就不会再发生死锁,但仍然无法正常工作。

2. Goroutine过多且未等待它们完成

为了同时运行一定数量的并发执行,而不是为每个输入创建一个goroutine,我们可以在一个“工作池”中创建goroutine:

  • 创建一个通道来传递工作给工作线程
  • 创建一个通道,用于goroutine返回它们的结果(如果有的话)
  • 启动所需数量的goroutine
  • 至少启动一个额外的goroutine来分派工作或收集结果,这样你就不必尝试从主goroutine中同时进行两者操作
  • 使用sync.WaitGroup等待所有数据被处理
  • close通道,以向工作线程和结果收集器发出它们的通道已经填充完毕的信号。

在我们开始实现之前,让我们讨论一下AB如何交互。

首先,它们应该由函数A()处理,函数A()将生成一个文件,然后该文件将由B()处理。

然后,A()B()必须按顺序执行。它们仍然可以通过通道传递数据,但由于它们必须按顺序执行,这对你没有任何帮助。更简单的方法是在工作线程中按顺序运行它们。为此,我们需要更改A(),要么调用B,要么返回给B的路径,然后工作线程可以调用它。我选择后者。

func A(file string) string {
	fmt.Println(file, "正在函数A()中处理...")
	fileGenByA := "/path/to/fileGenByA1"
    return fileGenByA
}

在编写工作函数之前,我们还必须考虑B的结果。目前,B不返回任何内容。在现实世界中,除非B()不会失败,否则你至少希望返回错误,或者至少panic。我将跳过收集结果的部分。

现在我们可以编写我们的工作函数了。

func worker(wg *sync.WaitGroup, incoming <-chan string) {
	defer wg.Done()
	for file := range incoming {
		B(A(file))
	}
}

现在我们只需要启动5个这样的工作线程,将输入文件写入通道,关闭它,并且wg.Wait()等待工作线程完成。

	incoming_work := make(chan string)
	var wg sync.WaitGroup
	for i := 0; i < 5; i++ {
		wg.Add(1)
		go worker(&wg, incoming_work)
	}
	for _, v := range fileList {
		incoming_work <- v
	}
	close(incoming_work)
	wg.Wait()

完整示例代码请参考:https://go.dev/play/p/A1H4ArD2LD8

返回结果

能够启动goroutine并等待它们完成是很好的,但如果你需要从goroutine中获取结果怎么办?除非是最简单的情况,否则你至少希望知道文件是否处理失败,以便你可以调查错误。

我们有5个工作线程,但有很多文件,所以有很多结果。每个工作线程将不得不返回多个结果。因此,我们需要另一个通道。通常情况下,为返回值定义一个struct是值得的:

type result struct {
  file string
  err error
}

这不仅告诉我们是否有错误,还清楚地定义了错误的来源文件。

我们如何在当前代码中测试错误情况?在你的示例中,B始终从A获取相同的值。如果我们将A的输入文件名添加到它传递给B的路径中,我们可以根据子字符串模拟错误。我将模拟file3失败的错误。

func A(file string) string {
	fmt.Println(file, "正在函数A()中处理...")
	fileGenByA := "/path/to/fileGenByA1/" + file
	return fileGenByA
}

func B(file string) (r result) {
	r.file = file
	fmt.Println(file, "正在函数B()中处理...")
	if strings.Contains(file, "file3") {
		r.err = fmt.Errorf("测试错误")
	}
	return
}

我们的工作线程将发送结果,但我们需要在某个地方收集它们。main()正在将工作分派给工作线程,在写入incoming_work时会阻塞。因此,最简单的收集结果的地方是另一个goroutine。我们的结果收集器goroutine必须从结果通道中读取结果,打印出调试错误,并返回失败的总数,以便我们的程序可以返回表示整体成功或失败的最终退出状态。

	failures_chan := make(chan int)
	go func() {
		var failures int
		for result := range results {
			if result.err != nil {
				failures++
				fmt.Printf("文件 %s 失败:%s", result.file, result.err.Error())
			}
		}
		failures_chan <- failures

	}()

现在我们有另一个通道要关闭,重要的是我们在所有工作线程完成之后才关闭它。因此,在我们wg.Wait()等待工作线程之后,我们close(results)

	close(incoming_work)
	wg.Wait()
	close(results)
	if failures := <-failures_chan; failures > 0 {
		os.Exit(1)
	}

将所有这些放在一起,我们得到了这段代码

package main

import (
	"fmt"
	"os"
	"strings"
	"sync"
)

func A(file string) string {
	fmt.Println(file, "正在函数A()中处理...")
	fileGenByA := "/path/to/fileGenByA1/" + file
	return fileGenByA
}

func B(file string) (r result) {
	r.file = file
	fmt.Println(file, "正在函数B()中处理...")
	if strings.Contains(file, "file3") {
		r.err = fmt.Errorf("测试错误")
	}
	return
}

func worker(wg *sync.WaitGroup, incoming <-chan string, results chan<- result) {
	defer wg.Done()
	for file := range incoming {
		results <- B(A(file))
	}
}

type result struct {
	file string
	err  error
}

func main() {
	// 假设这是从目录中读取的文件列表
	fileList := []string{
		"/path/to/file1",
		"/path/to/file2",
		"/path/to/file3",
	}
	incoming_work := make(chan string)
	results := make(chan result)
	var wg sync.WaitGroup
	for i := 0; i < 5; i++ {
		wg.Add(1)
		go worker(&wg, incoming_work, results)
	}
	failures_chan := make(chan int)
	go func() {
		var failures int
		for result := range results {
			if result.err != nil {
				failures++
				fmt.Printf("文件 %s 失败:%s", result.file, result.err.Error())
			}
		}
		failures_chan <- failures

	}()
	for _, v := range fileList {
		incoming_work <- v
	}
	close(incoming_work)
	wg.Wait()
	close(results)
	if failures := <-failures_chan; failures > 0 {
		os.Exit(1)
	}
}

当我们运行它时,我们得到:

/path/to/file1 正在函数A()中处理...
/path/to/fileGenByA1//path/to/file1 正在函数B()中处理...
/path/to/file2 正在函数A()中处理...
/path/to/fileGenByA1//path/to/file2 正在函数B()中处理...
/path/to/file3 正在函数A()中处理...
/path/to/fileGenByA1//path/to/file3 正在函数B()中处理...
文件 /path/to/fileGenByA1//path/to/file3 失败:测试错误
程序退出。

最后一点:带缓冲的通道。

带缓冲的通道没有问题。特别是如果你知道传入工作和结果的总大小,带缓冲的通道可以省去结果收集goroutine,因为你可以分配一个足够大的缓冲通道来容纳所有结果。然而,我认为如果通道是无缓冲的,更容易理解这种模式。关键是你不需要知道传入或传出结果的数量,这两个数字可能不同,或者基于无法预先确定的东西。

英文:

you're halfway there. There's a few things you need to fix:

  1. your program deadlocks because nothing closes Box, so the main function can never get done rangeing over it.
  2. You aren't waiting for your goroutines to finish, and there than 5 goroutines. (The solutions to these are too intertwined to describe them separately)

1. Deadlock

fatal error: all goroutines are asleep - deadlock!
goroutine 1 [chan receive]:
main.main()

When you range over a channel, you read each value from the channel until it is both closed and empty. Since you never close the channel, the range over that channel can never complete, and the program can never finish.

This is a fairly easy problem to solve in your case: we just need to close the channel when we know there will be no more writes to the channel.

    for _, v := range fileList {
go A(v)
}
close(Box)

Keep in mind that closeing a channel doesn't stop it from being read, only written. Now consumers can distinguish between an empty channel that may receive more data in the future, and an empty channel that will never receive more data.

Once you add the close(Box), the program doesn't deadlock anymore, but it still doesn't work.

2. Too Many Goroutines and not waiting for them to complete

To run a certain maximum number of concurrent executions, instead of creating a goroutine for each input, create the goroutines in a "worker pool":

  • Create a channel to pass the workers their work
  • Create a channel for the goroutines to return their results, if any
  • Start the number of goroutines you want
  • Start at least one additional goroutine to either dispatch work or collect the result, so you don't have to try doing both from the main goroutine
  • use a sync.WaitGroup to wait for all data to be processed
  • close the channels to signal to the workers and the results collector that their channels are done being filled.

Before we get into the implementation, let's talk aobut how A and B interact.

> first they should be processed by function A(), function A() will generate a file, then this file will be processed by B().

A() and B() must, then, execute serially. They can still pass their data through a channel, but since their execution must be serial, it does nothing for you. Simpler is to run them sequentially in the workers. For that, we'll need to change A() to either call B, or to return the path for B and the worker can call. I choose the latter.

func A(file string) string {
fmt.Println(file, &quot;is processing in func A()...&quot;)
fileGenByA := &quot;/path/to/fileGenByA1&quot;
return fileGenByA
}

Before we write our worker function, we also must consider the result of B. Currently, B returns nothing. In the real world, unless B() cannot fail, you would at least want to either return the error, or at least panic. I'll skip over collecting results for now.

Now we can write our worker function.

func worker(wg *sync.WaitGroup, incoming &lt;-chan string) {
defer wg.Done()
for file := range incoming {
B(A(file))
}
}

Now all we have to do is start 5 such workers, write the incoming files to the channel, close it, and wg.Wait() for the workers to complete.

	incoming_work := make(chan string)
var wg sync.WaitGroup
for i := 0; i &lt; 5; i++ {
wg.Add(1)
go worker(&amp;wg, incoming_work)
}
for _, v := range fileList {
incoming_work &lt;- v
}
close(incoming_work)
wg.Wait()

Full example at https://go.dev/play/p/A1H4ArD2LD8

Returning Results.

It's all well and good to be able to kick off goroutines and wait for them to complete. But what if you need results back from your goroutines? In all but the simplest of cases, you would at least want to know if files failed to process so you could investigate the errors.

We have only 5 workers, but we have many files, so we have many results. Each worker will have to return several results. So, another channel. It's usually worth defining a struct for your return:

type result struct {
file string
err error
}

This tells us not just whether there was an error but also clearly defines which file from which the error resulted.

How will we test an error case in our current code? In your example, B always gets the same value from A. If we add A's incoming file name to the path it passes to B, we can mock an error based on a substring. My mocked error will be that file3 fails.

func A(file string) string {
fmt.Println(file, &quot;is processing in func A()...&quot;)
fileGenByA := &quot;/path/to/fileGenByA1/&quot; + file
return fileGenByA
}
func B(file string) (r result) {
r.file = file
fmt.Println(file, &quot;is processing in func B()...&quot;)
if strings.Contains(file, &quot;file3&quot;) {
r.err = fmt.Errorf(&quot;Test error&quot;)
}
return
}

Our workers will be sending results, but we need to collect them somewhere. main() is busy dispatching work to the workers, blocking on its write to incoming_work when the workers are all busy. So the simplest place to collect the results is another goroutine. Our results collector goroutine has to read from a results channel, print out errors for debugging, and the return the total number of failures so our program can return a final exit status indicating overall success or failure.

	failures_chan := make(chan int)
go func() {
var failures int
for result := range results {
if result.err != nil {
failures++
fmt.Printf(&quot;File %s failed: %s&quot;, result.file, result.err.Error())
}
}
failures_chan &lt;- failures
}()

Now we have another channel to close, and it's important we close it after all workers are done. So we close(results) after we wg.Wait() for the workers.

	close(incoming_work)
wg.Wait()
close(results)
if failures := &lt;-failures_chan; failures &gt; 0 {
os.Exit(1)
}

Putting all that together, we end up with this code:

package main
import (
&quot;fmt&quot;
&quot;os&quot;
&quot;strings&quot;
&quot;sync&quot;
)
func A(file string) string {
fmt.Println(file, &quot;is processing in func A()...&quot;)
fileGenByA := &quot;/path/to/fileGenByA1/&quot; + file
return fileGenByA
}
func B(file string) (r result) {
r.file = file
fmt.Println(file, &quot;is processing in func B()...&quot;)
if strings.Contains(file, &quot;file3&quot;) {
r.err = fmt.Errorf(&quot;Test error&quot;)
}
return
}
func worker(wg *sync.WaitGroup, incoming &lt;-chan string, results chan&lt;- result) {
defer wg.Done()
for file := range incoming {
results &lt;- B(A(file))
}
}
type result struct {
file string
err  error
}
func main() {
// assuming that this is the file list read from a directory
fileList := []string{
&quot;/path/to/file1&quot;,
&quot;/path/to/file2&quot;,
&quot;/path/to/file3&quot;,
}
incoming_work := make(chan string)
results := make(chan result)
var wg sync.WaitGroup
for i := 0; i &lt; 5; i++ {
wg.Add(1)
go worker(&amp;wg, incoming_work, results)
}
failures_chan := make(chan int)
go func() {
var failures int
for result := range results {
if result.err != nil {
failures++
fmt.Printf(&quot;File %s failed: %s&quot;, result.file, result.err.Error())
}
}
failures_chan &lt;- failures
}()
for _, v := range fileList {
incoming_work &lt;- v
}
close(incoming_work)
wg.Wait()
close(results)
if failures := &lt;-failures_chan; failures &gt; 0 {
os.Exit(1)
}
}

And when we run it, we get:

/path/to/file1 is processing in func A()...
/path/to/fileGenByA1//path/to/file1 is processing in func B()...
/path/to/file2 is processing in func A()...
/path/to/fileGenByA1//path/to/file2 is processing in func B()...
/path/to/file3 is processing in func A()...
/path/to/fileGenByA1//path/to/file3 is processing in func B()...
File /path/to/fileGenByA1//path/to/file3 failed: Test error
Program exited.

A final thought: buffered channels.

There is nothing wrong with buffered channels. Especially if you know the overall size of incoming work and results, buffered channels can obviate the results collector goroutine because you can allocate a buffered channel big enough to hold all results. However, I think it's more straightforward to understand this pattern if the channels are unbuffered. The key takeaway is that you don't need to know the number of incoming or outgoing results, which could indeed be different numbers or based on something that can't be predetermined.

huangapple
  • 本文由 发表于 2022年3月13日 23:37:38
  • 转载请务必保留本文链接:https://go.coder-hub.com/71458290.html
匿名

发表评论

匿名网友

:?: :razz: :sad: :evil: :!: :smile: :oops: :grin: :eek: :shock: :???: :cool: :lol: :mad: :twisted: :roll: :wink: :idea: :arrow: :neutral: :cry: :mrgreen:

确定