sync.WaitGroup相对于Channels的优势是什么?

huangapple go评论114阅读模式
英文:

What is the Advantage of sync.WaitGroup over Channels?

问题

我正在翻译你提供的内容,请稍等片刻。

英文:

I'm working on a concurrent Go library, and I stumbled upon two distinct patterns of synchronization between goroutines whose results are similar:

Waitgroup

package main

import (
	"fmt"
	"sync"
	"time"
)

var wg sync.WaitGroup

func main() {
	words := []string{"foo", "bar", "baz"}

	for _, word := range words {
		wg.Add(1)
		go func(word string) {
			time.Sleep(1 * time.Second)
			defer wg.Done()
			fmt.Println(word)
		}(word)
	}
	// do concurrent things here

	// blocks/waits for waitgroup
	wg.Wait()
}

Channel

package main

import (
	"fmt"
	"time"
)

func main() {
	words := []string{"foo", "bar", "baz"}
	done := make(chan bool)
	// defer close(done)
	for _, word := range words {
		// fmt.Println(len(done), cap(done))
		go func(word string) {
			time.Sleep(1 * time.Second)
			fmt.Println(word)
			done <- true
		}(word)
	}
	// Do concurrent things here

	// This blocks and waits for signal from channel
	for range words {
		<-done
	}
}

I was advised that sync.WaitGroup is slightly more performant, and I have seen it being used commonly. However, I find channels more idiomatic. What is the real advantage of using sync.WaitGroup over channels and/or what might be the situation when it is better?

答案1

得分: 70

独立于你的第二个示例的正确性(如评论中所解释的,你并没有做你认为的那样,但很容易修复),我倾向于认为第一个示例更容易理解。

现在,我甚至不会说通道更符合惯用法。通道作为Go语言的一个特色功能,并不意味着在任何可能的情况下都习惯于使用它们。在Go语言中,符合惯用法的做法是使用最简单和最容易理解的解决方案:在这里,WaitGroup既传达了含义(你的主函数正在等待工作线程完成),又传达了机制(工作线程在完成时通知)。

除非你处于非常特殊的情况,我不建议在这里使用通道解决方案。

英文:

Independently of the correctness of your second example (as explained in the comments, you aren't doing what you think, but it's easily fixable), I tend to think that the first example is easier to grasp.

Now, I wouldn't even say that channels are more idiomatic. Channels being a signature feature of the Go language shouldn't mean that it is idiomatic to use them whenever possible. What is idiomatic in Go is to use the simplest and easiest to understand solution: here, the WaitGroup convey both the meaning (your main function is Waiting for workers to be done) and the mechanic (the workers notify when they are Done).

Unless you're in a very specific case, I don't recommend using the channel solution here.

答案2

得分: 23

对于你的简单示例(标记作业完成),WaitGroup 是一个明显的选择。Go 编译器非常友好,不会因为你使用通道来简单地标记完成任务而责备你,但是一些代码审查者可能会这样做。

  1. "WaitGroup 等待一组 goroutine 完成。主 goroutine 调用 Add(n) 来设置要等待的 goroutine 数量。然后每个 goroutine 运行并在完成时调用 Done()。同时,可以使用 Wait 来阻塞,直到所有 goroutine 完成。"
words := []string{"foo", "bar", "baz"}
var wg sync.WaitGroup
for _, word := range words {
	wg.Add(1)
	go func(word string) {
		defer wg.Done()
		time.Sleep(100 * time.Millisecond) // 一个任务
		fmt.Println(word)
	}(word)
}
wg.Wait()

可能性仅限于你的想象力:

  1. 通道可以是带缓冲的
words := []string{"foo", "bar", "baz"}
done := make(chan struct{}, len(words))
for _, word := range words {
	go func(word string) {
		time.Sleep(100 * time.Millisecond) // 一个任务
		fmt.Println(word)
		done <- struct{}{} // 不阻塞
	}(word)
}
for range words {
	<-done
}
  1. 通道可以是无缓冲的,你可以只使用一个信号通道(例如 chan struct{}):
words := []string{"foo", "bar", "baz"}
done := make(chan struct{})
for _, word := range words {
	go func(word string) {
		time.Sleep(100 * time.Millisecond) // 一个任务
		fmt.Println(word)
		done <- struct{}{} // 阻塞
	}(word)
}
for range words {
	<-done
}
  1. 你可以使用带缓冲的通道容量来限制并发作业的数量:
