How to use sync.WaitGroup in a pipeline

Question

I'm trying to implement a pipeline in Go, but the program exits the main goroutine before the other goroutines have finished.

Please help me fix this using wait groups.

package main

import (
	"fmt"
	"sync"
)

var wg sync.WaitGroup

func main() {
	c1 := make(chan string)
	c2 := make(chan string)

	go sender(c1)
	go removeDuplicates(c1, c2)
	go printer(c2)

	wg.Wait()
}

func sender(outputStream chan string) {
	wg.Add(1)
	wg.Done()

	for _, v := range []string{"one", "one", "two", "two", "three", "three"} {
		outputStream <- v
	}

	close(outputStream)
}

func removeDuplicates(inputStream, outputStream chan string) {
	wg.Add(1)
	wg.Done()

	temp := ""

	for v := range inputStream {
		if v != temp {
			outputStream <- v
			temp = v
		}
	}

	close(outputStream)
}

func printer(inputStream chan string) {
	wg.Add(1)
	wg.Done()

	for v := range inputStream {
		fmt.Println(v)
	}
}

When I used time.Sleep instead, the program worked as expected.

Answer 1

Score: 2

The first rule of working with wait groups: don't call Add() from a goroutine you're going to wait for. Wait() may then be called before Add() has run, so it sees a zero counter and returns immediately, which is not what you expect. The second rule: call Done() at the end, not at the beginning. So the fix would be:

func main() {
    c1 := make(chan string)
    c2 := make(chan string)

    // Register all three goroutines before any of them starts.
    wg.Add(3)

    go sender(c1)
    go removeDuplicates(c1, c2)
    go printer(c2)

    wg.Wait()
}

func sender(outputStream chan string) {
    // Deferred, so Done() runs when sender returns.
    defer wg.Done()

    for _, v := range []string{"one", "one", "two", "two", "three", "three"} {
        outputStream <- v
    }

    close(outputStream)
}

// etc. for the other two functions

huangapple
  • Posted on September 6, 2021, 16:24:26
  • When reposting, please keep the link to this article: https://go.coder-hub.com/69071441.html