如何使用通道来知道由循环启动的所有goroutine何时完成。

huangapple go评论76阅读模式
英文:

How can I use channels to know when all goroutines started by a loop are completed

问题

我目前正在尝试使用range遍历map,以便并发地进行数据库请求,而不是同步地进行,这显然可以提高速度。

我的问题是我有类似这样的代码:

var mainthreads = make(chan *mainthread)
var mainthreadsFetched = make(chan struct{})
for containerid := range containers {
	go func() {
		rows, err := db.Query("SELECT thread_id, belongs_to, thread_name, access_level FROM forum_mainthread WHERE belongs_to = ?", containerid)
		defer rows.Close()
		if err != nil {
			log.Println(err)
		}
		for rows.Next() {
			mainthread := &MainThread{}
			err := rows.Scan(&mainthread.MainThreadID, &mainthread.BelongsTo, &mainthread.ThreadName, &mainthread.AccessLevel)
			if err != nil {
				log.Println(err)
			}
            mainthreads <- mainthread
		}
	}()
	mainthreadsFetched <- struct{}{}
}

// 获取所有主线程
<-mainthreadsFetched
// 在完成后执行其他操作

显然,mainthreadsFetched <- struct{}{}几乎立即被调用,因为循环完成得比眨眼的速度还快。我该如何在每次循环中创建一个新的通道,以便不阻塞每个新的goroutine的启动,而是让循环启动所有的goroutine,然后在每个goroutine完成时发送到一个通道上?

英文:

I'm currently trying to range over a map to do concurrent database requests rather than synchronously, obviously because of the speed boost.

My problem is I have something like this:

var mainthreads = make(chan *mainthread)
var mainthreadsFetched = make(chan struct{})
for containerid := range containers {
	go func() {
		rows, err := db.Query(&quot;SELECT thread_id, belongs_to, thread_name, access_level FROM forum_mainthread WHERE belongs_to = ?&quot;, containerid)
		defer rows.Close()
		if err != nil {
			log.Println(err)
		}
		for rows.Next() {
			mainthread := &amp;MainThread{}
			err := rows.Scan(&amp;mainthread.MainThreadID, &amp;mainthread.BelongsTo, &amp;mainthread.ThreadName, &amp;mainthread.AccessLevel)
			if err != nil {
				log.Println(err)
			}
            mainthreads &lt;- mainthread
		}
	}()
	mainthreadsFetched &lt;- struct{}{}
}

// Get all mainthreads
&lt;-mainthreadsFetched
// Do other stuff after complete

Obviously mainthreadsFetched &lt;- struct{}{} is being called almost instantly because the loop finishes faster than you can blink, how can I create a new channel per loop that will not block each new goroutine from starting, but rather let the loop start all goroutines and then send out on a channel when every goroutine is done.

答案1

得分: 3

使用sync.WaitGroup是一个很好的解决方案,通常也是使用的解决方案。

另外,你可以将接收操作mainthreadsFetched重复执行len(containers)次,而不仅仅是1次。这样可以确保所有的goroutine都已经完成。你需要将发送操作放在goroutine的最后(或者更好的方式是使用defer)。

另外,由于containerid在for循环中,它的值会发生变化。你需要将它作为参数传递给goroutine的闭包函数。

英文:

Using sync.WaitGroup is a great solution, and the one usually used.

Alternatively, you can receive on mainthreadsFetched len(containers) times, instead of just 1. That will guarantee that all go routines have completed. You'll need to move the send line to the end of the go routine (or, better, into a defer).

Also, since containerid is in the for loop, its value changes. You need to pass it as a parameter to the go routine closure.

答案2

得分: 0

所以我能想到的最好的方法是使用sync.WaitGroup,并且做如下操作:

var wg sync.WaitGroup
var mainThreadFetched = make(chan MainThread)
for containerid := range containers {
    wg.Add(1)
    go func(containerid int64) {
        rows, err := db.Query("SELECT thread_id, belongs_to, thread_name, access_level FROM forum_mainthread WHERE belongs_to = ?", containerid)
        defer rows.Close()
        if err != nil {
            log.Println(err)
        }
        for rows.Next() {
            mainthread := MainThread{}
            err := rows.Scan(&mainthread.MainThreadID, &mainthread.BelongsTo, &mainthread.ThreadName, &mainthread.AccessLevel)
            if err != nil {
                log.Println(err)
            }
            mainThreadFetched <- mainthread
        }
        wg.Done()
    }(containerid)
}

go func() {
    wg.Wait()
    close(mainThreadFetched)
}()

for mainthread := range mainThreadFetched {
    containers[mainthread.BelongsTo].MainThreads = append(containers[mainthread.BelongsTo].MainThreads, mainthread)
}

// 进行其他操作

现在我可以从`mainThreadFetched`通道中读取数据`WaitGroup`满足条件时它将关闭通道使循环结束并继续执行其他操作

<details>
<summary>英文:</summary>

So the best way I could come up with doing this is to use `sync.WaitGroup` and do something like this:

	var wg sync.WaitGroup
	var mainThreadFetched = make(chan MainThread)
	for containerid := range containers {
		wg.Add(1)
		go func(containerid int64) {
			rows, err := db.Query(&quot;SELECT thread_id, belongs_to, thread_name, access_level FROM forum_mainthread WHERE belongs_to = ?&quot;, containerid)
			defer rows.Close()
			if err != nil {
				log.Println(err)
			}
			for rows.Next() {
				mainthread := MainThread{}
				err := rows.Scan(&amp;mainthread.MainThreadID, &amp;mainthread.BelongsTo, &amp;mainthread.ThreadName, &amp;mainthread.AccessLevel)
				if err != nil {
					log.Println(err)
				}
				mainThreadFetched &lt;- mainthread
			}
			wg.Done()
		}(containerid)
	}
	
	go func() {
		wg.Wait()
		close(mainThreadFetched)
	}()
	
	for mainthread := range mainThreadFetched {
		containers[mainthread.BelongsTo].MainThreads = append(containers[mainthread.BelongsTo].MainThreads, mainthread)
	}

	// Do other stuff

