处理查询的工作池

huangapple go评论92阅读模式
英文:

Worker pool to handle queries

问题

我对Go语言还不太熟悉,但是我可以帮你翻译这段代码。以下是翻译的结果:

package main

import (
	"database/sql"
	_ "github.com/go-sql-driver/mysql"
)

var query *sql.Stmt

func worker(jobs <-chan int, results chan<- int) {

	for _ = range jobs {

		_, e := query.Exec("a")

		if e != nil {

			panic(e.Error())
		}

		results <- 1
	}
}

func main() {

	workers := 100

	db, e := sql.Open("mysql", "foo:foo@/foo")

	if e != nil {

		panic(e.Error())
	}

	db.SetMaxOpenConns(workers)
	db.SetMaxIdleConns(workers)

	defer db.Close()

	query, e = db.Prepare("INSERT INTO foo (foo) values(?)")

	if e != nil {

		panic(e.Error())
	}

	total := 30000
	jobs := make(chan int, total)
	results := make(chan int, total)

	for w := 0; w < workers; w++ {

		go worker(jobs, results)
	}

	for j := 0; j < total; j++ {

		jobs <- j
	}

	close(jobs)

	for r := 0; r < total; r++ {

		<-results
	}
}

这段代码是用来处理3000个查询的,使用了100个工作线程,并确保每个工作线程都有一个数据库连接(MySQL已经配置了超过100个连接)。这段代码的实现方式是创建一个worker函数来执行查询操作,并使用通道来传递任务和结果。在main函数中,首先设置了数据库连接的最大打开连接数和最大空闲连接数,然后创建了用于传递任务和结果的通道。接着使用循环创建了100个工作线程,并将它们启动。然后,将30000个任务发送到任务通道中,并关闭任务通道。最后,通过循环从结果通道中接收结果。这段代码看起来是可行的,但是否是最佳实践,我无法确定。如果你有更好的实现方式,欢迎分享。

英文:

I'm pretty new to Go and looking for a way to handle 3000 queries using 100 workers and ensuring a connection for every worker (MySQL is already configured with more than 100 connections). This is my attempt:

package main
import (
&quot;database/sql&quot;
_ &quot;github.com/go-sql-driver/mysql&quot;
)
var query *sql.Stmt
func worker(jobs &lt;-chan int, results chan&lt;- int) {
for _ = range jobs {
_, e := query.Exec(&quot;a&quot;)
if e != nil {
panic(e.Error())
}
results &lt;- 1
}
}
func main() {
workers := 100
db, e := sql.Open(&quot;mysql&quot;, &quot;foo:foo@/foo&quot;)
if e != nil {
panic(e.Error())
}
db.SetMaxOpenConns(workers)
db.SetMaxIdleConns(workers)
defer db.Close()
query, e = db.Prepare(&quot;INSERT INTO foo (foo) values(?)&quot;)
if e != nil {
panic(e.Error())
}
total := 30000
jobs := make(chan int, total)
results := make(chan int, total)
for w := 0; w &lt; workers; w++ {
go worker(jobs, results)
}
for j := 0; j &lt; total; j++ {
jobs &lt;- j
}
close(jobs)
for r := 0; r &lt; total; r++ {
&lt;-results
}
}

It's working, but I'm not sure if is the best way of doing it.

Please, if you think this is opinion based or is not a good question at all, just mark it to be closed and leave a comment explaining why.

答案1

得分: 2

你所拥有的基本上是可行的,但为了消除缓冲,你需要同时向jobs写入并从results读取。否则,你的进程会被阻塞 - 工作线程无法发送结果,因为没有接收者,而且你也无法插入作业,因为工作线程被阻塞。

这里有一个简化的示例,展示了如何在接收到结果时在main函数中后台推送作业的工作队列:Playground上的示例

package main

import "fmt"

func worker(jobs <-chan int, results chan<- int) {
    for _ = range jobs {
        // ...在这里进行工作...
        results <- 1
    }
}

