Golang goroutine 和 mysql

huangapple go评论102阅读模式

Golang goroutine and mysql



但每次我都会遇到"read: connection reset by peer"或"Too many connection"的错误。


  1. type Page struct {
  2. Stat int
  3. }
  4. func main() {
  5. cfg := mysql.Config{
  6. // 一些配置
  7. }
  8. // 5000个id
  9. groups := []int{}
  10. // 尝试设置缓冲区限制
  11. pages := make(chan Page, 8)
  12. for _, id := range groups {
  13. go getData(id, cfg, pages)
  14. }
  15. for _, id := range groups {
  16. page := <-pages
  17. fmt.Println(id, page.Stat)
  18. }
  19. }
  20. func getData(i int, cfg mysql.Config, channel chan Page) {
  21. db, err := sql.Open("mysql", cfg.FormatDSN())
  22. db.SetMaxOpenConns(8)
  23. db.SetMaxIdleConns(8)
  24. checkError(err)
  25. rows, err := db.Query(`select g.id from goods as g where g.groupid = ?`, i)
  26. checkError(err)
  27. defer rows.Close()
  28. defer db.Close()
  29. count := 0;
  30. for rows.Next() {
  31. err = rows.Scan(&id1)
  32. checkError(err)
  33. count++
  34. }
  35. channel <- Page{Stat: count}
  36. }

I'm trying to make queries to the database through goroutines, but I constantly run into limits on the number of queries, and I can't solve this problem in any way.

But every time I get "read: connection reset by peer" or "Too many connections".

What am I doing wrong? Help me please. Thanks in advance. Here is my code.

  1. type Page struct {
  2. Stat int
  3. }
  4. func main() {
  5. cfg := mysql.Config{
  6. // some config
  7. }
  8. // 5000 ids
  9. groups := []int{}
  10. // trying set buffer limit
  11. pages := make(chan Page, 8)
  12. for _, id := range groups {
  13. go getData(id, cfg, pages)
  14. }
  15. for _, id := range groups {
  16. page := <-pages
  17. fmt.Println(id, page.Stat)
  18. }
  19. }
  20. func getData(i int, cfg mysql.Config, channel chan Page) {
  21. db, err := sql.Open("mysql", cfg.FormatDSN())
  22. db.SetMaxOpenConns(8)
  23. db.SetMaxIdleConns(8)
  24. checkError(err)
  25. rows, err := db.Query(`select g.id from goods as g where g.groupid = ?`, i)
  26. checkError(err)
  27. defer rows.Close()
  28. defer db.Close()
  29. count := 0;
  30. for rows.Next() {
  31. err = rows.Scan(&id1)
  32. checkError(err)
  33. count++
  34. }
  35. channel <- Page{Stat: count}
  36. }


得分: 1



The connection creation should be done outside getData. This code can create too many connections (~ 5000) in parallel.


得分: 0



  1. func main() {
  2. cfg := mysql.Config{
  3. // 一些配置
  4. }
  5. numberOfConns := 20
  6. db, err := sql.Open("mysql", cfg.FormatDSN())
  7. checkError(err)
  8. defer db.Close()
  9. db.SetMaxOpenConns(numberOfConns)
  10. db.SetMaxIdleConns(numberOfConns)
  11. // 5000个ids
  12. groups := []int{}
  13. // 尝试设置缓冲限制
  14. pages := make(chan Page, numberOfConns)
  15. limit := make(chan bool, numberOfConns)
  16. for _, id := range groups {
  17. limit <- true // 根据numberOfConns限制可以生成的goroutine数量
  18. go getData(id, db, pages, limit)
  19. }
  20. for _, id := range groups {
  21. page := <-pages
  22. fmt.Println(id, page.Stat)
  23. }
  24. }
  25. func getData(i int, db *sql.DB, channel chan Page, limit chan bool) {
  26. rows, err := db.Query(`select g.id from goods as g where g.groupid = ?`, i)
  27. checkError(err)
  28. defer rows.Close()
  29. count := 0;
  30. for rows.Next() {
  31. err = rows.Scan(&id1)
  32. checkError(err)
  33. count++
  34. }
  35. channel <- Page{Stat: count}
  36. <-limit // 释放资源以便启动下一个goroutine
  37. }



Per comments, the database instance should be handled outside the goroutines. But doing this alone can still cause errors because of your settings for open connections in the connection pool. For example, you set the maximum open connections to 8, then spawn all your goroutines. Some of the goroutines might time out waiting for an available connection from the connection pool.