t0 := time.Now()
var wg sync.WaitGroup
words := []string{"foo", "bar", "baz"}
done := make(chan struct{}, 1) // 在这里设置并发作业的数量
for _, word := range words {
	wg.Add(1)
	go func(word string) {
		done <- struct{}{}
		time.Sleep(100 * time.Millisecond) // 一个任务
		fmt.Println(word, time.Since(t0))
		<-done
		wg.Done()
	}(word)
}
wg.Wait()
  1. 你可以使用通道发送消息:
done := make(chan string)
go func() {
	for _, word := range []string{"foo", "bar", "baz"} {
		done <- word
	}
	close(done)
}()
for word := range done {
	fmt.Println(word)
}

基准测试:

go test -benchmem -bench . -args -n 0
# BenchmarkEvenWaitgroup-8  1827517   652 ns/op    0 B/op  0 allocs/op
# BenchmarkEvenChannel-8    1000000  2373 ns/op  520 B/op  1 allocs/op
go test -benchmem -bench .
# BenchmarkEvenWaitgroup-8  1770260   678 ns/op    0 B/op  0 allocs/op
# BenchmarkEvenChannel-8    1560124  1249 ns/op  158 B/op  0 allocs/op

代码(main_test.go):

package main

import (
	"flag"
	"fmt"
	"os"
	"sync"
	"testing"
)

func BenchmarkEvenWaitgroup(b *testing.B) {
	evenWaitgroup(b.N)
}
func BenchmarkEvenChannel(b *testing.B) {
	evenChannel(b.N)
}
func evenWaitgroup(n int) {
	if n%2 == 1 { // 使其为偶数:
		n++
	}
	for i := 0; i < n; i++ {
		wg.Add(1)
		go func(n int) {
			select {
			case ch <- n: // 如果通道为空,则发送
			case i := <-ch: // 如果通道不为空,则接收
				// fmt.Println(n, i)
				_ = i
			}
			wg.Done()
		}(i)
	}
	wg.Wait()
}
func evenChannel(n int) {
	if n%2 == 1 { // 使其为偶数:
		n++
	}
	for i := 0; i < n; i++ {
		go func(n int) {
			select {
			case ch <- n: // 如果通道为空,则发送
			case i := <-ch: // 如果通道不为空,则接收
				// fmt.Println(n, i)
				_ = i
			}
			done <- struct{}{}
		}(i)
	}
	for i := 0; i < n; i++ {
		<-done
	}
}
func TestMain(m *testing.M) {
	var n int // 我们使用 TestMain 来设置 done 通道。
	flag.IntVar(&n, "n", 1_000_000, "chan cap")
	flag.Parse()
	done = make(chan struct{}, n)
	fmt.Println("n=", n)
	os.Exit(m.Run())
}

var (
	done chan struct{}
	ch   = make(chan int)
	wg   sync.WaitGroup
)
英文:

For your simple example (signalling the completion of jobs), the WaitGroup is the obvious choice. And the Go compiler is very kind and won't blame you for using a channel for the simple signalling of the completion task, but some code reviewer do.

  1. "A WaitGroup waits for a collection of goroutines to finish.
    The main goroutine calls Add(n) to set the number of
    goroutines to wait for. Then each of the goroutines
    runs and calls Done() when finished. At the same time,
    Wait can be used to block until all goroutines have finished."
words := []string{&quot;foo&quot;, &quot;bar&quot;, &quot;baz&quot;}
var wg sync.WaitGroup
for _, word := range words {
wg.Add(1)
go func(word string) {
defer wg.Done()
time.Sleep(100 * time.Millisecond) // a job
fmt.Println(word)
}(word)
}
wg.Wait()

The possibilities are limited only by your imagination:

  1. Channels can be buffered:
words := []string{&quot;foo&quot;, &quot;bar&quot;, &quot;baz&quot;}
done := make(chan struct{}, len(words))
for _, word := range words {
go func(word string) {
time.Sleep(100 * time.Millisecond) // a job
fmt.Println(word)
done &lt;- struct{}{} // not blocking
}(word)
}
for range words {
&lt;-done
}
  1. Channels can be unbuffered, and you may use just a signalling channel (e.g. chan struct{}):
words := []string{&quot;foo&quot;, &quot;bar&quot;, &quot;baz&quot;}
done := make(chan struct{})
for _, word := range words {
go func(word string) {
time.Sleep(100 * time.Millisecond) // a job
fmt.Println(word)
done &lt;- struct{}{} // blocking
}(word)
}
for range words {
&lt;-done
}
  1. You may limit the number of concurrent jobs with buffered channel capacity:
