Goroutines与sync.WaitGroup在最后一个wg.Done()之前结束。

huangapple go评论82阅读模式
英文:

Goroutines with sync.WaitGroup end before last wg.Done()

问题

我有一个示例代码(你可以在Go Playground上找到):

package main

import (
    "fmt"
    "sync"
    "time"
)

func main() {
    messages := make(chan int)
    var wg sync.WaitGroup
    var result []int

    // 如果需要,你也可以逐个添加这些

    wg.Add(1)
    go func() {
        defer wg.Done()
        time.Sleep(time.Second * 1)
        messages <- 1
    }()
    wg.Add(1)
    go func() {
        defer wg.Done()
        time.Sleep(time.Second * 1)
        messages <- 2
    }()
    wg.Add(1)
    go func() {
        defer wg.Done()
        time.Sleep(time.Second * 1)
        messages <- 3
    }()
    go func() {
        for i := range messages {
            fmt.Println(i)
            result = append(result, i)
        }

    }()

    wg.Wait()
    fmt.Println(result)
}

我得到了以下输出:

2
1
[2 1]

我认为我知道为什么会发生这种情况,但我无法解决它。WaitGroup中有3个项目,也就是三个goroutine,而第四个goroutine从通道中消费数据。当最后一个goroutine调用wg.Done()时,程序结束了,因为wg.Wait()表示每个goroutine都已经完成,而最后一个goroutine无法消费数据,因为程序已经结束。我尝试在第四个函数中使用wg.Add(1)wg.Done()来增加一个计数,但在这种情况下,我遇到了死锁。

英文:

I have an example code (you can find it on Go Playground):

package main

import (
    &quot;fmt&quot;
    &quot;sync&quot;
    &quot;time&quot;
)

func main() {
    messages := make(chan int)
    var wg sync.WaitGroup
    var result []int

    // you can also add these one at 
    // a time if you need to 

    wg.Add(1)
    go func() {
        defer wg.Done()
        time.Sleep(time.Second * 1)
        messages &lt;- 1
    }()
    wg.Add(1)
    go func() {
        defer wg.Done()
        time.Sleep(time.Second * 1)
        messages &lt;- 2
    }() 
    wg.Add(1)
    go func() {
        defer wg.Done()
        time.Sleep(time.Second * 1)
        messages &lt;- 3
    }()
    go func() {
        for i := range messages {
            fmt.Println(i)
	    result = append(result, i)
        }
	
    }()

    wg.Wait()
    fmt.Println(result)
}

I got this output:

2
1
[2 1]

I think I know why is it happening, but I can't solve it. There is 3 item in the WaitGroup I mean the three goroutine, and the 4th groutine consume the data from the channel. When the last groutine say wg.Done() the program is over because of the wg.Wait() said every goroutine finished and the last goroutine result the 4th goroutine couldn't have consume, because the program ended. I tryed to add plus one with wg.Add(1) and the wg.Done() in the 4th function but in this case I got deadlock.

答案1

得分: 6

关闭通道是Go语言中的一种惯用模式,用于信号传递。如果关闭一个带缓冲的通道,消费者可以读取所有排队的数据,然后停止。

以下是正确工作的代码:

func main() {
    messages := make(chan int)
    var wg sync.WaitGroup
    var result []int

    // 如果需要,也可以逐个添加这些
    // goroutine

    wg.Add(1)
    go func() {
        defer wg.Done()
        time.Sleep(time.Second * 1)
        messages <- 1
    }()
    wg.Add(1)
    go func() {
        defer wg.Done()
        time.Sleep(time.Second * 1)
        messages <- 2
    }()
    wg.Add(1)
    go func() {
        defer wg.Done()
        time.Sleep(time.Second * 1)
        messages <- 3
    }()

    // 这个goroutine用于关闭messages通道,以信号结束数据流
    go func() {
        wg.Wait()
        close(messages)
    }()

    // 如果需要在goroutine内部执行此操作,
    // 可以使用此通道作为工作结束的信号,
    // 也可以使用另一个sync.WaitGroup,但对于只有一个
    // goroutine,使用单个通道作为信号是有意义的(没有
    // 组)
    done := make(chan struct{})
    go func() {
        defer close(done)
        for i := range messages {
            fmt.Println(i)
            result = append(result, i)
        }
    }()

    <-done
    fmt.Println(result)
}

如你所见,我们只是添加了另一个goroutine来关闭messages通道,当所有生产者都完成时。

英文:

Closing channels is an idiomatic Go pattern for signalling and if you close a buffered channel, the consumer can read all the queued data then stop.

This code works correctly:

func main() {
	messages := make(chan int)
	var wg sync.WaitGroup
	var result []int

	// you can also add these one at
	// a time if you need to

	wg.Add(1)
	go func() {
		defer wg.Done()
		time.Sleep(time.Second * 1)
		messages &lt;- 1
	}()
	wg.Add(1)
	go func() {
		defer wg.Done()
		time.Sleep(time.Second * 1)
		messages &lt;- 2
	}()
	wg.Add(1)
	go func() {
		defer wg.Done()
		time.Sleep(time.Second * 1)
		messages &lt;- 3
	}()

	// this goroutine added to signal end of data stream
	// by closing messages channel
	go func() {
		wg.Wait()
		close(messages)
	}()

	// if you need this to happen inside a go routine,
	// this channel is used for signalling end of the work,
	// also another sync.WaitGroup could be used, but for just one
	// goroutine, a single channel as a signal makes sense (there is no
	// groups)
	done := make(chan struct{})
	go func() {
		defer close(done)
		for i := range messages {
			fmt.Println(i)
			result = append(result, i)
		}
	}()

	&lt;-done
	fmt.Println(result)
}

