DB calls in goroutine failing without error

huangapple go评论85阅读模式
英文:

DB calls in goroutine failing without error

问题

我写了一个脚本,用于将大量数据从一个数据库迁移到另一个数据库,并且已经成功运行。但是现在我想尝试使用goroutines通过并发的数据库调用来加快脚本的速度。在调用go processBatch(offset)而不是processBatch(offset)之后,我可以看到一些goroutines被启动,但脚本几乎立即完成,并且实际上没有执行任何操作。而且每次调用脚本时启动的goroutines数量都不同。没有错误(至少我看不到)。

我对goroutines和Go语言还不太熟悉,所以非常感谢任何关于我可能做错的地方的指导。我已经从下面的代码中删除了与并发或数据库访问无关的所有逻辑,因为没有这些更改,代码可以正常运行。我还在我认为它失败的地方留下了一个注释,因为该行以下的代码都没有运行(Println没有输出)。我还尝试使用sync.WaitGroup来分散数据库调用,但似乎没有改变任何东西。

var (
	legacyDB *sql.DB
	v2DB     *sql.DB
)

func main() {

	var total, loops int
	var err error

	legacyDB, err = sql.Open("mysql", "...")
	if err != nil {
		panic(err)
	}
	defer legacyDB.Close()

	v2DB, err = sql.Open("mysql", "...")
	if err != nil {
		panic(err)
	}
	defer v2DB.Close()

	err = legacyDB.QueryRow("SELECT count(*) FROM users").Scan(&total)
	checkErr(err)

	loops = int(math.Ceil(float64(total) / float64(batchsize)))

	fmt.Println("Total: " + strconv.Itoa(total))
	fmt.Println("Loops: " + strconv.Itoa(loops))

	for i := 0; i < loops; i++ {
		offset := i * batchsize

		go processBatch(offset)
	}

	legacyDB.Close()
	v2DB.Close()
}

func processBatch(offset int) {

	query := namedParameterQuery.NewNamedParameterQuery(`
		SELECT ...
		LIMIT :offset,:batchsize
	`)
	query.SetValue(...)

	rows, err := legacyDB.Query(query.GetParsedQuery(), (query.GetParsedParameters())...)
	// 以下这行之后的代码都没有执行(这里的Println没有输出)
	checkErr(err)
	defer rows.Close()

	....

	var m runtime.MemStats
	runtime.ReadMemStats(&m)
	log.Printf("\nAlloc = %v\nTotalAlloc = %v\nSys = %v\nNumGC = %v\n\n", m.Alloc/1024/1024, m.TotalAlloc/1024/1024, m.Sys/1024/1024, m.NumGC)
}

func checkErr(err error) {
	if err != nil {
		panic(err)
	}
}

希望这可以帮助到你。

英文:

I wrote a script to migrate lots of data from one DB to another and got it working fine, but now I want to try and use goroutines to speed up the script by using concurrent DB calls. Since making the change to calling go processBatch(offset) instead of just processBatch(offset), I can see that a few goroutines are started but the script finishes almost instantly and nothing is actually done. Also the number of started goroutines varies every time I call the script. There are no errors (that I can see).

I'm still new to goroutines and Go in general, so any pointers as to what I might be doing wrong are much appreciated. I have removed all logic from the code below that is not related to concurrency or DB access, as it runs fine without the changes. I also left a comment where I believe it fails, as nothing below that line is run (Print gives not output). I also tried using sync.WaitGroup to stagger DB calls, but it didn't seem to change anything.

