持续无限运行最多两个Go协程。

huangapple go评论83阅读模式
英文:

Running a maximum of two go routines continuously forever

问题

我正在尝试同时运行一个函数。它会调用我的数据库,可能需要2-10秒的时间。我希望一旦完成,它就可以继续下一个例程,即使另一个例程仍在处理中,但最多只能同时处理2个例程。我希望这个过程无限进行下去。我觉得我已经接近成功了,但是waitGroup会强制两个例程等待完成后才能继续下一次迭代。

const ROUTINES = 2
for {
    var wg sync.WaitGroup
    _, err := db.Exec(`Random DB Call`)
    if err != nil {
        panic(err)
    }
    ch := createRoutines(db, &wg)
    wg.Add(ROUTINES)
    for i := 1; i <= ROUTINES; i++ {
        ch <- i
        time.Sleep(2 * time.Second)
    }

    close(ch)
    wg.Wait() 
}

func createRoutines(db *sqlx.DB, wg *sync.WaitGroup) chan int {
    var ch = make(chan int, 5)
    for i := 0; i < ROUTINES ; i++ {
        go func(db *sqlx.DB) {
            defer wg.Done()
            for {
                _, ok := <-ch
                if !ok { 
                    return
                }
                doStuff(db) 
            }
        }(db)
    }
    return ch
}

以上是你提供的代码。

英文:

I'm trying to run a function concurrently. It makes a call to my DB that may take 2-10 seconds. I would like it to continue on to the next routine once it has finished, even if the other one is still processing, but only ever want it be processing a max of 2 at a time. I want this to happen indefinitely. I feel like I'm almost there, but waitGroup forces both routines to wait until completion prior to continuing to another iteration.

const ROUTINES = 2;
for {
			var wg sync.WaitGroup
			_, err:= db.Exec(`Random DB Call`)
			if err != nil {
				panic(err)
			}
			ch := createRoutines(db, &amp;wg)
			wg.Add(ROUTINES)
			for i := 1; i &lt;= ROUTINES; i++ {
				ch &lt;- i
				time.Sleep(2 * time.Second)
			}

			close(ch)
			wg.Wait() 
		}


func createRoutines(db *sqlx.DB, wg *sync.WaitGroup) chan int {
	var ch = make(chan int, 5)
	for i := 0; i &lt; ROUTINES ; i++ {
		go func(db *sqlx.DB) {
			defer wg.Done()
			for {
				_, ok := &lt;-ch
				if !ok { 
					return
				}
				doStuff(db) 

			}
		}(db)

	}
	return ch
}

答案1

得分: 2

如果你只想同时运行n个goroutine,你可以使用大小为n的缓冲通道,并在没有剩余空间时使用它来阻塞创建新的goroutine,就像这样:

package main

import (
	"fmt"
	"math/rand"
	"time"
)

func main() {

	const ROUTINES = 2
	rand.Seed(time.Now().UnixNano())

	stopper := make(chan struct{}, ROUTINES)
	var counter int

	for {
		counter++
		stopper <- struct{}{}
		go func(c int) {
			fmt.Println("+ Starting goroutine", c)
			time.Sleep(time.Duration(rand.Intn(3)) * time.Second)
			fmt.Println("- Stopping goroutine", c)
			<-stopper
		}(counter)
	}
}

在这个例子中,你可以看到只能同时有ROUTINES个goroutine存在,每个goroutine的生命周期为0、1或2秒。在输出中,你还可以看到每当一个goroutine结束时,另一个goroutine就会开始。

英文:

If you need to only have n number of goroutines running at the same time, you can have a buffered channel of size n and use that to block creating new goroutines when there is no space left, something like this

package main

import (
	&quot;fmt&quot;
	&quot;math/rand&quot;
	&quot;time&quot;
)

func main() {

	const ROUTINES = 2
	rand.Seed(time.Now().UnixNano())

	stopper := make(chan struct{}, ROUTINES)
	var counter int

	for {
		counter++
		stopper &lt;- struct{}{}
		go func(c int) {
			fmt.Println(&quot;+ Starting goroutine&quot;, c)
			time.Sleep(time.Duration(rand.Intn(3)) * time.Second)
			fmt.Println(&quot;- Stopping goroutine&quot;, c)
			&lt;-stopper
		}(counter)
	}
}

In this example you see how you can only have ROUTINES number of goroutines that live 0, 1 or 2 seconds. In the output you can also see how every time one goroutine ends another one starts.

答案2

得分: 0

这会添加一个外部依赖项,但考虑以下实现:

package main

import (
	"context"
	"database/sql"
	"log"

	"github.com/MicahParks/ctxerrpool"
)

func main() {

	// 创建一个包含2个工作线程的数据库查询池。记录任何错误。
	databasePool := ctxerrpool.New(2, func(_ ctxerrpool.Pool, err error) {
		log.Printf("执行数据库查询失败。\n错误:%s", err.Error())
	})

	// 获取要执行的查询列表。
	queries := []string{
		"SELECT first_name, last_name FROM customers",
		"SELECT price FROM inventory WHERE sku='1234'",
		"其他查询...",
	}

	// TODO 建立数据库连接。
	var db *sql.DB

	for _, query := range queries {

		// 为了作用域,有意遮蔽循环变量。
		query := query

		// 在一个工作线程上执行查询。如果没有可用的工作线程,它将阻塞直到有一个可用。
		databasePool.AddWorkItem(context.TODO(), func(workCtx context.Context) (err error) {
			_, err = db.ExecContext(workCtx, query)
			return err
		})
	}

	// 等待所有工作线程完成。
	databasePool.Wait()
}

希望这对你有帮助!

英文:

This adds an external dependency, but consider this implementation:

package main

import (
	&quot;context&quot;
	&quot;database/sql&quot;
	&quot;log&quot;

	&quot;github.com/MicahParks/ctxerrpool&quot;
)

func main() {

	// Create a pool of 2 workers for database queries. Log any errors.
	databasePool := ctxerrpool.New(2, func(_ ctxerrpool.Pool, err error) {
		log.Printf(&quot;Failed to execute database query.\nError: %s&quot;, err.Error())
	})

	// Get a list of queries to execute.
	queries := []string{
		&quot;SELECT first_name, last_name FROM customers&quot;,
		&quot;SELECT price FROM inventory WHERE sku=&#39;1234&#39;&quot;,
		&quot;other queries...&quot;,
	}

	// TODO Make a database connection.
	var db *sql.DB

	for _, query := range queries {

		// Intentionally shadow the looped variable for scope.
		query := query

		// Perform the query on a worker. If no worker is ready, it will block until one is.
		databasePool.AddWorkItem(context.TODO(), func(workCtx context.Context) (err error) {
			_, err = db.ExecContext(workCtx, query)
			return err
		})
	}

	// Wait for all workers to finish.
	databasePool.Wait()
}

huangapple
  • 本文由 发表于 2021年10月30日 09:01:58
  • 转载请务必保留本文链接:https://go.coder-hub.com/69776060.html
匿名

发表评论

匿名网友

:?: :razz: :sad: :evil: :!: :smile: :oops: :grin: :eek: :shock: :???: :cool: :lol: :mad: :twisted: :roll: :wink: :idea: :arrow: :neutral: :cry: :mrgreen:

确定