

How would you define a pool of goroutines to be executed at once?


TL;DR: Please just go to the last part and tell me how you would solve this problem.

I started using Go this morning, coming from Python. I want to call a closed-source executable from Go several times, with a bit of concurrency and different command-line arguments. My resulting code works well, but I'd like your input on how to improve it. Since I'm at an early learning stage, I'll also explain my workflow.

For the sake of simplicity, assume here that this "external closed-source program" is zenity, a Linux command line tool that can display graphical message boxes from the command line.

Calling an executable file from Go

So, in Go, I would do it like this:

```go
package main

import "os/exec"

func main() {
	// exec.Command does not go through a shell, so the argument needs no extra quoting.
	cmd := exec.Command("zenity", "--info", "--text=Hello World")
	cmd.Run()
}
```

This works as expected. Note that .Run() is functionally equivalent to .Start() followed by .Wait(). That's great, but if I only wanted to run this program once, writing a Go program for it would not be worth the effort. So let's run it multiple times.
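To make the `.Run()` equivalence concrete, here is a minimal sketch that performs the two steps separately. The helper `startThenWait` is a hypothetical name introduced for illustration; splitting the call is mainly useful when the caller wants to do other work while the child process runs.

```go
package main

import (
	"fmt"
	"os/exec"
)

// startThenWait is a hypothetical helper that behaves like cmd.Run():
// Start launches the process without waiting, Wait blocks until it exits.
func startThenWait(name string, args ...string) error {
	cmd := exec.Command(name, args...)
	if err := cmd.Start(); err != nil {
		return err // e.g. the executable was not found in $PATH
	}
	// Other work could be done here while the child process runs.
	return cmd.Wait()
}

func main() {
	if err := startThenWait("zenity", "--info", "--text=Hello World"); err != nil {
		fmt.Println("zenity failed:", err)
	}
}
```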

Calling an executable multiple times

Now that this works, I'd like to call my program multiple times with custom command-line arguments (here just i, for simplicity).

```go
package main

import (
	"os/exec"
	"strconv"
)

func main() {
	NumEl := 8 // Number of times the external program is called
	for i := 0; i < NumEl; i++ {
		cmd := exec.Command("zenity", "--info", "--text=Hello from iteration n."+strconv.Itoa(i))
		cmd.Run()
	}
}
```

OK, we did it! But I still can't see the advantage of Go over Python: this code runs serially. I have a multi-core CPU and I'd like to take advantage of it, so let's add some concurrency with goroutines.

Goroutines, or a way to make my program parallel

a) First attempt: just add "go"s everywhere

Let's rewrite the code so the call is easier to reuse, and add the famous go keyword:

```go
package main

import (
	"os/exec"
	"strconv"
)

func main() {
	NumEl := 8
	for i := 0; i < NumEl; i++ {
		go callProg(i) // <--- There!
	}
}

func callProg(i int) {
	cmd := exec.Command("zenity", "--info", "--text=Hello from iteration n."+strconv.Itoa(i))
	cmd.Run()
}
```

Nothing! What is the problem? All the goroutines are started at once. I don't really know why zenity is not executed, but AFAIK the Go program exits before the external zenity program can even be initialized. This was confirmed by using time.Sleep: waiting for a couple of seconds was enough to let the 8 instances of zenity launch. I don't know if this can be considered a bug, though.

To make it worse, the real program I'd actually like to call takes a while to run. If I run 8 instances of it in parallel on my 4-core CPU, a lot of time will be wasted on context switching. I don't know how plain Go goroutines behave, but each cmd.Run() starts zenity as a separate OS process, so this launches 8 processes at once. To make it even worse, I want to run this program more than 100,000 times, and doing all of that at once in goroutines won't be efficient at all. Still, I'd like to leverage my 4-core CPU!

b) Second attempt: use pools of goroutines

Online resources tend to recommend sync.WaitGroup for this kind of work. The problem with that approach is that you basically work with batches of goroutines: if I create a WaitGroup of 4 members, the Go program waits for all 4 external programs to finish before starting a new batch of 4. This is not efficient: CPU time is wasted, once again.

Some other resources recommended the use of a buffered channel to do the work:

```go
package main

import (
	"os/exec"
	"strconv"
)

func main() {
	NumEl := 8   // Number of times the external program is called
	NumCore := 4 // Number of available cores
	c := make(chan bool, NumCore-1)
	for i := 0; i < NumEl; i++ {
		go callProg(i, c)
		c <- true // At the NumCoreth iteration, c is blocking
	}
}

func callProg(i int, c chan bool) {
	defer func() { <-c }()
	cmd := exec.Command("zenity", "--info", "--text=Hello from iteration n."+strconv.Itoa(i))
	cmd.Run()
}
```

This seems ugly. Channels were not intended for this purpose: I'm exploiting a side-effect. I love the concept of defer but I hate having to declare a function (even a lambda) to pop a value out of the dummy channel that I created. Oh, and of course, using a dummy channel is, by itself, ugly.
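If the buffered channel itself is the main complaint, one common way to make it read less like a side effect is to give it a name and two methods. The sketch below assumes a hypothetical `semaphore` type (not a standard library API): `chan struct{}` carries no data, `defer sem.Release()` uses a method value so no func literal is needed, and acquiring the slot in the launching loop means the capacity equals the limit directly (no `NumCore - 1` adjustment). The peak counter exists only so the limit can be checked.

```go
package main

import (
	"fmt"
	"sync"
)

// semaphore is a hypothetical counting semaphore built on a buffered channel.
// struct{} values take no memory; only the number of buffered tokens matters.
type semaphore chan struct{}

// Acquire blocks while all slots are taken.
func (s semaphore) Acquire() { s <- struct{}{} }

// Release frees one slot.
func (s semaphore) Release() { <-s }

// runLimited runs task(0..n-1) with at most `limit` running at once and
// returns the highest concurrency it observed (for demonstration only).
func runLimited(n, limit int, task func(i int)) int {
	sem := make(semaphore, limit)
	var (
		wg            sync.WaitGroup
		mu            sync.Mutex
		running, peak int
	)
	for i := 0; i < n; i++ {
		sem.Acquire() // blocks the loop, so at most `limit` goroutines run at once
		wg.Add(1)
		go func(i int) {
			defer wg.Done()
			defer sem.Release() // method value: no func literal needed
			mu.Lock()
			running++
			if running > peak {
				peak = running
			}
			mu.Unlock()
			task(i)
			mu.Lock()
			running--
			mu.Unlock()
		}(i)
	}
	wg.Wait()
	return peak
}

func main() {
	peak := runLimited(20, 4, func(i int) { fmt.Println("task", i) })
	fmt.Println("peak concurrency:", peak)
}
```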

c) Third attempt: die when all the children are dead

Now we are nearly finished. I just have to take into account yet another side effect: the Go program exits before all the zenity pop-ups are closed. This is because once the loop is finished (at the 8th iteration), nothing prevents the program from finishing. This time, sync.WaitGroup will be useful.

```go
package main

import (
	"os/exec"
	"strconv"
	"sync"
)

func main() {
	NumEl := 8   // Number of times the external program is called
	NumCore := 4 // Number of available cores
	c := make(chan bool, NumCore-1)
	wg := new(sync.WaitGroup)
	wg.Add(NumEl) // Set the number of goroutines to (0 + NumEl)
	for i := 0; i < NumEl; i++ {
		go callProg(i, c, wg)
		c <- true // At the NumCoreth iteration, c is blocking
	}
	wg.Wait() // Wait for all the children to die
	close(c)
}

func callProg(i int, c chan bool, wg *sync.WaitGroup) {
	defer func() {
		<-c
		wg.Done() // Decrease the number of alive goroutines
	}()
	cmd := exec.Command("zenity", "--info", "--text=Hello from iteration n."+strconv.Itoa(i))
	cmd.Run()
}
```


My questions

  • Do you know any other proper way to limit the number of goroutines executed at once?

I don't mean threads; how Go manages goroutines internally is not relevant. I really mean limiting the number of goroutines launched at once: each cmd.Run() starts a new OS process, so I should control how many of those calls run at the same time.

  • Does that code look fine to you?
  • Do you know how to avoid the use of a dummy channel in that case?

I can't convince myself that such dummy channels are the way to go.


Score: 89

I would spawn 4 worker goroutines that read tasks from a common channel. Goroutines that are faster than others (because they are scheduled differently or happen to get simple tasks) will receive more tasks from this channel. In addition, I would use a sync.WaitGroup to wait for all workers to finish. The remaining part is just creating the tasks. You can see an example implementation of that approach here:

```go
package main

import (
	"os/exec"
	"strconv"
	"sync"
)

func main() {
	tasks := make(chan *exec.Cmd, 64)

	// spawn four worker goroutines
	var wg sync.WaitGroup
	for i := 0; i < 4; i++ {
		wg.Add(1)
		go func() {
			for cmd := range tasks {
				cmd.Run()
			}
			wg.Done()
		}()
	}

	// generate some tasks
	for i := 0; i < 10; i++ {
		tasks <- exec.Command("zenity", "--info", "--text=Hello from iteration n."+strconv.Itoa(i))
	}
	close(tasks)

	// wait for the workers to finish
	wg.Wait()
}
```

There are probably other possible approaches, but I think this is a very clean solution that is easy to understand.


Score: 34

A simple approach to throttling (execute f() N times, but at most maxConcurrency at a time), just a scheme:

```go
package main

import (
	"sync"
)

const maxConcurrency = 4 // for example

var throttle = make(chan int, maxConcurrency)

func main() {
	const N = 100 // for example
	var wg sync.WaitGroup
	for i := 0; i < N; i++ {
		throttle <- 1 // whatever number
		wg.Add(1)
		go f(i, &wg, throttle)
	}
	wg.Wait()
}

func f(i int, wg *sync.WaitGroup, throttle chan int) {
	defer wg.Done()
	// whatever processing
	println(i)
	<-throttle
}
```


I probably wouldn't call the throttle channel "dummy". IMHO it's an elegant way (not my invention, of course) to limit concurrency.

BTW: Please note that you're ignoring the returned error from cmd.Run().


Score: 2

🧩 Modules

📃 Template

```go
package main

import (
	"fmt"
	"math/rand"
	"runtime"

	"github.com/zenthangplus/goccm"
)

func main() {
	const N = 100 // number of tasks; any bounded count works
	semaphore := goccm.New(runtime.NumCPU())
	for i := 0; i < N; i++ {
		semaphore.Wait() // blocks while NumCPU goroutines are already running
		go func() {
			defer semaphore.Done() // frees a slot when the goroutine ends
			fmt.Println(rand.Int())
		}()
	}
	semaphore.WaitAllDone() // reachable now that the loop terminates
}
```

🎰 Optimal routine quantity

  • If the operation is CPU-bound: runtime.NumCPU()
  • Otherwise, try different values and time them with: time go run *.go

🔨 Configure

```shell
export GOPATH="$(pwd)/gopath"
go mod init example   # takes a module path, not file names; "example" is a placeholder
go mod tidy
```

🧹 CleanUp

```shell
find "${GOPATH}" -exec chmod +w {} \;
rm --recursive --force "${GOPATH}"
```


Score: 1

Try this:

```go
limiter := NewConcurrencyLimiter(10)
limiter.Execute(func() {
	zenity(...)
})
limiter.Wait()
```


Score: 1

You could use the Worker Pool pattern described in this post.
This is what an implementation would look like:

```go
package main

import (
	"os/exec"
	"strconv"
	"sync"
)

func main() {
	NumEl := 8
	pool := 4
	intChan := make(chan int)
	var wg sync.WaitGroup
	for i := 0; i < pool; i++ {
		wg.Add(1)
		go callProg(intChan, &wg) // <--- launch the worker routines
	}
	for i := 0; i < NumEl; i++ {
		intChan <- i // <--- push data which will be received by workers
	}
	close(intChan) // <--- will safely close the channel & terminate worker routines
	wg.Wait()      // <--- without this, main may exit before the last commands finish
}

func callProg(intChan chan int, wg *sync.WaitGroup) {
	defer wg.Done()
	for i := range intChan {
		cmd := exec.Command("zenity", "--info", "--text=Hello from iteration n."+strconv.Itoa(i))
		cmd.Run()
	}
}
```

  • Published on August 23, 2013, 22:14:28
  • When reposting, please keep the link to this article: https://go.coder-hub.com/18405023.html