Now I can read from the `mainThreadFetched` channel and then when the `WaitGroup` is satisfied it will close the channel allowing the loop to end and continue on

</details>



# 答案3
**得分**: 0

我看不到你在哪里读取主线程如果它不是一个带缓冲的通道你需要以某种方式解决这个问题我将提供几种解决方案没有一种比其他方案更正确”,它取决于你的需求

**变体A**
这是最简单的解决方案但它假设你有其他的goroutine在读取主线程这可能已经是情况了)。

```go
var mainthreads = make(chan *mainthread)
var mainthreadsFetched = make(chan struct{})
go somethingWhichReadsMainThreads()
for containerid := range containers {
    go func(containerid int) {
        // 省略了构建查询的部分
        for rows.Next() {
            // 省略了部分内容
            mainthreads <- mainthread
        }
        mainthreadsFetched <- struct{}{}
    }(containerid)
}

for i := 0; i < len(containers); i++ {
    <-mainThreadsFetched
}
close(mainthreads)
// 完成后进行其他操作

变体B
这个解决方案使用了select语句,用于处理读取线程和完成通知的分离,而不需要另一个goroutine。

var mainthreads = make(chan *mainthread)
var mainthreadsFetched = make(chan struct{})
for containerid := range containers {
    go func(containerid int) {
        // 省略了构建查询的部分
        for rows.Next() {
            // 省略了部分内容
            mainthreads <- mainthread
        }
        mainthreadsFetched <- struct{}{}
    }(containerid)
}

numComplete := 0
readRunning := true
for readRunning {
    select {
    case thread := <-mainthreads:
        // 对线程进行操作,比如 threads = append(threads, thread)
    case <-mainthreadsFetched:
        numFetched++
        if numFetched == len(containers) {
            readRunning = False
        }
    }
}
// 完成后进行其他操作

变体C
这个解决方案利用了你没有使用“零值”(nil)来传递真实数据的事实,所以你可以将其作为一个信号值而不是一个单独的结构通道。它的优点是代码量少得多,但感觉有点像遥远的幽灵行动。

var mainthreads = make(chan *mainthread)
for containerid := range containers {
    go func(containerid int) {
        // 省略了构建查询的部分
        for rows.Next() {
            // 省略了 Scan 的部分
            mainthreads <- mainthread
        }
        mainthreads <- nil // nil 信号表示我们完成了
    }(containerid)
}

numComplete := 0
for thread := range mainthreads {
    if thread != nil {
        // 对线程进行操作,比如 threads = append(threads, thread)
    } else {
        numFetched++
        if numFetched == len(containers) {
            break
        }
    }
}
// 完成后进行其他操作

希望这些解决方案对你有帮助!

英文:

I don't see where you are reading mainthreads. If it's not a buffered channel, you would need to solve that some way or another. I'm going to provide a few solutions for solving it - None are more "correct" than the other - it just depends on your needs.

Variant A
This is the simplest solution but it presumes you have some other goroutine reading mainthreads (which may already be the case)

var mainthreads = make(chan *mainthread)
var mainthreadsFetched = make(chan struct{})
go somethingWhichReadsMainThreads()
for containerid := range containers {
go func(containerid int) {
// build query omitted for brevity
for rows.Next() {
// omitted for brevity
mainthreads &lt;- mainthread
}
mainthreadsFetched &lt;- struct{}{}
}(containerid)
}
for i := 0; i &lt; len(containers); i++ {
&lt;-mainThreadsFetched
}
close(mainthreads)
// Do other stuff after complete

Variant B
This one uses the select statement to deal with reading the threads separate from the completion notifications without needing yet another goroutine.

var mainthreads = make(chan *mainthread)
var mainthreadsFetched = make(chan struct{})
for containerid := range containers {
go func(containerid int) {
// build query omitted for brevity
for rows.Next() {
// omitted for brevity
mainthreads &lt;- mainthread
}
mainthreadsFetched &lt;- struct{}{}
}(containerid)
}
numComplete := 0
readRunning := true
for readRunning {
select {
case thread := &lt;-mainthreads:
// do something with thread, like threads = append(threads, thread)
case &lt;-mainthreadsFetched:
numFetched++
if numFetched == len(containers) {
readRunning = False
}
}
}
// Do other stuff after complete

Variant C
This one uses the fact that you aren't using the 'zero value' (nil) for passing real data, so you can use that as a signal value instead of a separate struct channel. It has the advantage that it is far less code, but it does feel like spooky action at a distance.

var mainthreads = make(chan *mainthread)
for containerid := range containers {
go func(containerid int) {
// build query omitted for brevity
for rows.Next() {
// omitted Scan for brevity
mainthreads &lt;- mainthread
}
mainthreads &lt;- nil // nil signals to us we are done
}(containerid)
}
numComplete := 0
for thread := range mainthreads {
if thread != nil {
// do something with thread, like threads = append(threads, thread)
} else {
numFetched++
if numFetched == len(containers) {
break
}
}
}
// Do other stuff after complete

huangapple
  • 本文由 发表于 2015年12月5日 12:51:30
  • 转载请务必保留本文链接:https://go.coder-hub.com/34101421.html
匿名

发表评论

匿名网友

:?: :razz: :sad: :evil: :!: :smile: :oops: :grin: :eek: :shock: :???: :cool: :lol: :mad: :twisted: :roll: :wink: :idea: :arrow: :neutral: :cry: :mrgreen:

确定