t0 := time.Now()
var wg sync.WaitGroup
words := []string{&quot;foo&quot;, &quot;bar&quot;, &quot;baz&quot;}
done := make(chan struct{}, 1) // set the number of concurrent job here
for _, word := range words {
wg.Add(1)
go func(word string) {
done &lt;- struct{}{}
time.Sleep(100 * time.Millisecond) // job
fmt.Println(word, time.Since(t0))
&lt;-done
wg.Done()
}(word)
}
wg.Wait()
  1. You may send a message using a channel:
done := make(chan string)
go func() {
for _, word := range []string{&quot;foo&quot;, &quot;bar&quot;, &quot;baz&quot;} {
done &lt;- word
}
close(done)
}()
for word := range done {
fmt.Println(word)
}

Benchmark:

	go test -benchmem -bench . -args -n 0
# BenchmarkEvenWaitgroup-8  1827517   652 ns/op    0 B/op  0 allocs/op
# BenchmarkEvenChannel-8    1000000  2373 ns/op  520 B/op  1 allocs/op
	go test -benchmem -bench .
# BenchmarkEvenWaitgroup-8  1770260   678 ns/op    0 B/op  0 allocs/op
# BenchmarkEvenChannel-8    1560124  1249 ns/op  158 B/op  0 allocs/op

Code(main_test.go):

package main
import (
&quot;flag&quot;
&quot;fmt&quot;
&quot;os&quot;
&quot;sync&quot;
&quot;testing&quot;
)
func BenchmarkEvenWaitgroup(b *testing.B) {
evenWaitgroup(b.N)
}
func BenchmarkEvenChannel(b *testing.B) {
evenChannel(b.N)
}
func evenWaitgroup(n int) {
if n%2 == 1 { // make it even:
n++
}
for i := 0; i &lt; n; i++ {
wg.Add(1)
go func(n int) {
select {
case ch &lt;- n: // tx if channel is empty
case i := &lt;-ch: // rx if channel is not empty
// fmt.Println(n, i)
_ = i
}
wg.Done()
}(i)
}
wg.Wait()
}
func evenChannel(n int) {
if n%2 == 1 { // make it even:
n++
}
for i := 0; i &lt; n; i++ {
go func(n int) {
select {
case ch &lt;- n: // tx if channel is empty
case i := &lt;-ch: // rx if channel is not empty
// fmt.Println(n, i)
_ = i
}
done &lt;- struct{}{}
}(i)
}
for i := 0; i &lt; n; i++ {
&lt;-done
}
}
func TestMain(m *testing.M) {
var n int // We use TestMain to set up the done channel.
flag.IntVar(&amp;n, &quot;n&quot;, 1_000_000, &quot;chan cap&quot;)
flag.Parse()
done = make(chan struct{}, n)
fmt.Println(&quot;n=&quot;, n)
os.Exit(m.Run())
}
var (
done chan struct{}
ch   = make(chan int)
wg   sync.WaitGroup
)

答案3

得分: 16

这取决于具体的使用情况。如果你需要并行运行一次性任务,并且不需要知道每个任务的结果,那么你可以使用WaitGroup。但是,如果你需要收集goroutine的结果,那么你应该使用通道(channel)。

由于通道可以双向传输数据,我几乎总是使用通道。

另外,正如评论中指出的,你的通道示例没有正确实现。你需要一个单独的通道来指示没有更多的任务要执行(一个示例在这里)。在你的情况下,由于你事先知道单词的数量,你可以只使用一个带缓冲的通道,并接收固定次数的数据,而不需要声明一个关闭的通道。

英文:

It depends on the use case. If you are dispatching one-off jobs to be run in parallel without needing to know the results of each job, then you can use a WaitGroup. But if you need to collect the results from the goroutines then you should use a channel.

Since a channel works both ways, I almost always use a channel.

On another note, as pointed out in the comment your channel example isn't implemented correctly. You would need a separate channel to indicate there are no more jobs to do (one example is here). In your case, since you know the number of words in advance, you could just use one buffered channel and receive a fixed number of times to avoid declaring a close channel.

答案4

得分: 1

如果你特别关注只使用通道,那么需要以不同的方式完成(如果使用你的示例,如@Not_a_Golfer所指出的,会产生错误的结果)。

一种方法是创建一个 int 类型的通道。在工作进程中,每次完成任务时发送一个数字(如果需要,可以将其作为唯一的任务 ID,在接收器中可以跟踪此 ID)。

