Understanding how the SQL connection pool works with Go
Question
I have a high-load RabbitMQ queue that may hold up to several million messages. The MQ broker reads messages from the queue and writes them to an MS SQL database. I tried to make the writes non-blocking and concurrent, using a goroutine:
for m := range msgs {
	// ......
	se := &sqlEntity{
		body:      string(m.Body),
		cnt:       m.MessageCount,
		timeStamp: fmt.Sprintf("%v", m.Timestamp.Format("2006-01-02 15:04:05")),
		uuid:      u,
	}
	go func(se *sqlEntity) {
		writeSQL(se)
	}(se)
	// .........
}

func writeSQL(se *sqlEntity) {
	result, err := db.Exec(cmd, args...)
	// .......
}
So the write function does not block reading from MQ. But if there are too many messages, the write process exhausts all available connections on the MS SQL server. Thus I tried to set up the pool and limit the number of connections explicitly with DB.SetMaxOpenConns. I was sure the database/sql driver would manage the connections, but it does not: if the connections are exhausted (with, say, SetMaxOpenConns = 256), a writeSQL() call does not wait for a free connection in the pool; the result, err := db.Exec(cmd, args...) inside it simply returns a connection error.
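For reference, this is roughly how the pool limit above is configured on the *sql.DB handle; a minimal sketch, assuming the "database/sql" and "log" imports, a driver such as github.com/microsoft/go-mssqldb (registered under the name "sqlserver"), and a placeholder DSN:

db, err := sql.Open("sqlserver", "sqlserver://user:pass@localhost?database=mydb") // placeholder DSN
if err != nil {
	log.Fatal(err)
}
db.SetMaxOpenConns(256) // hard cap on concurrent connections to MS SQL
db.SetMaxIdleConns(256) // reuse idle connections instead of reopening them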
So, how can I design my application to call writeSQL() concurrently, but strictly within the pool limits? Right now I simply lose my data if the pool is exhausted, and the DB is overloaded if there is no pool limit.
Answer 1
Score: 1
One thing you can do is use a buffered channel, with a size equal to the maximum number of connections in the pool, to control the concurrency of the writeSQL function. Every time writeSQL is called, it sends a token into the channel before executing the db.Exec statement; the send blocks while the buffer is full and only proceeds once another call has finished and received from the channel, which indicates a free connection is available in the pool.
This caps the number of concurrent writeSQL calls and ensures it never exceeds the maximum number of connections in the pool, so you won't lose any data when the pool is exhausted.
Using the code you've provided, it should look like:
connPool := make(chan struct{}, maxOpenConns) // buffered channel sized to the maximum number of connections in the pool

for m := range msgs {
	// ...
	se := &sqlEntity{
		body:      string(m.Body),
		cnt:       m.MessageCount,
		timeStamp: fmt.Sprintf("%v", m.Timestamp.Format("2006-01-02 15:04:05")),
		uuid:      u,
	}
	go func(se *sqlEntity) {
		writeSQL(se)
	}(se)
	// ...
}

func writeSQL(se *sqlEntity) {
	connPool <- struct{}{} // wait for a free connection in the pool
	defer func() {
		<-connPool // release the connection after writeSQL is done
	}()
	result, err := db.Exec(cmd, args...)
	// handle error and return
}
Comments