Golang goroutine 和 mysql

huangapple go评论83阅读模式
英文:

Golang goroutine and mysql

问题

我正在尝试通过goroutines对数据库进行查询,但我不断遇到查询数量限制的问题,无论如何都无法解决这个问题:(

但每次我都会遇到"read: connection reset by peer"或"Too many connection"的错误。

我做错了什么?请帮帮我。提前谢谢。这是我的代码。

type Page struct {
    Stat int
}

func main() {
    cfg := mysql.Config{
        // 一些配置
    }
    // 5000个id
    groups := []int{}

    // 尝试设置缓冲区限制
    pages := make(chan Page, 8)
    for _, id := range groups {
        go getData(id, cfg, pages)
    }
    for _, id := range groups {
        page := <-pages
        fmt.Println(id, page.Stat)
    }
}

func getData(i int, cfg mysql.Config, channel chan Page) {
    db, err := sql.Open("mysql", cfg.FormatDSN())
    db.SetMaxOpenConns(8)
    db.SetMaxIdleConns(8)
    checkError(err)
    rows, err := db.Query(`select g.id from goods as g where g.groupid = ?`, i)
    checkError(err)

    defer rows.Close()
    defer db.Close()

    count := 0;
    for rows.Next() {
        err = rows.Scan(&id1)
        checkError(err)
        count++
    }
    channel <- Page{Stat: count}
}
英文:

I'm trying to make queries to the database through goroutines, but I constantly run into limits on the number of queries and I can't solve this problem in any way Golang goroutine 和 mysql

But everytime i have "read: connection reset by peer" or "Too many connection"

What am I doing wrong? Help me please. Thanks in advance. Here is my code.

type Page struct {
    Stat int
}

func main() {
    cfg := mysql.Config{
        // some config
    }
    // 5000 ids
    groups := []int{}

    // trying set buffer limit
    pages := make(chan Page, 8)
    for _, id := range groups {
        go getData(id, cfg, pages)
    }
    for _, id := range groups {
        page := &lt;-pages
        fmt.Println(id, page.Stat)
    }
}

func getData(i int, cfg mysql.Config, channel chan Page) {
    db, err := sql.Open(&quot;mysql&quot;, cfg.FormatDSN())
    db.SetMaxOpenConns(8)
    db.SetMaxIdleConns(8)
    checkError(err)
    rows, err := db.Query(`select g.id from goods as g where g.groupid = ?`, i)
    checkError(err)

    defer rows.Close()
    defer db.Close()

    count := 0;
    for rows.Next() {
        err = rows.Scan(&amp;id1)
        checkError(err)
        count++
    }
    channel &lt;- Page{Stat: count}
}

答案1

得分: 1

连接的创建应该在getData之外完成。这段代码可能会并行创建太多的连接(约5000个)。

英文:

The connection creation should be done outside getData. This code can create too many connections (~ 5000) in parallel.

答案2

得分: 0

根据评论,数据库实例应该在goroutine之外处理。但仅仅这样做仍然可能会因为连接池中的打开连接设置而导致错误。例如,你将最大打开连接数设置为8,然后生成所有的goroutine。其中一些goroutine可能会超时等待连接池中的可用连接。

为了获得最佳的无错误性能,你可以改进这一点,优化连接池中连接的使用和goroutine的数量。可以同时活动的最大goroutine数应该等于可用的打开连接数,这样可以确保不会导致任何超时错误。

func main() {

    cfg := mysql.Config{
        // 一些配置
    }

    numberOfConns := 20
    db, err := sql.Open("mysql", cfg.FormatDSN())
    checkError(err)
    
    defer db.Close()
    db.SetMaxOpenConns(numberOfConns)
    db.SetMaxIdleConns(numberOfConns)

    // 5000个ids
    groups := []int{}

    // 尝试设置缓冲限制
    pages := make(chan Page, numberOfConns)
    limit := make(chan bool, numberOfConns)
    for _, id := range groups {
        limit <- true // 根据numberOfConns限制可以生成的goroutine数量
        go getData(id, db, pages, limit)
    }
    for _, id := range groups {
        page := <-pages
        fmt.Println(id, page.Stat)
    }
}

func getData(i int, db *sql.DB, channel chan Page, limit chan bool) {
    rows, err := db.Query(`select g.id from goods as g where g.groupid = ?`, i)
    checkError(err)

    defer rows.Close()

    count := 0;
    for rows.Next() {
        err = rows.Scan(&id1)
        checkError(err)
        count++
    }
    channel <- Page{Stat: count}
    <-limit // 释放资源以便启动下一个goroutine
}

希望对你有帮助!

英文:

Per comments, the database instance should be handled outside the goroutines. But doing this alone can still cause errors because of your settings for open connections in the connection pool. For example, you set the maximum open connections to 8, then spawn all your goroutines. Some of the goroutines might time out waiting for an available connection from the connection pool.

You can improve this and optimize the usage of connections in the connection pool and the number of goroutines to get the best error-free performance. The maximum number of goroutines that can be active at one moment should equal the number of available open connections, just to be on the safe side not to cause any timeout errors.

func main() {

    cfg := mysql.Config{
        // some config
    }

    numberOfConns := 20
    db, err := sql.Open(&quot;mysql&quot;, cfg.FormatDSN())
    checkError(err)
    
    defer db.Close()
    db.SetMaxOpenConns(numberOfConns)
    db.SetMaxIdleConns(numberOfConns)

    // 5000 ids
    groups := []int{}

    // trying set buffer limit
    pages := make(chan Page, numberOfConns)
    limit := make(chan bool, numberOfConns)
    for _, id := range groups {
        limit &lt;- true //limits the number of goroutines that can be spawned based on numberOfConns 
        go getData(id, db, pages, limit)
    }
    for _, id := range groups {
        page := &lt;-pages
        fmt.Println(id, page.Stat)
    }
}

func getData(i int, db *sql.DB, channel chan Page, limit chan bool) {
    rows, err := db.Query(`select g.id from goods as g where g.groupid = ?`, i)
    checkError(err)

    defer rows.Close()

    count := 0;
    for rows.Next() {
        err = rows.Scan(&amp;id1)
        checkError(err)
        count++
    }
    channel &lt;- Page{Stat: count}
    &lt;-limit // release the resource so next goroutine can be started
}

答案3

得分: -1

这是你的代码的正确版本。

type Page struct {
	Stat int
	Id   int
}

func main() {
	cfg := mysql.Config{
		// 一些配置
	}
	// 5000个id
	groups := []int{1, 2, 3}

	// 这个带缓冲的通道将在并发限制处阻塞
	semaphoreChan := make(chan struct{}, 5)

	// 用于收集结果
	pages := make(chan *Page)

	defer func() {
		close(semaphoreChan)
		close(pages)
	}()

	go func() {
		for _, id := range groups {
			// 这将一个空结构体发送到semaphoreChan,基本上是说将限制增加1,但当达到限制时,阻塞直到有空间
			semaphoreChan <- struct{}{}

			go func(i int) {

				getData(i, cfg, pages)
				// 一旦完成,我们从semaphoreChan中读取,这会将限制减少1,并允许另一个goroutine开始
				<-semaphoreChan

			}(id)
		}
	}()

	var results []Page

	for {
		result := <-pages
		log.Println(result.Stat, result.Id)
		results = append(results, *result)
		if len(results) == len(groups) {
			break
		}
	}

}

func getData(i int, cfg mysql.Config, channel chan *Page) {
	db, err := sql.Open("mysql", cfg.FormatDSN())
	db.SetMaxOpenConns(8)
	db.SetMaxIdleConns(8)
	checkError(err)
	rows, err := db.Query(`select g.id from goods as g where g.groupid = ?`, i)
	checkError(err)

	defer rows.Close()
	defer db.Close()

	count := 0
	for rows.Next() {
		err = rows.Scan(&id1)
		checkError(err)
		count++
	}
	channel <- &Page{Stat: count, Id: i}
}

希望对你有帮助!

英文:

Here the proper version of your code.

type Page struct {
Stat int
Id   int
}
func main() {
cfg := mysql.Config{
// some config
}
// 5000 ids
groups := []int{1, 2, 3}
// this buffered channel will block at the concurrency limit
semaphoreChan := make(chan struct{}, 5)
// for collecting result
pages := make(chan *Page)
defer func() {
close(semaphoreChan)
close(pages)
}()
go func() {
for _, id := range groups {
// this sends an empty struct into the semaphoreChan which
// is basically saying add one to the limit, but when the
// limit has been reached block until there is room
semaphoreChan &lt;- struct{}{}
go func(i int) {
getData(i, cfg, pages)
// once we&#39;re done it&#39;s we read from the semaphoreChan which
// has the effect of removing one from the limit and allowing
// another goroutine to start
&lt;-semaphoreChan
}(id)
}
}()
var results []Page
for {
result := &lt;-pages
log.Println(result.Stat, result.Id)
results = append(results, *result)
if len(results) == len(groups) {
break
}
}
}
func getData(i int, cfg mysql.Config, channel chan *Page) {
db, err := sql.Open(&quot;mysql&quot;, cfg.FormatDSN())
db.SetMaxOpenConns(8)
db.SetMaxIdleConns(8)
checkError(err)
rows, err := db.Query(`select g.id from goods as g where g.groupid = ?`, i)
checkError(err)
defer rows.Close()
defer db.Close()
count := 0
for rows.Next() {
err = rows.Scan(&amp;id1)
checkError(err)
count++
}
channel &lt;- &amp;Page{Stat: count, Id: i}
}

huangapple
  • 本文由 发表于 2023年1月6日 17:42:46
  • 转载请务必保留本文链接:https://go.coder-hub.com/75029261.html
匿名

发表评论

匿名网友

:?: :razz: :sad: :evil: :!: :smile: :oops: :grin: :eek: :shock: :???: :cool: :lol: :mad: :twisted: :roll: :wink: :idea: :arrow: :neutral: :cry: :mrgreen:

确定