持续无限运行最多两个Go协程。

huangapple go评论106阅读模式
英文:

Running a maximum of two go routines continuously forever

问题

我正在尝试同时运行一个函数。它会调用我的数据库,可能需要2-10秒的时间。我希望一旦完成,它就可以继续下一个例程,即使另一个例程仍在处理中,但最多只能同时处理2个例程。我希望这个过程无限进行下去。我觉得我已经接近成功了,但是waitGroup会强制两个例程等待完成后才能继续下一次迭代。

  1. const ROUTINES = 2
  2. for {
  3. var wg sync.WaitGroup
  4. _, err := db.Exec(`Random DB Call`)
  5. if err != nil {
  6. panic(err)
  7. }
  8. ch := createRoutines(db, &wg)
  9. wg.Add(ROUTINES)
  10. for i := 1; i <= ROUTINES; i++ {
  11. ch <- i
  12. time.Sleep(2 * time.Second)
  13. }
  14. close(ch)
  15. wg.Wait()
  16. }
  17. func createRoutines(db *sqlx.DB, wg *sync.WaitGroup) chan int {
  18. var ch = make(chan int, 5)
  19. for i := 0; i < ROUTINES ; i++ {
  20. go func(db *sqlx.DB) {
  21. defer wg.Done()
  22. for {
  23. _, ok := <-ch
  24. if !ok {
  25. return
  26. }
  27. doStuff(db)
  28. }
  29. }(db)
  30. }
  31. return ch
  32. }

以上是你提供的代码。

英文:

I'm trying to run a function concurrently. It makes a call to my DB that may take 2-10 seconds. I would like it to continue on to the next routine once it has finished, even if the other one is still processing, but only ever want it be processing a max of 2 at a time. I want this to happen indefinitely. I feel like I'm almost there, but waitGroup forces both routines to wait until completion prior to continuing to another iteration.

  1. const ROUTINES = 2;
  2. for {
  3. var wg sync.WaitGroup
  4. _, err:= db.Exec(`Random DB Call`)
  5. if err != nil {
  6. panic(err)
  7. }
  8. ch := createRoutines(db, &amp;wg)
  9. wg.Add(ROUTINES)
  10. for i := 1; i &lt;= ROUTINES; i++ {
  11. ch &lt;- i
  12. time.Sleep(2 * time.Second)
  13. }
  14. close(ch)
  15. wg.Wait()
  16. }
  17. func createRoutines(db *sqlx.DB, wg *sync.WaitGroup) chan int {
  18. var ch = make(chan int, 5)
  19. for i := 0; i &lt; ROUTINES ; i++ {
  20. go func(db *sqlx.DB) {
  21. defer wg.Done()
  22. for {
  23. _, ok := &lt;-ch
  24. if !ok {
  25. return
  26. }
  27. doStuff(db)
  28. }
  29. }(db)
  30. }
  31. return ch
  32. }

答案1

得分: 2

如果你只想同时运行n个goroutine,你可以使用大小为n的缓冲通道,并在没有剩余空间时使用它来阻塞创建新的goroutine,就像这样:

  1. package main
  2. import (
  3. "fmt"
  4. "math/rand"
  5. "time"
  6. )
  7. func main() {
  8. const ROUTINES = 2
  9. rand.Seed(time.Now().UnixNano())
  10. stopper := make(chan struct{}, ROUTINES)
  11. var counter int
  12. for {
  13. counter++
  14. stopper <- struct{}{}
  15. go func(c int) {
  16. fmt.Println("+ Starting goroutine", c)
  17. time.Sleep(time.Duration(rand.Intn(3)) * time.Second)
  18. fmt.Println("- Stopping goroutine", c)
  19. <-stopper
  20. }(counter)
  21. }
  22. }

在这个例子中,你可以看到只能同时有ROUTINES个goroutine存在,每个goroutine的生命周期为0、1或2秒。在输出中,你还可以看到每当一个goroutine结束时,另一个goroutine就会开始。

英文:

If you need to only have n number of goroutines running at the same time, you can have a buffered channel of size n and use that to block creating new goroutines when there is no space left, something like this

  1. package main
  2. import (
  3. &quot;fmt&quot;
  4. &quot;math/rand&quot;
  5. &quot;time&quot;
  6. )
  7. func main() {
  8. const ROUTINES = 2
  9. rand.Seed(time.Now().UnixNano())
  10. stopper := make(chan struct{}, ROUTINES)
  11. var counter int
  12. for {
  13. counter++
  14. stopper &lt;- struct{}{}
  15. go func(c int) {
  16. fmt.Println(&quot;+ Starting goroutine&quot;, c)
  17. time.Sleep(time.Duration(rand.Intn(3)) * time.Second)
  18. fmt.Println(&quot;- Stopping goroutine&quot;, c)
  19. &lt;-stopper
  20. }(counter)
  21. }
  22. }

In this example you see how you can only have ROUTINES number of goroutines that live 0, 1 or 2 seconds. In the output you can also see how every time one goroutine ends another one starts.

答案2

得分: 0

这会添加一个外部依赖项,但考虑以下实现:

  1. package main
  2. import (
  3. "context"
  4. "database/sql"
  5. "log"
  6. "github.com/MicahParks/ctxerrpool"
  7. )
  8. func main() {
  9. // 创建一个包含2个工作线程的数据库查询池。记录任何错误。
  10. databasePool := ctxerrpool.New(2, func(_ ctxerrpool.Pool, err error) {
  11. log.Printf("执行数据库查询失败。\n错误:%s", err.Error())
  12. })
  13. // 获取要执行的查询列表。
  14. queries := []string{
  15. "SELECT first_name, last_name FROM customers",
  16. "SELECT price FROM inventory WHERE sku='1234'",
  17. "其他查询...",
  18. }
  19. // TODO 建立数据库连接。
  20. var db *sql.DB
  21. for _, query := range queries {
  22. // 为了作用域,有意遮蔽循环变量。
  23. query := query
  24. // 在一个工作线程上执行查询。如果没有可用的工作线程,它将阻塞直到有一个可用。
  25. databasePool.AddWorkItem(context.TODO(), func(workCtx context.Context) (err error) {
  26. _, err = db.ExecContext(workCtx, query)
  27. return err
  28. })
  29. }
  30. // 等待所有工作线程完成。
  31. databasePool.Wait()
  32. }

希望这对你有帮助!

英文:

This adds an external dependency, but consider this implementation:

  1. package main
  2. import (
  3. &quot;context&quot;
  4. &quot;database/sql&quot;
  5. &quot;log&quot;
  6. &quot;github.com/MicahParks/ctxerrpool&quot;
  7. )
  8. func main() {
  9. // Create a pool of 2 workers for database queries. Log any errors.
  10. databasePool := ctxerrpool.New(2, func(_ ctxerrpool.Pool, err error) {
  11. log.Printf(&quot;Failed to execute database query.\nError: %s&quot;, err.Error())
  12. })
  13. // Get a list of queries to execute.
  14. queries := []string{
  15. &quot;SELECT first_name, last_name FROM customers&quot;,
  16. &quot;SELECT price FROM inventory WHERE sku=&#39;1234&#39;&quot;,
  17. &quot;other queries...&quot;,
  18. }
  19. // TODO Make a database connection.
  20. var db *sql.DB
  21. for _, query := range queries {
  22. // Intentionally shadow the looped variable for scope.
  23. query := query
  24. // Perform the query on a worker. If no worker is ready, it will block until one is.
  25. databasePool.AddWorkItem(context.TODO(), func(workCtx context.Context) (err error) {
  26. _, err = db.ExecContext(workCtx, query)
  27. return err
  28. })
  29. }
  30. // Wait for all workers to finish.
  31. databasePool.Wait()
  32. }

huangapple
  • 本文由 发表于 2021年10月30日 09:01:58
  • 转载请务必保留本文链接:https://go.coder-hub.com/69776060.html
匿名

发表评论

匿名网友

:?: :razz: :sad: :evil: :!: :smile: :oops: :grin: :eek: :shock: :???: :cool: :lol: :mad: :twisted: :roll: :wink: :idea: :arrow: :neutral: :cry: :mrgreen:

确定