You can improve this and optimize the usage of connections in the connection pool and the number of goroutines to get the best error-free performance. To be on the safe side and avoid timeout errors, the maximum number of goroutines that can be active at any one moment should equal the number of available open connections.

  1. func main() {
  2. cfg := mysql.Config{
  3. // some config
  4. }
  5. numberOfConns := 20
  6. db, err := sql.Open("mysql", cfg.FormatDSN())
  7. checkError(err)
  8. defer db.Close()
  9. db.SetMaxOpenConns(numberOfConns)
  10. db.SetMaxIdleConns(numberOfConns)
  11. // 5000 ids
  12. groups := []int{}
  13. // trying set buffer limit
  14. pages := make(chan Page, numberOfConns)
  15. limit := make(chan bool, numberOfConns)
  16. for _, id := range groups {
  17. limit <- true // limits the number of goroutines that can be spawned based on numberOfConns
  18. go getData(id, db, pages, limit)
  19. }
  20. for _, id := range groups {
  21. page := <-pages
  22. fmt.Println(id, page.Stat)
  23. }
  24. }
  25. func getData(i int, db *sql.DB, channel chan Page, limit chan bool) {
  26. rows, err := db.Query(`select g.id from goods as g where g.groupid = ?`, i)
  27. checkError(err)
  28. defer rows.Close()
  29. count := 0;
  30. for rows.Next() {
  31. err = rows.Scan(&id1)
  32. checkError(err)
  33. count++
  34. }
  35. channel <- Page{Stat: count}
  36. <-limit // release the resource so the next goroutine can be started
  37. }


得分: -1


  1. type Page struct {
  2. Stat int
  3. Id int
  4. }
  5. func main() {
  6. cfg := mysql.Config{
  7. // 一些配置
  8. }
  9. // 5000个id
  10. groups := []int{1, 2, 3}
  11. // 这个带缓冲的通道将在并发限制处阻塞
  12. semaphoreChan := make(chan struct{}, 5)
  13. // 用于收集结果
  14. pages := make(chan *Page)
  15. defer func() {
  16. close(semaphoreChan)
  17. close(pages)
  18. }()
  19. go func() {
  20. for _, id := range groups {
  21. // 这将一个空结构体发送到semaphoreChan,基本上是说将限制增加1,但当达到限制时,阻塞直到有空间
  22. semaphoreChan <- struct{}{}
  23. go func(i int) {
  24. getData(i, cfg, pages)
  25. // 一旦完成,我们从semaphoreChan中读取,这会将限制减少1,并允许另一个goroutine开始
  26. <-semaphoreChan
  27. }(id)
  28. }
  29. }()
  30. var results []Page
  31. for {
  32. result := <-pages
  33. log.Println(result.Stat, result.Id)
  34. results = append(results, *result)
  35. if len(results) == len(groups) {
  36. break
  37. }
  38. }
  39. }
  40. func getData(i int, cfg mysql.Config, channel chan *Page) {
  41. db, err := sql.Open("mysql", cfg.FormatDSN())
  42. db.SetMaxOpenConns(8)
  43. db.SetMaxIdleConns(8)
  44. checkError(err)
  45. rows, err := db.Query(`select g.id from goods as g where g.groupid = ?`, i)
  46. checkError(err)
  47. defer rows.Close()
  48. defer db.Close()
  49. count := 0
  50. for rows.Next() {
  51. err = rows.Scan(&id1)
  52. checkError(err)
  53. count++
  54. }
  55. channel <- &Page{Stat: count, Id: i}
  56. }



Here is the proper version of your code.

  1. type Page struct {
  2. Stat int
  3. Id int
  4. }
  5. func main() {
  6. cfg := mysql.Config{
  7. // some config
  8. }
  9. // 5000 ids
  10. groups := []int{1, 2, 3}
  11. // this buffered channel will block at the concurrency limit
  12. semaphoreChan := make(chan struct{}, 5)
  13. // for collecting result
  14. pages := make(chan *Page)
  15. defer func() {
  16. close(semaphoreChan)
  17. close(pages)
  18. }()
  19. go func() {
  20. for _, id := range groups {
  21. // this sends an empty struct into the semaphoreChan which
  22. // is basically saying add one to the limit, but when the
  23. // limit has been reached block until there is room
  24. semaphoreChan <- struct{}{}
  25. go func(i int) {
  26. getData(i, cfg, pages)
  27. // once we're done, we read from the semaphoreChan which
  28. // has the effect of removing one from the limit and allowing
  29. // another goroutine to start
  30. <-semaphoreChan
  31. }(id)
  32. }
  33. }()
  34. var results []Page
  35. for {
  36. result := <-pages
  37. log.Println(result.Stat, result.Id)
  38. results = append(results, *result)
  39. if len(results) == len(groups) {
  40. break
  41. }
  42. }
  43. }
  44. func getData(i int, cfg mysql.Config, channel chan *Page) {
  45. db, err := sql.Open("mysql", cfg.FormatDSN())
  46. db.SetMaxOpenConns(8)
  47. db.SetMaxIdleConns(8)
  48. checkError(err)
  49. rows, err := db.Query(`select g.id from goods as g where g.groupid = ?`, i)
  50. checkError(err)
  51. defer rows.Close()
  52. defer db.Close()
  53. count := 0
  54. for rows.Next() {
  55. err = rows.Scan(&id1)
  56. checkError(err)
  57. count++
  58. }
  59. channel <- &Page{Stat: count, Id: i}
  60. }

  • 本文由 发表于 2023年1月6日 17:42:46
  • 转载请务必保留本文链接:https://go.coder-hub.com/75029261.html



:?: :razz: :sad: :evil: :!: :smile: :oops: :grin: :eek: :shock: :???: :cool: :lol: :mad: :twisted: :roll: :wink: :idea: :arrow: :neutral: :cry: :mrgreen:
