英文:
Always have x number of goroutines running at any time
问题
我看到很多关于如何让Go等待x个goroutine完成的教程和示例,但我想做的是确保始终有x个goroutine在运行,所以一旦一个结束,就启动一个新的goroutine。
具体来说,我有几十万个要做的事情,这些事情是处理从MySQL中获取的一些数据。所以它的工作方式如下:
db, err := sql.Open("mysql", connection_string)
checkErr(err)
defer db.Close()
rows,err := db.Query(`SELECT id FROM table`)
checkErr(err)
defer rows.Close()
var id uint
for rows.Next() {
err := rows.Scan(&id)
checkErr(err)
go processTheThing(id)
}
checkErr(err)
rows.Close()
目前,这将启动几十万个processTheThing()
的线程。我需要的是最多启动x个(我们称之为20)goroutines。所以它从第一个20行开始启动20个goroutine,然后每当当前的goroutine之一结束时,它就会为下一个id启动一个新的goroutine。因此,在任何时刻都始终有20个正在运行。
我相信这是相当简单/标准的,但我似乎找不到任何教程或示例对此进行很好的解释或说明如何实现。
英文:
I see lots of tutorials and examples on how to make Go wait for x number of goroutines to finish, but what I'm trying to do is have ensure there are always x number running, so a new goroutine is launched as soon as one ends.
Specifically I have a few hundred thousand 'things to do' which is processing some stuff that is coming out of MySQL. So it works like this:
db, err := sql.Open("mysql", connection_string)
checkErr(err)
defer db.Close()
rows,err := db.Query(`SELECT id FROM table`)
checkErr(err)
defer rows.Close()
var id uint
for rows.Next() {
err := rows.Scan(&id)
checkErr(err)
go processTheThing(id)
}
checkErr(err)
rows.Close()
Currently that will launch several hundred thousand threads of processTheThing()
. What I need is that a maximum of x number (we'll call it 20) goroutines are launched. So it starts by launching 20 for the first 20 rows, and from then on it will launch a new goroutine for the next id the moment that one of the current goroutines has finished. So at any point in time there are always 20 running.
I'm sure this is quite simple/standard, but I can't seem to find a good explanation on any of the tutorials or examples or how this is done.
答案1
得分: 109
你可能会对Go并发模式这篇文章感兴趣,特别是其中的“有界并行性”部分,它解释了你所需要的确切模式。
你可以使用空结构体通道作为限制保护来控制并发工作协程的数量:
package main
import "fmt"
func main() {
maxGoroutines := 10
guard := make(chan struct{}, maxGoroutines)
for i := 0; i < 30; i++ {
guard <- struct{}{} // 如果guard通道已满,会阻塞在这里
go func(n int) {
worker(n)
<-guard
}(i)
}
}
func worker(i int) { fmt.Println("正在处理", i) }
英文:
You may find Go Concurrency Patterns article interesting, especially Bounded parallelism section, it explains the exact pattern you need.
You can use channel of empty structs as a limiting guard to control number of concurrent worker goroutines:
package main
import "fmt"
func main() {
maxGoroutines := 10
guard := make(chan struct{}, maxGoroutines)
for i := 0; i < 30; i++ {
guard <- struct{}{} // would block if guard channel is already filled
go func(n int) {
worker(n)
<-guard
}(i)
}
}
func worker(i int) { fmt.Println("doing work on", i) }
答案2
得分: 60
这里我认为像这样简单的代码可以工作:
package main
import "fmt"
const MAX = 20
func main() {
sem := make(chan int, MAX)
for {
sem <- 1 // 如果sem中有MAX个整数,将会阻塞
go func() {
fmt.Println("再次你好,世界")
<-sem // 从sem中移除一个整数,允许另一个继续执行
}()
}
}
英文:
Here I think something simple like this will work :
package main
import "fmt"
const MAX = 20
func main() {
sem := make(chan int, MAX)
for {
sem <- 1 // will block if there is MAX ints in sem
go func() {
fmt.Println("hello again, world")
<-sem // removes an int from sem, allowing another to proceed
}()
}
}
答案3
得分: 42
感谢大家对我提供帮助。然而,我觉得没有人真正提供了既有效又简单易懂的解决方案,尽管你们都帮助我理解了这个技术。
最终,我做的事情更容易理解和实用,作为对我具体问题的答案,我将在这里发布,以防其他人有同样的问题。
不知何故,这最后看起来很像OneOfOne发布的代码,这很好,因为现在我理解了。但是,一开始我发现OneOfOne的代码非常难以理解,因为将函数传递给函数使得很难理解哪一部分是干什么的。我认为这种方式更有意义:
package main
import (
"fmt"
"sync"
)
const xthreads = 5 // Total number of threads to use, excluding the main() thread
func doSomething(a int) {
fmt.Println("My job is", a)
return
}
func main() {
var ch = make(chan int, 50) // This number 50 can be anything as long as it's larger than xthreads
var wg sync.WaitGroup
// This starts xthreads number of goroutines that wait for something to do
wg.Add(xthreads)
for i := 0; i < xthreads; i++ {
go func() {
for {
a, ok := <-ch
if !ok { // if there is nothing to do and the channel has been closed then end the goroutine
wg.Done()
return
}
doSomething(a) // do the thing
}
}()
}
// Now the jobs can be added to the channel, which is used as a queue
for i := 0; i < 50; i++ {
ch <- i // add i to the queue
}
close(ch) // This tells the goroutines there's nothing else to do
wg.Wait() // Wait for the threads to finish
}
希望能对你有所帮助。
英文:
Thanks to everyone for helping me out with this. However, I don't feel that anyone really provided something that both worked and was simple/understandable, although you did all help me understand the technique.
What I have done in the end is I think much more understandable and practical as an answer to my specific question, so I will post it here in case anyone else has the same question.
Somehow this ended up looking a lot like what OneOfOne posted, which is great because now I understand that. But OneOfOne's code I found very difficult to understand at first because of the passing functions to functions made it quite confusing to understand what bit was for what. I think this way makes a lot more sense:
package main
import (
"fmt"
"sync"
)
const xthreads = 5 // Total number of threads to use, excluding the main() thread
func doSomething(a int) {
fmt.Println("My job is",a)
return
}
func main() {
var ch = make(chan int, 50) // This number 50 can be anything as long as it's larger than xthreads
var wg sync.WaitGroup
// This starts xthreads number of goroutines that wait for something to do
wg.Add(xthreads)
for i:=0; i<xthreads; i++ {
go func() {
for {
a, ok := <-ch
if !ok { // if there is nothing to do and the channel has been closed then end the goroutine
wg.Done()
return
}
doSomething(a) // do the thing
}
}()
}
// Now the jobs can be added to the channel, which is used as a queue
for i:=0; i<50; i++ {
ch <- i // add i to the queue
}
close(ch) // This tells the goroutines there's nothing else to do
wg.Wait() // Wait for the threads to finish
}
答案4
得分: 16
- 创建用于传递数据给 goroutine 的通道。
- 启动 20 个 goroutine,在循环中处理来自通道的数据。
- 将数据发送到通道而不是启动新的 goroutine。
英文:
- Create channel for passing data to goroutines.
- Start 20 goroutines that processes the data from channel in a loop.
- Send the data to the channel instead of starting a new goroutine.
答案5
得分: 11
Grzegorz Żur的答案是最高效的方法,但对于新手来说,如果不阅读代码,可能很难实现,所以这里提供一个非常简单的实现:
type idProcessor func(id uint)
func SpawnStuff(limit uint, proc idProcessor) chan<- uint {
ch := make(chan uint)
for i := uint(0); i < limit; i++ {
go func() {
for {
id, ok := <-ch
if !ok {
return
}
proc(id)
}
}()
}
return ch
}
func main() {
runtime.GOMAXPROCS(4)
var wg sync.WaitGroup //这只是为了演示,否则main函数会返回
fn := func(id uint) {
fmt.Println(id)
wg.Done()
}
wg.Add(1000)
ch := SpawnStuff(10, fn)
for i := uint(0); i < 1000; i++ {
ch <- i
}
close(ch) //应该这样做以使所有的goroutine正常退出
wg.Wait()
}
英文:
Grzegorz Żur's answer is the most efficient way to do it, but for a newcomer it could be hard to implement without reading code, so here's a very simple implementation:
type idProcessor func(id uint)
func SpawnStuff(limit uint, proc idProcessor) chan<- uint {
ch := make(chan uint)
for i := uint(0); i < limit; i++ {
go func() {
for {
id, ok := <-ch
if !ok {
return
}
proc(id)
}
}()
}
return ch
}
func main() {
runtime.GOMAXPROCS(4)
var wg sync.WaitGroup //this is just for the demo, otherwise main will return
fn := func(id uint) {
fmt.Println(id)
wg.Done()
}
wg.Add(1000)
ch := SpawnStuff(10, fn)
for i := uint(0); i < 1000; i++ {
ch <- i
}
close(ch) //should do this to make all the goroutines exit gracefully
wg.Wait()
}
答案6
得分: 2
这是一个简单的生产者-消费者问题,在Go语言中可以通过使用通道(channels)来缓冲数据包来轻松解决。
简单来说:创建一个接收ID的通道。运行一些循环读取通道中的ID并进行处理的例程。然后运行一个循环将ID发送到通道中。
示例代码:
func producer() {
var buffer = make(chan uint)
for i := 0; i < 20; i++ {
go consumer(buffer)
}
for _, id := range IDs {
buffer <- id
}
}
func consumer(buffer chan uint) {
for {
id := <- buffer
// 在这里进行处理
}
}
需要注意的事项:
- 无缓冲通道是阻塞的:如果写入通道的项目没有被接收,写入项目的例程将被阻塞,直到有例程接收为止。
- 我的示例缺少关闭机制:你必须找到一种方法,使生产者在返回之前等待所有消费者结束它们的循环。最简单的方法是使用另一个通道。我让你自己思考一下。
英文:
This is a simple producer-consumer problem, which in Go can be easily solved using channels to buffer the paquets.
To put it simple: create a channel that accept your IDs. Run a number of routines which will read from the channel in a loop then process the ID. Then run your loop that will feed IDs to the channel.
Example:
func producer() {
var buffer = make(chan uint)
for i := 0; i < 20; i++ {
go consumer(buffer)
}
for _, id := range IDs {
buffer <- id
}
}
func consumer(buffer chan uint) {
for {
id := <- buffer
// Do your things here
}
}
Things to know:
- Unbuffered channels are blocking: if the item wrote into the channel isn't accepted, the routine feeding the item will block until it is
- My example lack a closing mechanism: you must find a way to make the producer to wait for all consumers to end their loop before returning. The simplest way to do this is with another channel. I let you think about it.
答案7
得分: 0
我写了一个简单的包来处理Golang的并发。这个包可以帮助你限制同时运行的goroutine的数量:https://github.com/zenthangplus/goccm
示例代码:
package main
import (
"fmt"
"goccm"
"time"
)
func main() {
// 限制同时运行的goroutine数量为3个。
c := goccm.New(3)
for i := 1; i <= 10; i++ {
// 在任何goroutine之前调用这个函数
c.Wait()
go func(i int) {
fmt.Printf("Job %d is running\n", i)
time.Sleep(2 * time.Second)
// 当一个goroutine完成时调用这个函数
// 或者你可以在goroutine的顶部使用`defer c.Done()`
c.Done()
}(i)
}
// 在关闭主程序之后,调用这个函数以确保所有的goroutine都已经完成。
c.WaitAllDone()
}
英文:
I've wrote a simple package to handle concurrency for Golang. This package will help you limit the number of goroutines that are allowed to run concurrently:
https://github.com/zenthangplus/goccm
Example:
package main
import (
"fmt"
"goccm"
"time"
)
func main() {
// Limit 3 goroutines to run concurrently.
c := goccm.New(3)
for i := 1; i <= 10; i++ {
// This function have to call before any goroutine
c.Wait()
go func(i int) {
fmt.Printf("Job %d is running\n", i)
time.Sleep(2 * time.Second)
// This function have to when a goroutine has finished
// Or you can use `defer c.Done()` at the top of goroutine.
c.Done()
}(i)
}
// This function have to call to ensure all goroutines have finished
// after close the main program.
c.WaitAllDone()
}
答案8
得分: 0
还可以在这里查看:https://github.com/LiangfengChen/goutil/blob/main/concurrent.go
示例可以参考测试用例。
func TestParallelCall(t *testing.T) {
format := "test:%d"
data := make(map[int]bool)
mutex := sync.Mutex{}
val, err := ParallelCall(1000, 10, func(pos int) (interface{}, error) {
mutex.Lock()
defer mutex.Unlock()
data[pos] = true
return pos, errors.New(fmt.Sprintf(format, pos))
})
for i := 0; i < 1000; i++ {
if _, ok := data[i]; !ok {
t.Errorf("TestParallelCall pos not found: %d", i)
}
if val[i] != i {
t.Errorf("TestParallelCall return value is not right (%d,%v)", i, val[i])
}
if err[i].Error() != fmt.Sprintf(format, i) {
t.Errorf("TestParallelCall error msg is not correct (%d,%v)", i, err[i])
}
}
}
英文:
Also can take a look here: https://github.com/LiangfengChen/goutil/blob/main/concurrent.go
The example can refer the test case.
func TestParallelCall(t *testing.T) {
format := "test:%d"
data := make(map[int]bool)
mutex := sync.Mutex{}
val, err := ParallelCall(1000, 10, func(pos int) (interface{}, error) {
mutex.Lock()
defer mutex.Unlock()
data[pos] = true
return pos, errors.New(fmt.Sprintf(format, pos))
})
for i := 0; i < 1000; i++ {
if _, ok := data[i]; !ok {
t.Errorf("TestParallelCall pos not found: %d", i)
}
if val[i] != i {
t.Errorf("TestParallelCall return value is not right (%d,%v)", i, val[i])
}
if err[i].Error() != fmt.Sprintf(format, i) {
t.Errorf("TestParallelCall error msg is not correct (%d,%v)", i, err[i])
}
}
}
通过集体智慧和协作来改善编程学习和解决问题的方式。致力于成为全球开发者共同参与的知识库,让每个人都能够通过互相帮助和分享经验来进步。
评论