var (
legacyDB     *sql.DB
v2DB         *sql.DB
)
func main() {
var total, loops int
var err error
legacyDB, err = sql.Open(&quot;mysql&quot;, &quot;...&quot;)
if err != nil {
panic(err)
}
defer legacyDB.Close()
v2DB, err = sql.Open(&quot;mysql&quot;, &quot;...&quot;)
if err != nil {
panic(err)
}
defer v2DB.Close()
err = legacyDB.QueryRow(&quot;SELECT count(*) FROM users&quot;).Scan(&amp;total)
checkErr(err)
loops = int(math.Ceil(float64(total) / float64(batchsize)))
fmt.Println(&quot;Total: &quot; + strconv.Itoa(total))
fmt.Println(&quot;Loops: &quot; + strconv.Itoa(loops))
for i := 0; i &lt; loops; i++ {
offset := i * batchsize
go processBatch(offset)
}
legacyDB.Close()
v2DB.Close()
}
func processBatch(offset int) {
query := namedParameterQuery.NewNamedParameterQuery(`
SELECT ...
LIMIT :offset,:batchsize
`)
query.SetValue(...)
rows, err := legacyDB.Query(query.GetParsedQuery(), (query.GetParsedParameters())...)
// nothing after this line gets done (Println here does not show output)
checkErr(err)
defer rows.Close()
....
var m runtime.MemStats
runtime.ReadMemStats(&amp;m)
log.Printf(&quot;\nAlloc = %v\nTotalAlloc = %v\nSys = %v\nNumGC = %v\n\n&quot;, m.Alloc/1024/1024, m.TotalAlloc/1024/1024, m.Sys/1024/1024, m.NumGC)
}
func checkErr(err error) {
if err != nil {
panic(err)
}
}

答案1

得分: 3

根据Nadh在评论中提到的,这是因为当main函数结束时,程序会退出,而不管是否还有其他goroutine在运行。为了解决这个问题,可以使用*sync.WaitGroup。WaitGroup用于处理多个并发操作,并等待它们全部完成。可以在这里找到文档:https://golang.org/pkg/sync/#WaitGroup。

一个不使用全局变量的示例实现如下所示,将以下代码替换:

fmt.Println("Total: " + strconv.Itoa(total))
fmt.Println("Loops: " + strconv.Itoa(loops))
for i := 0; i < loops; i++ {
offset := i * batchsize
go processBatch(offset)
}

使用以下代码:

fmt.Println("Total: " + strconv.Itoa(total))
fmt.Println("Loops: " + strconv.Itoa(loops))
wg := new(sync.WaitGroup)
wg.Add(loops)
for i := 0; i < loops; i++ {
offset := i * batchsize
go func(offset int) {
defer wg.Done()
processBatch(offset)
}(offset)
}
wg.Wait()
英文:

As Nadh mentioned in a comment, that would be because the program exits when the main function finishes, regardless whether or not there are still other goroutines running. To fix this, a *sync.WaitGroup will suffice. A WaitGroup is used for cases where you have multiple concurrent operations, and you would like to wait until they have all completed. Documentation can be found here: https://golang.org/pkg/sync/#WaitGroup.

An example implementation for your program without the use of global variables would look like replacing

fmt.Println(&quot;Total: &quot; + strconv.Itoa(total))
fmt.Println(&quot;Loops: &quot; + strconv.Itoa(loops))
for i := 0; i &lt; loops; i++ {
offset := i * batchsize
go processBatch(offset)
}

with

fmt.Println(&quot;Total: &quot; + strconv.Itoa(total))
fmt.Println(&quot;Loops: &quot; + strconv.Itoa(loops))
wg := new(sync.WaitGroup)
wg.Add(loops)
for i := 0; i &lt; loops; i++ {
offset := i * batchsize
go func(offset int) {
defer wg.Done()
processBatch(offset)
}(offset)
}
wg.Wait()

huangapple
  • 本文由 发表于 2017年1月2日 13:20:47
  • 转载请务必保留本文链接:https://go.coder-hub.com/41421696.html
匿名

发表评论

匿名网友

:?: :razz: :sad: :evil: :!: :smile: :oops: :grin: :eek: :shock: :???: :cool: :lol: :mad: :twisted: :roll: :wink: :idea: :arrow: :neutral: :cry: :mrgreen:

确定