限制并发运行的任务数量

huangapple go评论95阅读模式
英文:

Limiting the number of concurrent tasks running

问题

所以我经常在go中遇到这个问题。假设我有一个包含100,000行文本的文本文件。现在我想将所有这些行保存到数据库中。我会这样做:

file, _ := ioutil.ReadFile("file.txt")

fileLines := strings.Split(string(file), "\n")

然后我会遍历文件中的所有行:

for _, l := range fileLines{
  saveToDB(l)
}

现在我想并发地运行这个saveToDB函数:

var wg sync.WaitGroup

for _, l := range fileLines{
  wg.Add(1)
  go saveToDB(l, &wg)
}

wg.Wait()

我不知道这是否是一个问题,但这将同时运行100,000个函数。有没有办法说“嘿,运行100个并发函数,等待所有这些函数完成后再运行100个”?

for i, _ := range fileLine {
  for t = 0; t < 100; t++{
    wg.Add(1)
    go saveToDB(fileLine[i], &wg)
  }
  wg.Wait()
}

我需要像这样做些什么,还是有更简洁的方法来处理这个问题?或者我同时运行100,000个任务是否有问题?

英文:

So I come accross this issue with go a lot. Let's say I have a text file with 100,000 lines of text. Now I wanna save all these lines to a db. So I would do something like this:

file, _ := iotuil.ReadFile(&quot;file.txt&quot;)

fileLines := strings.Split(string(file), &quot;\n&quot;)

Now I would loop over all the lines in the file:

for _, l := range fileLines{
  saveToDB(l)
}

Now I wanna run this saveToDB func concurrently:

var wg sync.WaitGroup

for _, l := range fileLines{
  wg.Add(1)
  go saveToDB(l, &amp;wg)
}

wg.Wait()

I don't know if this is a problem or not but that would run 100,000 concurrent functions. Is there any way of saying hey run 100 concurrent functions wait for all of those to finish then run 100 more.

for i, _ := range fileLine {
  for t = 0; t &lt; 100; t++{
    wg.Add(1)
    go saveToDB(fileLine[i], &amp;wg)
  }
  wg.Wait()
}

Do I need to do something like that or is there a cleaner way to go about this? Or is me running the 100,000 concurrent tasks not an issue?

答案1

得分: 20

我认为最好的方法是保持一个工作goroutine池,通过通道分发工作给它们,然后关闭通道以使它们退出。

代码示例如下:

// 创建一个用于工作任务的通道
ch := make(chan string)

wg := sync.WaitGroup{}

// 启动工作goroutine
for t := 0; t < 100; t++ {
wg.Add(1)
go saveToDB(ch, &wg)
}

// 将行推送到队列通道以进行处理
for _, line := range fileline {
ch <- line
}

// 这将导致工作goroutine停止并退出它们的接收循环
close(ch)

// 确保它们都退出
wg.Wait()

saveToDB函数的代码如下:

func saveToDB(ch chan string, wg *sync.WaitGroup) {
// 消费一行
for line := range ch {
// 执行工作
actuallySaveToDB(line)
}
// 当调度程序关闭通道时,我们已经退出了循环,
// 现在我们只需通知工作组我们已完成
wg.Done()
}

英文:

I think the best approach for this would be to keep a pool of worker goroutines, dispatch the work for them in channels, and then close the channel so they would exit.

something like this:

// create a channel for work &quot;tasks&quot;
ch := make(chan string)

wg := sync.WaitGroup{}

// start the workers
for t := 0; t &lt; 100; t++{
    wg.Add(1)
    go saveToDB(ch, &amp;wg)
}

// push the lines to the queue channel for processing
for _, line := range fileline {
    ch &lt;- line
}

// this will cause the workers to stop and exit their receive loop
close(ch)

// make sure they all exit
wg.Wait()

and then the saveFunction looks like this:

func saveToDB(ch chan string, wg *sync.WaitGroup) {
    // cnosume a line
    for line := range ch {
        // do work
        actuallySaveToDB(line)
    }
    // we&#39;ve exited the loop when the dispatcher closed the channel, 
    // so now we can just signal the workGroup we&#39;re done
    wg.Done()
}

huangapple
  • 本文由 发表于 2016年2月11日 06:54:37
  • 转载请务必保留本文链接:https://go.coder-hub.com/35327609.html
匿名

发表评论

匿名网友

:?: :razz: :sad: :evil: :!: :smile: :oops: :grin: :eek: :shock: :???: :cool: :lol: :mad: :twisted: :roll: :wink: :idea: :arrow: :neutral: :cry: :mrgreen:

确定