英文:
Goroutines blocked connection pool
问题
以下是翻译好的代码部分:
package main
import (
"database/sql"
"fmt"
_ "github.com/lib/pq"
"sync"
)
func main() {
db, _ := sql.Open("postgres", fmt.Sprintf("host=%s dbname=%s user=%s sslmode=disable", "localhost", "dbname", "postgres"))
defer db.Close()
db.SetMaxOpenConns(15)
var wg sync.WaitGroup
for i := 0; i < 15; i++ {
wg.Add(1)
go func() {
defer wg.Done()
//#1
rows, _ := db.Query("SELECT * FROM reviews LIMIT 1")
for rows.Next() {
//#2
db.Exec("SELECT * FROM reviews LIMIT 1")
}
}()
}
wg.Wait()
}
查询#1打开了15个连接,当执行rows.Next()
时,它们将被关闭。但是rows.Next()
永远不会被执行,因为它包含等待空闲连接的db.Exec()
。
如何解决这个问题?
英文:
package main
import (
"database/sql"
"fmt"
_ "github.com/lib/pq"
"sync"
)
func main() {
db, _ := sql.Open("postgres", fmt.Sprintf("host=%s dbname=%s user=%s sslmode=disable", "localhost", "dbname", "postgres"))
defer db.Close()
db.SetMaxOpenConns(15)
var wg sync.WaitGroup
for i := 0; i < 15; i++ {
wg.Add(1)
go func() {
defer wg.Done()
//#1
rows, _ := db.Query("SELECT * FROM reviews LIMIT 1")
for rows.Next() {
//#2
db.Exec("SELECT * FROM reviews LIMIT 1")
}
}()
}
wg.Wait()
}
Query #1 opens 15 connections and they will be closed when rows.Next()
is executed. But rows.Next()
will be never executed because it contains db.Exec()
that waits for a free connection.
How to solve this problem?
答案1
得分: 7
你遇到的是一个死锁问题。在最坏的情况下,你有15个goroutine持有15个数据库连接,而这15个goroutine都需要一个新的连接才能继续执行。但是要获取一个新的连接,就必须先释放一个连接,这就导致了死锁。
链接的维基百科文章详细介绍了如何预防死锁。例如,代码执行只有在拥有所有需要的资源(或将需要的资源)时才能进入临界区(锁定资源)。在这种情况下,这意味着你需要预留2个连接(确切地说是2个;如果只有1个可用,就放弃它并等待),只有当你拥有这2个连接时才能继续执行查询。但是在Go中,你无法提前预留连接。它们在执行查询时根据需要分配。
通常应避免使用这种模式。你不应该编写首先预留(有限的)资源(在这种情况下是数据库连接)的代码,并在释放之前要求另一个资源。
一个简单的解决方法是执行第一个查询,保存其结果(例如,保存到Go切片中),并在完成后再继续执行后续查询(但也不要忘记首先关闭sql.Rows
)。这样,你的代码不需要同时使用2个连接。
不要忘记处理错误!为了简洁起见,我省略了错误处理,但你在代码中不应该这样做。
以下是示例代码:
go func() {
defer wg.Done()
rows, _ := db.Query("SELECT * FROM reviews LIMIT 1")
var data []int // 使用适当的类型描述你查询的数据
for rows.Next() {
var something int
rows.Scan(&something)
data = append(data, something)
}
rows.Close()
for _, v := range data {
// 如果需要,可以将v作为查询参数使用
db.Exec("SELECT * FROM reviews LIMIT 1")
}
}()
请注意,rows.Close()
应该作为 defer
语句执行,以确保它会被执行(即使发生 panic)。但是,如果你只是使用 defer rows.Close()
,那么它只会在后续查询执行后才执行,因此它无法防止死锁。因此,我建议将其重构为在另一个函数中调用它(可以是匿名函数),在该函数中你可以使用 defer
:
rows, _ := db.Query("SELECT * FROM reviews LIMIT 1")
var data []int // 使用适当的类型描述你查询的数据
func() {
defer rows.Close()
for rows.Next() {
var something int
rows.Scan(&something)
data = append(data, something)
}
}()
还要注意,在第二个 for
循环中,通过 DB.Prepare()
获取的准备好的语句(sql.Stmt
)可能是执行相同(参数化)查询的更好选择。
另一种选择是在新的goroutine中启动后续查询,这样在当前锁定的连接被释放(或任何其他goroutine锁定的连接)时,可以执行该查询,但是在没有显式同步的情况下,你无法控制它们何时执行。示例如下:
go func() {
defer wg.Done()
rows, _ := db.Query("SELECT * FROM reviews LIMIT 1")
defer rows.Close()
for rows.Next() {
var something int
rows.Scan(&something)
// 如果需要,可以传递 something
go db.Exec("SELECT * FROM reviews LIMIT 1")
}
}()
要使程序等待这些goroutine,可以使用你已经在使用的 WaitGroup
:
// 如果需要,可以传递 something
wg.Add(1)
go func() {
defer wg.Done()
db.Exec("SELECT * FROM reviews LIMIT 1")
}()
英文:
What you have is a deadlock. In the worst case scenario you have 15 goroutines holding 15 database connections, and all of those 15 goroutines require a new connection to continue. But to get a new connection, one would have to advance and release a connection: deadlock.
The linked wikipedia article details prevention of deadlock. For example a code execution should only enter a critical section (that locks resources) when it has all the resources it needs (or will need). In this case this means you would have to reserve 2 connections (exactly 2; if only 1 is available, leave it and wait), and if you have those 2, only then proceed with the queries. But in Go you can't reserve connections in advance. They are allocated as needed when you execute queries.
Generally this pattern should be avoided. You should not write code which first reserves a (finite) resource (db connection in this case), and before it would release it, it demands another one.
An easy workaround is to execute the first query, save its result (e.g. into a Go slice), and when you're done with that, then proceed with the subsequent queries (but also don't forget to close sql.Rows
first). This way your code does not need 2 connections at the same time.
And don't forget to handle errors! I omitted them for brevity, but you should not in your code.
This is how it could look like:
go func() {
defer wg.Done()
rows, _ := db.Query("SELECT * FROM reviews LIMIT 1")
var data []int // Use whatever type describes data you query
for rows.Next() {
var something int
rows.Scan(&something)
data = append(data, something)
}
rows.Close()
for _, v := range data {
// You may use v as a query parameter if needed
db.Exec("SELECT * FROM reviews LIMIT 1")
}
}()
Note that rows.Close()
should be executed as a defer
statement to make sure it will get executed (even in case of a panic). But if you simply use defer rows.Close()
, that would only be executed after the subsequent queries are executed, so it won't prevent the deadlock. So I would refactor it to call it in another function (which may be an anonymous function) in which you can use a defer
:
rows, _ := db.Query("SELECT * FROM reviews LIMIT 1")
var data []int // Use whatever type describes data you query
func() {
defer rows.Close()
for rows.Next() {
var something int
rows.Scan(&something)
data = append(data, something)
}
}()
Also note that in the second for
loop a prepared statement (sql.Stmt
) acquired by DB.Prepare()
would probably be a much better choice to execute the same (parameterized) query multiple times.
Another option is to launch subsequent queries in new goroutines so that the query executed in that can happen when the currently locked connection is released (or any other connection locked by any other goroutine), but then without explicit synchronization you don't have control when they get executed. It could look like this:
go func() {
defer wg.Done()
rows, _ := db.Query("SELECT * FROM reviews LIMIT 1")
defer rows.Close()
for rows.Next() {
var something int
rows.Scan(&something)
// Pass something if needed
go db.Exec("SELECT * FROM reviews LIMIT 1")
}
}()
To make your program wait for these goroutines too, use the WaitGroup
you already have in action:
// Pass something if needed
wg.Add(1)
go func() {
defer wg.Done()
db.Exec("SELECT * FROM reviews LIMIT 1")
}()
通过集体智慧和协作来改善编程学习和解决问题的方式。致力于成为全球开发者共同参与的知识库,让每个人都能够通过互相帮助和分享经验来进步。
评论