理解Go语言中的SQL连接池的工作原理

huangapple go评论81阅读模式
英文:

Understanding SQL connection pool work with Go

问题

我有一个高负载的 RabbitMQ 队列,可能会有数百万条消息。MQ 代理从队列中读取消息并将其写入 MS SQL 数据库。我尝试使用 goroutine 非阻塞、并发地写入:

for m := range msgs {
    // ......
    se := &sqlEntity{
        body:      string(m.Body),
        cnt:       m.MessageCount,
        timeStamp: fmt.Sprintf("%v", m.Timestamp.Format("2006-01-02 15:04:05")),
        uuid:      u,
    }
    go func(se *sqlEntity) {
        writeSQL(se)
    }(se)
    // .........
}

func writeSQL(se *sqlEntity) {
    result, err := db.Exec(cmd, args...)
    // ......
}

所以,写入函数不会阻塞从 MQ 中读取。但是,如果消息太多,写入过程会耗尽 MS SQL 服务器上的所有连接。因此,我尝试设置连接池,显式设置连接数 - (DB.SetMaxOpenConns)。我确信 database/sql 驱动程序会管理连接,但实际上并不是这样。如果连接数(例如,设置 SetMaxOpenConns = 256)耗尽,在这种情况下,writeSQL() 调用不会等待连接池中的空闲连接,其中的 result, err := db.Exec(cmd, args...) 会直接返回连接错误。
那么,我该如何设计我的应用程序以在连接池限制内并发调用 writeSQL() 函数呢?现在,如果连接池耗尽,我只是丢失了我的数据。或者如果没有连接池限制,数据库会过载。

英文:

I have a rabbit MQ queue which is high load, it may have up to several millions messages. The MQ broker reads messages from queue and writes them to MS SQL DB. I tried to write non-blocking, concurrently, using a goroutine:

for m := range msgs {
//......
     se := &sqlEntity{
          body:      string(m.Body),
          cnt:       m.MessageCount,
          timeStamp: fmt.Sprintf("%v", m.Timestamp.Format("2006-01-02 15:04:05")),
          uuid:      u,
     }
     go func(se *sqlEntity) {
          writeSQL(se)
     }(se)
//.........

}
func writeSQL(se *sqlEntity) {
     result, err := db.Exec(cmd, args...)
//.......
}

So, write function does not block reading from MQ. But if there are too many messages, write process exhausts all present connections on MS SQL server. Thus I tried to setup the pool, set the number of connections explicitly - (DB.SetMaxOpenConns). I was sure that the database/sql driver will manage the connections, but it does not. If connections (for example let SetMaxOpenConns = 256) exhausts, writeSQL() call does not wait for free connection in the pool, result, err := db.Exec(cmd, args...) inside it simply returns connection error in this case.
So, how can I design my application to call writeSQL() concurrently, but strictly within the pool limits? Now I simply loose my data if the pool is exhausted. Or DB overloads if there is no pool limit.

答案1

得分: 1

你可以使用带有缓冲区的通道来控制writeSQL函数的并发性,缓冲区的大小应该等于连接池中的最大连接数。

每次调用writeSQL函数时,它会向通道发送一条消息。在执行db.Exec语句之前,它会等待从通道接收到一条消息,这表示连接池中有一个空闲连接可用。

这种方式可以处理并发的writeSQL函数数量,并确保它永远不会超过连接池中的最大连接数。当连接池耗尽时,你不会丢失任何数据。

使用你提供的代码,应该如下所示:

connPool := make(chan struct{}, maxOpenConns) // 创建一个缓冲区大小等于连接池中最大连接数的通道

for m := range msgs {
    // ...
    se := &sqlEntity{
        body:      string(m.Body),
        cnt:       m.MessageCount,
        timeStamp: fmt.Sprintf("%v", m.Timestamp.Format("2006-01-02 15:04:05")),
        uuid:      u,
    }
    go func(se *sqlEntity) {
        writeSQL(se)
    }(se)
    // ...
}

func writeSQL(se *sqlEntity) {
    connPool <- struct{}{} // 等待连接池中的空闲连接
    defer func() {
        <-connPool // 在writeSQL完成后释放连接
    }()
    result, err := db.Exec(cmd, args...)
    // 处理错误并返回
}
英文:

One thing you can do is to use a buffered channel with a size equal to the maximum number of connections in the pool to control the concurrency of writeSQL function.

Every time writeSQL is called, it sends a message to the channel.
And before executing the db.Exec statement, it waits for a message to be received from the channel which indicates a free connection is available in the pool.

This way should allow you to handle the number of concurrent writeSQL functions and ensure that it will never exceed the maximum number of connections in the pool.
You won't lose any data when the pool is exhausted.

Using the code you've provided, it should look like:

connPool := make(chan struct{}, maxOpenConns) // create a buffered channel with a size equal to the maximum number of connections in the pool

for m := range msgs {
    // ...
    se := &amp;sqlEntity{
        body:      string(m.Body),
        cnt:       m.MessageCount,
        timeStamp: fmt.Sprintf(&quot;%v&quot;, m.Timestamp.Format(&quot;2006-01-02 15:04:05&quot;)),
        uuid:      u,
    }
    go func(se *sqlEntity) {
        writeSQL(se)
    }(se)
    // ...
}

func writeSQL(se *sqlEntity) {
    connPool &lt;- struct{}{} // wait for a free connection in the pool
    defer func() {
        &lt;-connPool // release the connection after writeSQL is done
    }()
    result, err := db.Exec(cmd, args...)
    // handle error and return
}

huangapple
  • 本文由 发表于 2023年2月3日 00:25:50
  • 转载请务必保留本文链接:https://go.coder-hub.com/75326111.html
匿名

发表评论

匿名网友

:?: :razz: :sad: :evil: :!: :smile: :oops: :grin: :eek: :shock: :???: :cool: :lol: :mad: :twisted: :roll: :wink: :idea: :arrow: :neutral: :cry: :mrgreen:

确定