英文:
Running a maximum of two go routines continuously forever
问题
我正在尝试同时运行一个函数。它会调用我的数据库,可能需要2-10秒的时间。我希望一旦完成,它就可以继续下一个例程,即使另一个例程仍在处理中,但最多只能同时处理2个例程。我希望这个过程无限进行下去。我觉得我已经接近成功了,但是waitGroup会强制两个例程等待完成后才能继续下一次迭代。
const ROUTINES = 2
for {
var wg sync.WaitGroup
_, err := db.Exec(`Random DB Call`)
if err != nil {
panic(err)
}
ch := createRoutines(db, &wg)
wg.Add(ROUTINES)
for i := 1; i <= ROUTINES; i++ {
ch <- i
time.Sleep(2 * time.Second)
}
close(ch)
wg.Wait()
}
func createRoutines(db *sqlx.DB, wg *sync.WaitGroup) chan int {
var ch = make(chan int, 5)
for i := 0; i < ROUTINES ; i++ {
go func(db *sqlx.DB) {
defer wg.Done()
for {
_, ok := <-ch
if !ok {
return
}
doStuff(db)
}
}(db)
}
return ch
}
以上是你提供的代码。
英文:
I'm trying to run a function concurrently. It makes a call to my DB that may take 2-10 seconds. I would like it to continue on to the next routine once it has finished, even if the other one is still processing, but only ever want it be processing a max of 2 at a time. I want this to happen indefinitely. I feel like I'm almost there, but waitGroup forces both routines to wait until completion prior to continuing to another iteration.
const ROUTINES = 2;
for {
var wg sync.WaitGroup
_, err:= db.Exec(`Random DB Call`)
if err != nil {
panic(err)
}
ch := createRoutines(db, &wg)
wg.Add(ROUTINES)
for i := 1; i <= ROUTINES; i++ {
ch <- i
time.Sleep(2 * time.Second)
}
close(ch)
wg.Wait()
}
func createRoutines(db *sqlx.DB, wg *sync.WaitGroup) chan int {
var ch = make(chan int, 5)
for i := 0; i < ROUTINES ; i++ {
go func(db *sqlx.DB) {
defer wg.Done()
for {
_, ok := <-ch
if !ok {
return
}
doStuff(db)
}
}(db)
}
return ch
}
答案1
得分: 2
如果你只想同时运行n个goroutine,你可以使用大小为n的缓冲通道,并在没有剩余空间时使用它来阻塞创建新的goroutine,就像这样:
package main
import (
"fmt"
"math/rand"
"time"
)
func main() {
const ROUTINES = 2
rand.Seed(time.Now().UnixNano())
stopper := make(chan struct{}, ROUTINES)
var counter int
for {
counter++
stopper <- struct{}{}
go func(c int) {
fmt.Println("+ Starting goroutine", c)
time.Sleep(time.Duration(rand.Intn(3)) * time.Second)
fmt.Println("- Stopping goroutine", c)
<-stopper
}(counter)
}
}
在这个例子中,你可以看到只能同时有ROUTINES个goroutine存在,每个goroutine的生命周期为0、1或2秒。在输出中,你还可以看到每当一个goroutine结束时,另一个goroutine就会开始。
英文:
If you need to only have n number of goroutines running at the same time, you can have a buffered channel of size n and use that to block creating new goroutines when there is no space left, something like this
package main
import (
"fmt"
"math/rand"
"time"
)
func main() {
const ROUTINES = 2
rand.Seed(time.Now().UnixNano())
stopper := make(chan struct{}, ROUTINES)
var counter int
for {
counter++
stopper <- struct{}{}
go func(c int) {
fmt.Println("+ Starting goroutine", c)
time.Sleep(time.Duration(rand.Intn(3)) * time.Second)
fmt.Println("- Stopping goroutine", c)
<-stopper
}(counter)
}
}
In this example you see how you can only have ROUTINES number of goroutines that live 0, 1 or 2 seconds. In the output you can also see how every time one goroutine ends another one starts.
答案2
得分: 0
这会添加一个外部依赖项,但考虑以下实现:
package main
import (
"context"
"database/sql"
"log"
"github.com/MicahParks/ctxerrpool"
)
func main() {
// 创建一个包含2个工作线程的数据库查询池。记录任何错误。
databasePool := ctxerrpool.New(2, func(_ ctxerrpool.Pool, err error) {
log.Printf("执行数据库查询失败。\n错误:%s", err.Error())
})
// 获取要执行的查询列表。
queries := []string{
"SELECT first_name, last_name FROM customers",
"SELECT price FROM inventory WHERE sku='1234'",
"其他查询...",
}
// TODO 建立数据库连接。
var db *sql.DB
for _, query := range queries {
// 为了作用域,有意遮蔽循环变量。
query := query
// 在一个工作线程上执行查询。如果没有可用的工作线程,它将阻塞直到有一个可用。
databasePool.AddWorkItem(context.TODO(), func(workCtx context.Context) (err error) {
_, err = db.ExecContext(workCtx, query)
return err
})
}
// 等待所有工作线程完成。
databasePool.Wait()
}
希望这对你有帮助!
英文:
This adds an external dependency, but consider this implementation:
package main
import (
"context"
"database/sql"
"log"
"github.com/MicahParks/ctxerrpool"
)
func main() {
// Create a pool of 2 workers for database queries. Log any errors.
databasePool := ctxerrpool.New(2, func(_ ctxerrpool.Pool, err error) {
log.Printf("Failed to execute database query.\nError: %s", err.Error())
})
// Get a list of queries to execute.
queries := []string{
"SELECT first_name, last_name FROM customers",
"SELECT price FROM inventory WHERE sku='1234'",
"other queries...",
}
// TODO Make a database connection.
var db *sql.DB
for _, query := range queries {
// Intentionally shadow the looped variable for scope.
query := query
// Perform the query on a worker. If no worker is ready, it will block until one is.
databasePool.AddWorkItem(context.TODO(), func(workCtx context.Context) (err error) {
_, err = db.ExecContext(workCtx, query)
return err
})
}
// Wait for all workers to finish.
databasePool.Wait()
}
通过集体智慧和协作来改善编程学习和解决问题的方式。致力于成为全球开发者共同参与的知识库,让每个人都能够通过互相帮助和分享经验来进步。
评论