如何定义一组同时执行的goroutine池?

huangapple go评论96阅读模式
英文:

How would you define a pool of goroutines to be executed at once?

问题

TL;DR: 请直接跳到最后一部分告诉我你将如何解决这个问题。

我今天早上开始使用Go,之前是用Python。我想从Go中多次调用一个闭源的可执行文件,带有一点并发性,并且使用不同的命令行参数。我的代码已经正常工作,但我想听听你的意见,以便改进它。由于我还处于学习阶段,我也会解释一下我的工作流程。

为了简单起见,假设这个“外部闭源程序”是zenity,一个可以从命令行显示图形消息框的Linux命令行工具。

从Go调用可执行文件

在Go中,我会这样写:

package main
import "os/exec"
func main() {
    cmd := exec.Command("zenity", "--info", "--text='Hello World'")
    cmd.Run()
}

这应该是正常工作的。注意,.Run().Start().Wait()的功能等效。这很好,但如果我只想执行这个程序一次,那么整个编程过程就不值得了。所以我们来多次执行它。

多次调用可执行文件

现在我想多次调用我的程序,使用自定义的命令行参数(这里只是简单地使用i)。

package main    
import (
    "os/exec"
    "strconv"
)

func main() {
    NumEl := 8 // 调用外部程序的次数
    for i:=0; i<NumEl; i++ {
        cmd := exec.Command("zenity", "--info", "--text='Hello from iteration n." + strconv.Itoa(i) + "'")
        cmd.Run()
    }
}

好了,我们做到了!但我仍然看不出Go相对于Python的优势...这段代码实际上是按顺序执行的。我有一个多核CPU,我想利用它的优势。所以让我们使用goroutines来添加一些并发性。

Goroutines,或者说让我的程序并行化

a) 第一次尝试:到处都加上"go"

让我们重写我们的代码,使调用和重用变得更容易,并添加著名的go关键字:

package main
import (
    "os/exec"
    "strconv"
)

func main() {
    NumEl := 8 
    for i:=0; i<NumEl; i++ {
        go callProg(i)  // <--- 在这里!
    }
}

func callProg(i int) {
    cmd := exec.Command("zenity", "--info", "--text='Hello from iteration n." + strconv.Itoa(i) + "'")
    cmd.Run()
}

什么都没有发生!问题出在哪里?所有的goroutines都同时执行。我不知道为什么zenity没有执行,但据我所知,Go程序在zenity外部程序甚至初始化之前就退出了。通过使用time.Sleep来确认这一点:等待几秒足以让8个zenity实例启动。不过我不知道这是否可以被视为一个bug。

更糟糕的是,我实际上想要调用的真正程序需要一段时间来执行。如果我在我的4核CPU上并行执行8个这样的程序,它将浪费一些时间进行大量的上下文切换...我不知道纯粹的Go goroutines的行为如何,但是exec.Command在8个不同的线程中启动zenity 8次。更糟糕的是,我想执行这个程序超过100,000次。在goroutines中一次性执行所有这些操作将一点效率都没有。尽管如此,我仍然想利用我的4核CPU!

b) 第二次尝试:使用goroutines池

在线资源倾向于推荐使用sync.WaitGroup来处理这种情况。但这种方法的问题在于,你基本上是在批量处理goroutines:如果我创建了一个有4个成员的WaitGroup,那么Go程序将在所有4个外部程序完成之前等待,然后再调用下一批4个程序。这是不高效的:CPU被浪费了,再次浪费。

其他一些资源推荐使用带缓冲的通道来完成工作:

package main
import (
    "os/exec"
    "strconv"
)

func main() {
    NumEl := 8               // 调用外部程序的次数
    NumCore := 4             // 可用核心数
    c := make(chan bool, NumCore - 1) 
    for i:=0; i<NumEl; i++ {
        go callProg(i, c)
        c <- true            // 在第NumCore次迭代时,c会阻塞   
    }
}

func callProg(i int, c chan bool) {
    defer func () {<- c}()
    cmd := exec.Command("zenity", "--info", "--text='Hello from iteration n." + strconv.Itoa(i) + "'")
    cmd.Run()
}

这看起来很丑陋。通道并不是为这个目的而设计的:我正在利用一个副作用。我喜欢defer的概念,但我讨厌不得不声明一个函数(即使是一个lambda)来从我创建的虚拟通道中取出一个值。哦,当然,使用一个虚拟通道本身就很丑陋。

c) 第三次尝试:当所有子进程都死掉时退出

现在我们几乎完成了。我只需要考虑另一个副作用:Go程序在所有zenity弹出窗口关闭之前就关闭了。这是因为当循环完成(在第8次迭代时),没有任何东西阻止程序结束。这时,sync.WaitGroup将会有用。

package main
import (
    "os/exec"
    "strconv"
    "sync"
)

func main() {
    NumEl := 8               // 调用外部程序的次数
    NumCore := 4             // 可用核心数
    c := make(chan bool, NumCore - 1) 
    wg := new(sync.WaitGroup)
    wg.Add(NumEl)            // 设置goroutines的数量为(0 + NumEl)
    for i:=0; i<NumEl; i++ {
        go callProg(i, c, wg)
        c <- true            // 在第NumCore次迭代时,c会阻塞   
    }
    wg.Wait() // 等待所有子进程死掉
    close(c)
}

func callProg(i int, c chan bool, wg *sync.WaitGroup) {
    defer func () {
        <- c
        wg.Done() // 减少存活的goroutines数量
    }()
    cmd := exec.Command("zenity", "--info", "--text='Hello from iteration n." + strconv.Itoa(i) + "'")
    cmd.Run()
}

完成。

我的问题

  • 你知道其他限制同时执行的goroutines的正确方法吗?

我不是指线程;Go如何在内部管理goroutines并不重要。我真正的意思是限制一次启动的goroutines的数量:exec.Command每次调用都会创建一个新的线程,所以我应该控制它被调用的次数。

  • 你觉得这段代码看起来还可以吗?
  • 你知道如何避免在这种情况下使用虚拟通道吗?

我无法说服自己这样的虚拟通道是正确的方法。

英文:

TL;DR: Please just go to the last part and tell me how you would solve this problem.

I've begun using Go this morning coming from Python. I want to call a closed-source executable from Go several times, with a bit of concurrency, with different command line arguments. My resulting code is working just well but I'd like to get your input in order to improve it. Since I'm at an early learning stage, I'll also explain my workflow.

For the sake of simplicity, assume here that this "external closed-source program" is zenity, a Linux command line tool that can display graphical message boxes from the command line.

Calling an executable file from Go

So, in Go, I would go like this:

package main
import &quot;os/exec&quot;
func main() {
    cmd := exec.Command(&quot;zenity&quot;, &quot;--info&quot;, &quot;--text=&#39;Hello World&#39;&quot;)
    cmd.Run()
}

This should be working just right. Note that .Run() is a functional equivalent to .Start() followed by .Wait(). This is great, but if I wanted to execute this program just once, the whole programming stuff would not be worth it. So let's just do that multiple times.

Calling an executable multiple times

Now that I had this working, I'd like to call my program multiple times, with custom command line arguments (here just i for the sake of simplicity).

package main    
import (
    &quot;os/exec&quot;
    &quot;strconv&quot;
)

func main() {
    NumEl := 8 // Number of times the external program is called
    for i:=0; i&lt;NumEl; i++ {
        cmd := exec.Command(&quot;zenity&quot;, &quot;--info&quot;, &quot;--text=&#39;Hello from iteration n.&quot; + strconv.Itoa(i) + &quot;&#39;&quot;)
        cmd.Run()
    }
}

Ok, we did it! But I still can't see the advantage of Go over Python … This piece of code is actually executed in a serial fashion. I have a multiple-core CPU and I'd like to take advantage of it. So let's add some concurrency with goroutines.

Goroutines, or a way to make my program parallel

a) First attempt: just add "go"s everywhere

Let's rewrite our code to make things easier to call and reuse and add the famous go keyword:

package main
import (
    &quot;os/exec&quot;
    &quot;strconv&quot;
)

func main() {
    NumEl := 8 
    for i:=0; i&lt;NumEl; i++ {
        go callProg(i)  // &lt;--- There!
    }
}

func callProg(i int) {
    cmd := exec.Command(&quot;zenity&quot;, &quot;--info&quot;, &quot;--text=&#39;Hello from iteration n.&quot; + strconv.Itoa(i) + &quot;&#39;&quot;)
    cmd.Run()
}

Nothing! What is the problem? All the goroutines are executed at once. I don't really know why zenity is not executed but AFAIK, the Go program exited before the zenity external program could even be initialized. This was confirmed by the use of time.Sleep: waiting for a couple of seconds was enough to let the 8 instance of zenity launch themselves. I don't know if this can be considered a bug though.

To make it worse, the real program I'd actually like to call takes a while to execute itself. If I execute 8 instances of this program in parallel on my 4-core CPU, it's gonna waste some time doing a lot of context switching … I don't know how plain Go goroutines behave, but exec.Command will launch zenity 8 times in 8 different threads. To make it even worse, I want to execute this program more than 100,000 times. Doing all of that at once in goroutines won't be efficient at all. Still, I'd like to leverage my 4-core CPU!

b) Second attempt: use pools of goroutines

The online resources tend to recommend the use of sync.WaitGroup for this kind of work. The problem with that approach is that you are basically working with batches of goroutines: if I create of WaitGroup of 4 members, the Go program will wait for all the 4 external programs to finish before calling a new batch of 4 programs. This is not efficient: CPU is wasted, once again.

Some other resources recommended the use of a buffered channel to do the work:

package main
import (
    &quot;os/exec&quot;
    &quot;strconv&quot;
)

func main() {
    NumEl := 8               // Number of times the external program is called
    NumCore := 4             // Number of available cores
    c := make(chan bool, NumCore - 1) 
    for i:=0; i&lt;NumEl; i++ {
        go callProg(i, c)
        c &lt;- true            // At the NumCoreth iteration, c is blocking   
    }
}

func callProg(i int, c chan bool) {
    defer func () {&lt;- c}()
    cmd := exec.Command(&quot;zenity&quot;, &quot;--info&quot;, &quot;--text=&#39;Hello from iteration n.&quot; + strconv.Itoa(i) + &quot;&#39;&quot;)
    cmd.Run()
}

This seems ugly. Channels were not intended for this purpose: I'm exploiting a side-effect. I love the concept of defer but I hate having to declare a function (even a lambda) to pop a value out of the dummy channel that I created. Oh, and of course, using a dummy channel is, by itself, ugly.

c) Third attempt: die when all the children are dead

Now we are nearly finished. I have just to take into account yet another side effect: the Go program closes before all the zenity pop-ups are closed. This is because when the loop is finised (at the 8th iteration), nothing prevents the program from finishing. This time, sync.WaitGroup will be useful.

package main
import (
    &quot;os/exec&quot;
    &quot;strconv&quot;
    &quot;sync&quot;
)

func main() {
    NumEl := 8               // Number of times the external program is called
    NumCore := 4             // Number of available cores
    c := make(chan bool, NumCore - 1) 
    wg := new(sync.WaitGroup)
    wg.Add(NumEl)            // Set the number of goroutines to (0 + NumEl)
    for i:=0; i&lt;NumEl; i++ {
        go callProg(i, c, wg)
        c &lt;- true            // At the NumCoreth iteration, c is blocking   
    }
    wg.Wait() // Wait for all the children to die
    close(c)
}

func callProg(i int, c chan bool, wg *sync.WaitGroup) {
    defer func () {
        &lt;- c
        wg.Done() // Decrease the number of alive goroutines
    }()
    cmd := exec.Command(&quot;zenity&quot;, &quot;--info&quot;, &quot;--text=&#39;Hello from iteration n.&quot; + strconv.Itoa(i) + &quot;&#39;&quot;)
    cmd.Run()
}

Done.

My questions

  • Do you know any other proper way to limit the number of goroutines executed at once?

I don't mean threads; how Go manages goroutines internally is not relevant. I really mean limiting the number of goroutines launched at once: exec.Command creates a new thread each time it is called, so I should control the number of time it is called.

  • Does that code look fine to you?
  • Do you know how to avoid the use of a dummy channel in that case?

I can't convince myself that such dummy channels are the way to go.

答案1

得分: 89

我会生成4个工作协程,从一个共享通道中读取任务。那些比其他协程更快的协程(因为它们被调度方式不同或者碰巧得到简单的任务)将从该通道中接收到更多的任务。除此之外,我会使用sync.WaitGroup来等待所有的工作协程完成。剩下的部分只是任务的创建。你可以在这里看到这种方法的一个示例实现:

package main

import (
	"os/exec"
	"strconv"
	"sync"
)

func main() {
	tasks := make(chan *exec.Cmd, 64)

	// 生成四个工作协程
	var wg sync.WaitGroup
	for i := 0; i < 4; i++ {
		wg.Add(1)
		go func() {
			for cmd := range tasks {
				cmd.Run()
			}
			wg.Done()
		}()
	}

	// 生成一些任务
	for i := 0; i < 10; i++ {
		tasks <- exec.Command("zenity", "--info", "--text='Hello from iteration n."+strconv.Itoa(i)+"'")
	}
	close(tasks)

	// 等待工作协程完成
	wg.Wait()
}

可能还有其他可行的方法,但我认为这是一个非常简洁易懂的解决方案。

英文:

I would spawn 4 worker goroutines that read the tasks from a common channel. Goroutines that are faster than others (because they are scheduled differently or happen to get simple tasks) will receive more task from this channel than others. In addition to that, I would use a sync.WaitGroup to wait for all workers to finish. The remaining part is just the creation of the tasks. You can see an example implementation of that approach here:

package main

import (
	&quot;os/exec&quot;
	&quot;strconv&quot;
	&quot;sync&quot;
)

func main() {
	tasks := make(chan *exec.Cmd, 64)

	// spawn four worker goroutines
	var wg sync.WaitGroup
	for i := 0; i &lt; 4; i++ {
		wg.Add(1)
		go func() {
			for cmd := range tasks {
				cmd.Run()
			}
			wg.Done()
		}()
	}

	// generate some tasks
	for i := 0; i &lt; 10; i++ {
		tasks &lt;- exec.Command(&quot;zenity&quot;, &quot;--info&quot;, &quot;--text=&#39;Hello from iteration n.&quot;+strconv.Itoa(i)+&quot;&#39;&quot;)
	}
	close(tasks)

	// wait for the workers to finish
	wg.Wait()
}

There are probably other possible approaches, but I think this is a very clean solution that is easy to understand.

答案2

得分: 34

一个简单的限流方法(执行f() N次,但最多同时执行maxConcurrency次),只是一个方案:

package main

import (
	"sync"
)

const maxConcurrency = 4 // 例如

var throttle = make(chan int, maxConcurrency)

func main() {
	const N = 100 // 例如
	var wg sync.WaitGroup
	for i := 0; i < N; i++ {
		throttle <- 1 // 任意数字
		wg.Add(1)
		go f(i, &wg, throttle)
	}
	wg.Wait()
}

func f(i int, wg *sync.WaitGroup, throttle chan int) {
	defer wg.Done()
	// 任意处理
	println(i)
	<-throttle
}

Playground

我可能不会将throttle通道称为“dummy”。在我看来,这是一种优雅的方式(当然不是我的发明),用于限制并发。

顺便说一下:请注意您正在忽略cmd.Run()返回的错误。

英文:

A simple approach to throttling (execute f() N times but maximum maxConcurrency concurrently), just a scheme:

package main

import (
        &quot;sync&quot;
)

const maxConcurrency = 4 // for example

var throttle = make(chan int, maxConcurrency)

func main() {
        const N = 100 // for example
        var wg sync.WaitGroup
        for i := 0; i &lt; N; i++ {
                throttle &lt;- 1 // whatever number
                wg.Add(1)
                go f(i, &amp;wg, throttle)
        }
        wg.Wait()
}

func f(i int, wg *sync.WaitGroup, throttle chan int) {
        defer wg.Done()
        // whatever processing
        println(i)
        &lt;-throttle
}

Playground

I wouldn't probably call the throttle channel "dummy". IMHO it's an elegant way (it's not my invention of course), how to limit concurrency.

BTW: Please note that you're ignoring the returned error from cmd.Run().

答案3

得分: 2

🧩 模块


📃 模板

package main

import (
    "fmt"
    "github.com/zenthangplus/goccm"
    "math/rand"
    "runtime"
)

func main() {
    semaphore := goccm.New(runtime.NumCPU())
	
    for {
        semaphore.Wait()

        go func() {
            fmt.Println(rand.Int())
            semaphore.Done()
        }()
    }
    
    semaphore.WaitAllDone()
}

🎰 最佳的协程数量

  • 如果操作是CPU密集型的:runtime.NumCPU()
  • 否则使用以下命令进行测试:time go run *.go

🔨 配置

export GOPATH="$(pwd)/gopath"
go mod init *.go
go mod tidy

🧹 清理

find "${GOPATH}" -exec chmod +w {} \;
rm --recursive --force "${GOPATH}"

英文:

🧩 Modules


📃 Template

package main

import (
    &quot;fmt&quot;
    &quot;github.com/zenthangplus/goccm&quot;
    &quot;math/rand&quot;
    &quot;runtime&quot;
)

func main() {
    semaphore := goccm.New(runtime.NumCPU())
	
    for {
        semaphore.Wait()

        go func() {
            fmt.Println(rand.Int())
            semaphore.Done()
        }()
    }
    
    semaphore.WaitAllDone()
}

🎰 Optimal routine quantity

  • If the operation is CPU bounded: runtime.NumCPU()
  • Otherwise test with: time go run *.go

🔨 Configure

export GOPATH=&quot;$(pwd)/gopath&quot;
go mod init *.go
go mod tidy

🧹 CleanUp

find &quot;${GOPATH}&quot; -exec chmod +w {} \;
rm --recursive --force &quot;${GOPATH}&quot;

答案4

得分: 1

请尝试这个链接:https://github.com/korovkin/limiter

limiter := NewConcurrencyLimiter(10)
limiter.Execute(func() {
zenity(...)
})
limiter.Wait()

英文:

try this:
https://github.com/korovkin/limiter

 limiter := NewConcurrencyLimiter(10)
 limiter.Execute(func() {
  		zenity(...) 
 })
 limiter.Wait()

答案5

得分: 1

你可以使用这篇文章中描述的Worker Pool模式。下面是一个实现的示例代码:

package main
import (
    "os/exec"
    "strconv"
)

func main() {
    NumEl := 8 
    pool := 4
    intChan := make(chan int)

    for i:=0; i<pool; i++ {
        go callProg(intChan)  // 启动工作协程
    }

    for i:=0;i<NumEl;i++{
        intChan <- i 		// 推送数据给工作协程
    }

    close(intChan) // 安全关闭通道并终止工作协程
}

func callProg(intChan chan int) {
    for i := range intChan{
        cmd := exec.Command("zenity", "--info", "--text='Hello from iteration n." + strconv.Itoa(i) + "'")
        cmd.Run()
    }
}

希望对你有帮助!

英文:

You could use Worker Pool pattern described here in this post.
This is how an implementation would look like ...

package main
import (
    &quot;os/exec&quot;
    &quot;strconv&quot;
)

func main() {
    NumEl := 8 
    pool := 4
    intChan := make(chan int)


    for i:=0; i&lt;pool; i++ {
        go callProg(intChan)  // &lt;--- launch the worker routines
    }

    for i:=0;i&lt;NumEl;i++{
    	intChan &lt;- i 		// &lt;--- push data which will be received by workers
    }

	close(intChan) // &lt;--- will safely close the channel &amp; terminate worker routines
}

func callProg(intChan chan int) {
	for i := range intChan{
		cmd := exec.Command(&quot;zenity&quot;, &quot;--info&quot;, &quot;--text=&#39;Hello from iteration n.&quot; + strconv.Itoa(i) + &quot;&#39;&quot;)
    	cmd.Run()
	}
}

huangapple
  • 本文由 发表于 2013年8月23日 22:14:28
  • 转载请务必保留本文链接:https://go.coder-hub.com/18405023.html
匿名

发表评论

匿名网友

:?: :razz: :sad: :evil: :!: :smile: :oops: :grin: :eek: :shock: :???: :cool: :lol: :mad: :twisted: :roll: :wink: :idea: :arrow: :neutral: :cry: :mrgreen:

确定