func main() {
    workers := 10
    total := 30
    jobs := make(chan int)
    results := make(chan int)

    // 启动工作线程
    for w := 0; w < workers; w++ {
        go worker(jobs, results)
    }

    // 后台插入作业
    go func() {
        for j := 0; j < total; j++ {
            jobs <- j
        }
    }()

    // 收集结果
    for i := 0; i < total; i++ {
        <-results
        fmt.Printf(".")
    }

    close(jobs)
}

要使该代码正常工作,你必须知道将获得多少个结果。如果你不知道(例如,每个作业可能产生零个或多个结果),你可以使用sync.WaitGroup来等待工作线程完成,然后关闭结果流:Playground上的示例

package main

import (
    "fmt"
    "sync"
)

func worker(jobs <-chan int, results chan<- int, wg *sync.WaitGroup) {
    for _ = range jobs {
        // ...在这里进行工作...
        results <- 1
    }
    wg.Done()
}

func main() {
    workers := 10
    total := 30
    jobs := make(chan int)
    results := make(chan int)
    wg := &sync.WaitGroup{}

    // 启动工作线程
    for w := 0; w < workers; w++ {
        wg.Add(1)
        go worker(jobs, results, wg)
    }

    // 后台插入作业
    go func() {
        for j := 0; j < total; j++ {
            jobs <- j
        }
        close(jobs)
        wg.Wait()
        // 所有工作线程都完成,不再有结果
        close(results)
    }()

    // 收集结果
    for _ = range results {
        fmt.Printf(".")
    }

}

还有许多其他更复杂的技巧可以在发生错误后停止所有工作线程,将结果按照原始作业的顺序放置,或者执行其他类似的操作。不过,基本版本在这里应该是可行的。

英文:

What you've got fundamentally works, but to get rid of buffering, you need to be writing to jobs and reading from results at the same time. Otherwise, your process ends up stuck--workers can't send results because nothing is receiving them, and you can't insert jobs because workers are blocked.

Here's a boiled-down example on the Playground of how to do a work queue that pushes jobs in the background as it receives results in main:

package main
import &quot;fmt&quot;
func worker(jobs &lt;-chan int, results chan&lt;- int) {
for _ = range jobs {
// ...do work here...
results &lt;- 1
}
}
func main() {
workers := 10
total := 30
jobs := make(chan int)
results := make(chan int)
// start workers
for w := 0; w &lt; workers; w++ {
go worker(jobs, results)
}
// insert jobs in background
go func() {
for j := 0; j &lt; total; j++ {
jobs &lt;- j
}
}()
// collect results
for i := 0; i &lt; total; i++ {
&lt;-results
fmt.Printf(&quot;.&quot;)
}
close(jobs)
}

For that particular code to work, you have to know how many results you'll get. If you don't know that (say, each job could produce zero or multiple results), you can use a sync.WaitGroup to wait for the workers to finish, then close the result stream:

package main
import (
&quot;fmt&quot;
&quot;sync&quot;
)
func worker(jobs &lt;-chan int, results chan&lt;- int, wg *sync.WaitGroup) {
for _ = range jobs {
// ...do work here...
results &lt;- 1
}
wg.Done()
}
func main() {
workers := 10
total := 30
jobs := make(chan int)
results := make(chan int)
wg := &amp;sync.WaitGroup{}
// start workers
for w := 0; w &lt; workers; w++ {
wg.Add(1)
go worker(jobs, results, wg)
}
// insert jobs in background
go func() {
for j := 0; j &lt; total; j++ {
jobs &lt;- j
}
close(jobs)
wg.Wait()
// all workers are done so no more results
close(results)
}()
// collect results
for _ = range results {
fmt.Printf(&quot;.&quot;)
}
}

There are many other more complicated tricks one can do to stop all workers after an error happens, put results into the same order as the original jobs, or do other things like that. Sounds as if the basic version works here, though.

huangapple
  • 本文由 发表于 2014年10月30日 23:26:59
  • 转载请务必保留本文链接:https://go.coder-hub.com/26657039.html
匿名

发表评论

匿名网友

:?: :razz: :sad: :evil: :!: :smile: :oops: :grin: :eek: :shock: :???: :cool: :lol: :mad: :twisted: :roll: :wink: :idea: :arrow: :neutral: :cry: :mrgreen:

确定