As you see we just added another goroutine that closes the messages channel, when all producers are done.

答案2

得分: 4

你生成的最后一个goroutine——用于收集结果的那个——没有被main()等待,所以在那里的wg.Wait()返回后,main()退出并回收了剩余的goroutine。

假设在那个时候只剩下一个单独的收集goroutine,但它未能更新切片。

还要注意,由于同样的原因,你的程序存在数据竞争:当main()读取结果切片时,它不知道是否安全读取它——也就是说,写入者是否完成了对其的写入。

一个简单的修复方法是为该goroutine添加wg.Add(1),并在其中使用defer wg.Done()

一个更好的解决方案是在wg.Wait()之后、从切片中读取之前,关闭messages通道。这将使收集goroutine的range循环终止,并在该goroutine和main()之间创建一个适当的同步点。

英文:

The last goroutine you spawn—that one intended to collect the results—is not waited by main() so wg.Wait() in there returns, main() quits and reaps the remainin goroutines.
Supposedly just a single—collecting—goroutine remains by that time but it fails to update the slice.

Also note that due to the same reason you have a data race in your program: by the time main() reads the slice of results, it does not know whether it's safe to read it—that is, whether the writer is done writing to there.

An easy fix is to do add wg.Add(1) for that goroutine and defer wg.Done() in it, too.

A better solution is to close() the messages channel after wg.Wait()
and before reading from the slice. This would make the collecting goroutine's range loop to terminate and this would also create a proper synchronization point between that goroutine and main().

答案3

得分: 0

kostix的回答在提到以下内容时是正确的:

一个简单的解决方法是在该goroutine中添加wg.Add(1),并在其中使用defer wg.Done()

这将导致循环在未关闭messages通道的情况下永远无法结束!因此,主goroutine会在最后一个“收集”goroutine完成之前再次结束。您还会收到一个错误,因为将一个goroutine绑定到永远不会发送Done()信号的wg WaitGroup。

然后,当他们提到:

一个更好的解决方法是在wg.Wait()之后和从切片中读取之前close() messages通道

他们建议的位置会再次导致相同的错误,因为您将在相同的WaitGroup wg上等待。而您的最后一个“收集”goroutine将继续在messages通道中寻找更多的消息,并且永远不会达到延迟的wg.Done()

然后,Alex Yu的评论通过在完全读取结果之前等待修复了这个问题,这是一个很好的修复方法。但是,如果您希望您的收集goroutine立即开始,并且在它开始从该通道读取之前不等待所有先前的goroutine(写入messages通道),我建议以下操作...

创建一个结果WaitGroup,在启动最后一个“收集”goroutine之前Add(1),在最后一个“收集”goroutine内部defer wgResult.Done(),然后在wg.Wait()fmt.Println(result)之间,您应该close(messages)wgResult.Wait()

这样可以让所有的goroutine尽快启动,并且只在需要时等待写入goroutine和读取goroutine。

这是一个带有建议解决方案的GoPlayground链接:

https://play.golang.org/p/na0JS1HTwNP

英文:

kostix's answer was correct till they mentioned

> An easy fix is to do add wg.Add(1) for that goroutine and defer wg.Done() in it, too.

That would cause your loop to never finish without the messages channel being closed! So the main goroutine would again finish before your last "collecting" goroutine finishes. You would also get an error for having a goroutine tied to your wg WaitGroup that will never send the Done() signal.

Then again when they mentioned

> A better solution is to close() the messages channel after wg.Wait() and before reading from the slice

The placement they suggested will again give you the same error since you'll be waiting on the same WaitGroup wg. While your last "collecting" goroutine will keep looking for more messages in your messages channel and will never reach the deferred wg.Done()

Then Alex Yu's comment fixed it by waiting before reading the results altogether, which is a good fix. But if you want your collecting goroutine to start right away and NOT wait for all previous goroutines (that write to messages channel) to finish before it starts reading from said channel, I would suggest the following...

Create a result WaitGroup, Add(1) before starting your last "collecting" goroutine, defer wgResult.Done() inside your last "collecting" goroutine, then at the end, between your wg.Wait() and your fmt.Println(result), you should close(messages) and wgResult.Wait().

This allows all your go routines to start as soon as possible and waiting only when needed on the writing goroutines as well as the reading one.

Here's a GoPlayground link with the suggested solution

https://play.golang.org/p/na0JS1HTwNP

huangapple
  • 本文由 发表于 2017年4月2日 17:28:05
  • 转载请务必保留本文链接:https://go.coder-hub.com/43166750.html
匿名

发表评论

匿名网友

:?: :razz: :sad: :evil: :!: :smile: :oops: :grin: :eek: :shock: :???: :cool: :lol: :mad: :twisted: :roll: :wink: :idea: :arrow: :neutral: :cry: :mrgreen:

确定