在接收器的主 go 协程中(它将知道提交的作业的确切数量),使用一个通道进行循环迭代,直到提交的作业数量完成为止,并在所有作业完成时跳出循环。如果你想要跟踪每个作业的完成情况(并在需要时执行某些操作),这是一个很好的方法。

以下是你参考的代码。在通道的范围循环中递减 totalJobsLeft 是安全的,因为它只会在范围循环中执行!

// 这只是一个示例,演示如何使用通道同步多个作业的完成
// 在许多情况下,更好的方法可能是使用等待组

package main

import (
	"fmt"
	"math/rand"
	"time"
)

func main() {

	comChannel := make(chan int)
	words := []string{"foo", "bar", "baz"}

	totalJobsLeft := len(words)

	// 我们知道有多少个作业正在发送

	for j, word := range words {
		jobId := j + 1
		go func(word string, jobId int) {

			fmt.Println("Job ID:", jobId, "Word:", word)
			// 在这里做一些工作,可能调用你需要的函数
			// 为了模拟这个过程,随机休眠 5 秒钟
			randInt := rand.Intn(5)
			//fmt.Println("Got random number", randInt)
			time.Sleep(time.Duration(randInt) * time.Second)
			comChannel <- jobId
		}(word, jobId)
	}

	for j := range comChannel {
		fmt.Println("Got job ID", j)
		totalJobsLeft--
		fmt.Println("Total jobs left", totalJobsLeft)
		if totalJobsLeft == 0 {
			break
		}
	}
	fmt.Println("Closing communication channel. All jobs completed!")
	close(comChannel)

}
英文:

If you are particularly sticky about using only channels, then it needs to be done differently (if we use your example does, as @Not_a_Golfer points out, it'll produce incorrect results).

One way is to make a channel of type int. In the worker process send a number each time it completes the job (this can be the unique job id too, if you want you can track this in the receiver).

In the receiver main go routine (which will know the exact number of jobs submitted) - do a range loop over a channel, count on till the number of jobs submitted are not done, and break out of the loop when all jobs are completed. This is a good way if you want to track each of the jobs completion (and maybe do something if needed).

Here's the code for your reference. Decrementing totalJobsLeft will be safe as it'll ever be done only in the range loop of the channel!

//This is just an illustration of how to sync completion of multiple jobs using a channel
//A better way many a times might be to use wait groups
package main
import (
&quot;fmt&quot;
&quot;math/rand&quot;
&quot;time&quot;
)
func main() {
comChannel := make(chan int)
words := []string{&quot;foo&quot;, &quot;bar&quot;, &quot;baz&quot;}
totalJobsLeft := len(words)
//We know how many jobs are being sent
for j, word := range words {
jobId := j + 1
go func(word string, jobId int) {
fmt.Println(&quot;Job ID:&quot;, jobId, &quot;Word:&quot;, word)
//Do some work here, maybe call functions that you need
//For emulating this - Sleep for a random time upto 5 seconds
randInt := rand.Intn(5)
//fmt.Println(&quot;Got random number&quot;, randInt)
time.Sleep(time.Duration(randInt) * time.Second)
comChannel &lt;- jobId
}(word, jobId)
}
for j := range comChannel {
fmt.Println(&quot;Got job ID&quot;, j)
totalJobsLeft--
fmt.Println(&quot;Total jobs left&quot;, totalJobsLeft)
if totalJobsLeft == 0 {
break
}
}
fmt.Println(&quot;Closing communication channel. All jobs completed!&quot;)
close(comChannel)
}

答案5

得分: 1

我经常使用通道来收集可能产生错误的goroutine的错误消息。这里有一个简单的示例:

func couldGoWrong() (err error) {
    errorChannel := make(chan error, 3)

    // 启动一个goroutine
    go func() (err error) {
        defer func() { errorChannel <- err }()

        for c := 0; c < 10; c++ {
            _, err = fmt.Println(c)
            if err != nil {
                return
            }
        }

        return
    }()

    // 启动另一个goroutine
    go func() (err error) {
        defer func() { errorChannel <- err }()

        for c := 10; c < 100; c++ {
            _, err = fmt.Println(c)
            if err != nil {
                return
            }
        }

        return
    }()

    // 启动另一个goroutine
    go func() (err error) {
        defer func() { errorChannel <- err }()

        for c := 100; c < 1000; c++ {
            _, err = fmt.Println(c)
            if err != nil {
                return
            }
        }

        return
    }()

    // 同步goroutine并在此处收集错误
    for c := 0; c < cap(errorChannel); c++ {
        err = <-errorChannel
        if err != nil {
            return
        }
    }

    return
}

这段代码展示了如何使用通道来收集多个goroutine可能产生的错误。通过创建一个带有缓冲区的通道,并在每个goroutine中使用defer语句将错误发送到通道中,然后在主goroutine中使用循环从通道中接收错误并进行处理。这样可以确保在所有goroutine完成后,主函数可以收集到所有的错误信息。

英文:

I often use channels to collect error messages from goroutines that could produce an error. Here is a simple example:

func couldGoWrong() (err error) {
errorChannel := make(chan error, 3)
// start a go routine
go func() (err error) {
defer func() { errorChannel &lt;- err }()
for c := 0; c &lt; 10; c++ {
_, err = fmt.Println(c)
if err != nil {
return
}
}
return
}()
// start another go routine
go func() (err error) {
defer func() { errorChannel &lt;- err }()
for c := 10; c &lt; 100; c++ {
_, err = fmt.Println(c)
if err != nil {
return
}
}
return
}()
// start yet another go routine
go func() (err error) {
defer func() { errorChannel &lt;- err }()
for c := 100; c &lt; 1000; c++ {
_, err = fmt.Println(c)
if err != nil {
return
}
}
return
}()
// synchronize go routines and collect errors here
for c := 0; c &lt; cap(errorChannel); c++ {
err = &lt;-errorChannel
if err != nil {
return
}
}
return
}

答案6

得分: 1

这里已经有了一些很好的答案,说明了通道并不总是最合适的选择。例如,当实现工作池时,使用等待组(wait group)会更清晰。

还指出你的通道实现不正确,因为它在第一个条目之后退出,而不是最后一个。

我决定修复它:

package main

import (
	"fmt"
	"time"
)

func main() {
	words := []string{"foo", "bar", "baz", "fax", "bor", "far"}
	workersCount := len(words)
	workersChan := make(chan bool, workersCount)

	for _, word := range words {
		go func(word string) {
			time.Sleep(1 * time.Second)
			fmt.Println(word)
			workersChan <- true
		}(word)
	}

	for i := 0; i != workersCount; i++ {
		<-workersChan
	}
}

英文:

There are already nice answers here that channels are not alway idiomatic. For instance when worker pools is implemented it is more clear to use wait group.

Also it was noted that your channels implementation is not correct because it exits after first entry not the last one.

I decided to fix it:

package main

import (
	&quot;fmt&quot;
	&quot;time&quot;
)

func main() {
	words := []string{&quot;foo&quot;, &quot;bar&quot;, &quot;baz&quot;, &quot;fax&quot;, &quot;bor&quot;, &quot;far&quot;}
	workersCount := len(words)
	workersChan := make(chan bool, workersCount)

	for _, word := range words {
		go func(word string) {
			time.Sleep(1 * time.Second)
			fmt.Println(word)
			workersChan &lt;- true
		}(word)
	}

	for i := 0; i != workersCount; i++ {
		&lt;-workersChan
	}
}

答案7

得分: -2

同时建议使用waitgroup,但如果你仍然想使用channel,那么下面我提到了一个简单的channel使用方法。

package main

import (
	"fmt"
	"time"
)

func main() {
	c := make(chan string)
	words := []string{"foo", "bar", "baz"}

	go printWords(words, c)

	for j := range c {
		fmt.Println(j)
	}
}

func printWords(words []string, c chan string) {
	defer close(c)
	for _, word := range words {
		time.Sleep(1 * time.Second)
		c <- word
	}
}

请注意,我只翻译了代码部分,其他内容不做翻译。

英文:

Also suggest to use waitgroup but still you want to do it with channel then below i mention a simple use of channel

package main
import (
&quot;fmt&quot;
&quot;time&quot;
)
func main() {
c := make(chan string)
words := []string{&quot;foo&quot;, &quot;bar&quot;, &quot;baz&quot;}
go printWordrs(words, c)
for j := range c {
fmt.Println(j)
}
}
func printWordrs(words []string, c chan string) {
defer close(c)
for _, word := range words {
time.Sleep(1 * time.Second)
c &lt;- word
}	
}

huangapple
  • 本文由 发表于 2016年3月17日 17:37:09
  • 转载请务必保留本文链接:https://go.coder-hub.com/36056615.html
匿名

发表评论

匿名网友

:?: :razz: :sad: :evil: :!: :smile: :oops: :grin: :eek: :shock: :???: :cool: :lol: :mad: :twisted: :roll: :wink: :idea: :arrow: :neutral: :cry: :mrgreen:

确定