Worker pool to handle queries

Question
I'm pretty new to Go and looking for a way to handle 3000 queries using 100 workers and ensuring a connection for every worker (MySQL is already configured with more than 100 connections). This is my attempt:
package main

import (
    "database/sql"

    _ "github.com/go-sql-driver/mysql"
)

var query *sql.Stmt

func worker(jobs <-chan int, results chan<- int) {
    for _ = range jobs {
        _, e := query.Exec("a")
        if e != nil {
            panic(e.Error())
        }
        results <- 1
    }
}

func main() {
    workers := 100

    db, e := sql.Open("mysql", "foo:foo@/foo")
    if e != nil {
        panic(e.Error())
    }
    db.SetMaxOpenConns(workers)
    db.SetMaxIdleConns(workers)
    defer db.Close()

    query, e = db.Prepare("INSERT INTO foo (foo) values(?)")
    if e != nil {
        panic(e.Error())
    }

    total := 30000
    jobs := make(chan int, total)
    results := make(chan int, total)

    for w := 0; w < workers; w++ {
        go worker(jobs, results)
    }
    for j := 0; j < total; j++ {
        jobs <- j
    }
    close(jobs)
    for r := 0; r < total; r++ {
        <-results
    }
}
It's working, but I'm not sure if it is the best way of doing it.
Please, if you think this is opinion based or is not a good question at all, just mark it to be closed and leave a comment explaining why.
Answer 1

Score: 2
What you've got fundamentally works, but to get rid of buffering, you need to be writing to jobs and reading from results at the same time. Otherwise, your process ends up stuck: workers can't send results because nothing is receiving them, and you can't insert jobs because workers are blocked.

Here's a boiled-down example on the Playground of how to do a work queue that pushes jobs in the background as it receives results in main:
package main

import "fmt"

func worker(jobs <-chan int, results chan<- int) {
    for range jobs {
        // ...do work here...
        results <- 1
    }
}

func main() {
    workers := 10
    total := 30
    jobs := make(chan int)
    results := make(chan int)

    // start workers
    for w := 0; w < workers; w++ {
        go worker(jobs, results)
    }
    // insert jobs in background
    go func() {
        for j := 0; j < total; j++ {
            jobs <- j
        }
    }()
    // collect results
    for i := 0; i < total; i++ {
        <-results
        fmt.Printf(".")
    }
    close(jobs)
}
For that particular code to work, you have to know how many results you'll get. If you don't know that (say, each job could produce zero or multiple results), you can use a sync.WaitGroup to wait for the workers to finish, then close the result stream:
package main

import (
    "fmt"
    "sync"
)

func worker(jobs <-chan int, results chan<- int, wg *sync.WaitGroup) {
    for range jobs {
        // ...do work here...
        results <- 1
    }
    wg.Done()
}

func main() {
    workers := 10
    total := 30
    jobs := make(chan int)
    results := make(chan int)
    wg := &sync.WaitGroup{}

    // start workers
    for w := 0; w < workers; w++ {
        wg.Add(1)
        go worker(jobs, results, wg)
    }
    // insert jobs in background
    go func() {
        for j := 0; j < total; j++ {
            jobs <- j
        }
        close(jobs)
        wg.Wait()
        // all workers are done so no more results
        close(results)
    }()
    // collect results
    for range results {
        fmt.Printf(".")
    }
}
There are many other more complicated tricks one can do to stop all workers after an error happens, put results into the same order as the original jobs, or do other things like that. Sounds as if the basic version works here, though.