Go程序在goroutine工作完成之前退出

huangapple go评论85阅读模式
英文:

Go program exits before goroutine workers complete

问题

我遇到了一个问题,不太理解如何正确地阻塞和关闭通道。我启动了一些任意数量的工作线程,发现我的主函数要么在工作线程完成之前退出,要么由于未关闭的通道而挂起。我需要一种更好的方式,在工作线程读取通道时阻塞主函数,然后在完成后优雅地关闭通道以结束循环。我尝试了一些方法,包括使用等待组,但问题仍然存在。我注意到通过添加 time.Sleep,程序按预期工作,但是如果将其注释掉,就不会有任何工作完成。

这是一个可运行的示例,其中包含了 Sleep:https://go.dev/play/p/QHqNj-AJQBI。这是有问题的代码,其中注释掉了 Sleep 的超时部分:

package main

import (
	"fmt"
	"sync"
	"time"
)

// 一些复杂的工作
func do(num int, ch chan<- int) {
	time.Sleep(time.Duration(500 * time.Millisecond))
	ch <- num
}

func main() {

	results := make(chan int)

	// 对于一些需要进行的复杂工作
	for i := 0; i < 53; i++ {
		go do(i, results)
	}

	var wg sync.WaitGroup

	// 启动 3 个可以处理结果的工作线程
	for i := 0; i < 3; i++ {
		wg.Add(1)
		go func(id int) {
			defer wg.Done()
			worker(id, results)
		}(i)
	}

	// 在所有工作线程完成时关闭通道
	go func() {
		wg.Wait()
		close(results)
	}()

	//time.Sleep(time.Duration(10 * time.Second))

	fmt.Println("donezo")
}

// 以有意义的方式处理 do() 的结果
func worker(id int, ch <-chan int) {
	fmt.Println("starting worker", id)

	for i := range ch {
		fmt.Println("channel val:", i)
	}
}

我还尝试将 defer wg.Done() 移到 worker() 函数内部,但问题依然存在,并且没有 Sleep 也无法正常工作。

// 以有意义的方式处理 do() 的结果
func worker(wg *sync.WaitGroup, id int, ch <-chan int) {
	fmt.Println("starting worker", id)

	defer wg.Done()

	for i := range ch {
		fmt.Println("channel val:", i)
	}
}

我选择了错误的范例,还是使用方法不正确?

英文:

I'm having an issue understanding how to properly block and close channels. I'm starting some arbitrary number of workers and I've discovered that my main function either exits before the workers have completed or hangs due to unclosed channel. I need a better way of blocking while the workers read the channel without main exiting and then gracefully close the channel to end the loop when it's done. Any attempt I've made ends in a deadlock.

I've tried a few things including using a wait group but the issue persists. I noticed that by adding time.Sleep, the program works as expected, but commenting it out results in no work done.

time.Sleep(time.Duration(10 * time.Second))

Here's a runable example https://go.dev/play/p/QHqNj-AJQBI with Sleep left in. And here's the broken code with the Sleep timeout commented out.

package main

import (
	&quot;fmt&quot;
	&quot;sync&quot;
	&quot;time&quot;
)

// some complicated work
func do(num int, ch chan&lt;- int) {
	time.Sleep(time.Duration(500 * time.Millisecond))
	ch &lt;- num
}

func main() {

	results := make(chan int)

	// for some number of required complicated work
	for i := 0; i &lt; 53; i++ {
		go do(i, results)
	}

	var wg sync.WaitGroup

	// start 3 workers which can process results
	for i := 0; i &lt; 3; i++ {
		wg.Add(1)
		go func(id int) {
			defer wg.Done()
			worker(id, results)
		}(i)
	}

	// handle closing the channel when all workers complete
	go func() {
		wg.Wait()
		close(results)
	}()

	//time.Sleep(time.Duration(10 * time.Second))

	fmt.Println(&quot;donezo&quot;)
}

// process the results of do() in a meaningful way
func worker(id int, ch &lt;-chan int) {
	fmt.Println(&quot;starting worker&quot;, id)

	for i := range ch {
		fmt.Println(&quot;channel val:&quot;, i)
	}
}

I've also tried moving the defer wg.Done() to inside the worker() func but it's the same issue and doesn't work without sleep.

// process the results of do() in a meaningful way
func worker(wg *sync.WaitGroup, id int, ch &lt;-chan int) {
	fmt.Println(&quot;starting worker&quot;, id)

	defer wg.Done()

	for i := range ch {
		fmt.Println(&quot;channel val:&quot;, i)
	}
}

Have I chosen the wrong paradigm or am I just using it wrong?

答案1

得分: 2

使用以下代码:

// 首先启动工作线程。这样可以在不使用goroutine的情况下向工作线程提供工作。

var wg sync.WaitGroup
for i := 0; i < 3; i++ {
    wg.Add(1)
    go func(id int) {
        defer wg.Done()
        worker(id, results)
    }(i)
}

// 向工作线程提供工作。

for i := 0; i < 53; i++ {
    do(i, results)
}

// 关闭通道以表示没有更多的工作。 
// 这必须在所有工作都被提供之后进行。

close(results)

// 等待工作线程完成。
wg.Wait()

这段代码启动了3个工作线程,并向它们提供工作。然后,它关闭通道以指示没有更多的工作,并等待工作线程完成。

英文:

Use this code:

// Start the workers first. This allows you to feed
// work to the workers without using goroutines.
var wg sync.WaitGroup
for i := 0; i &lt; 3; i++ {
wg.Add(1)
go func(id int) {
defer wg.Done()
worker(id, results)
}(i)
}
// Feed work to the workers.
for i := 0; i &lt; 53; i++ {
do(i, results)
}
// Close channel to indicate no more work. 
// This must come after all work is fed in.
close(results)
// Wait for workers to complete.
wg.Wait()

答案2

得分: 1

我最初问的问题是:“我是否可以对代码进行一点小修改使其正常工作,还是必须重新思考问题?”我发现的答案是,是的,可以进行一点小修改。

我学到了一个有趣的基本概念,关于通道:你可以从一个关闭的通道中读取,也就是排空通道。如我在原始示例中提到的,range从未终止,因为我找不到一个合适的位置来关闭通道,即使我以其他创造性的方式强制关闭通道,程序也会表现出不希望出现的行为:

  • 在处理通道中的所有内容之前就退出
  • 死锁或在关闭的通道上发送

这是因为“真实”代码的细微差别,其中处理通道内容所需的时间比填充通道所需的时间更长,导致事物不同步。

由于在我的发送方没有明确的实用方法来关闭通道(这在99%的通道教程中是推荐的),所以在具有多个读取通道的工作者中,在主函数中通过一个goroutine关闭通道是可以接受的,而工作者们则不知道哪个工作者读取了最后一个值。

解决方案

我将工作者们包装在自己的sync.WaitGroup中,并使用worker.Wait()来阻止程序退出,以允许工作者们“完成”。独立地,当没有更多数据要发送时,我通过close()关闭通道,也就是通过使用它们自己的等待组来阻塞等待写入者完成。关闭操作使得range循环具有终止条件,因为当通道的默认值返回时,即达到通道末尾时,循环将结束,返回EOF类型的返回值。阻塞的会合通道没有终点,直到它被关闭。

我对此的理解是,如果你不知道在并行推送多少个值到无缓冲通道时,go无法知道无缓冲通道的长度,直到你关闭它。因为关闭通道意味着读取剩下的值,直到终止值或结束。而workers.Wait()会阻塞,直到这个过程完成。

解决方案的示例:
https://go.dev/play/p/NzKIYOqaGGD

读取关闭通道的示例:
https://go.dev/play/p/HYZofbz0Xc5

输出:

填充 0
填充 1
填充 2
填充 3
填充 4
填充 5
填充 6
填充 7
填充 8
填充 9
关闭
排空 0
排空 1
排空 2
排空 3
排空 4
排空 5
排空 6
排空 7
排空 8
排空 9
英文:

I originally asked “Is there a small tweak I can make to my code to make this work or do I have to rethink the problem?” The answer I’ve discovered is, yes, there is a small tweak.

I had to learn an interesting fundamental concept about channels: You can read from a closed channel i.e. drain a channel. As mentioned the range in my original example never terminates because I couldn’t find a good place to close the channel and even when I forced it some other creative way the program was exhibiting undesired behaviour

  • exiting short of processing all the contents within a channel
  • deadlocks or send on closed channel

This is because of the nuances of the “real” code where processing the contents of a channel takes longer than it took to fill the channel and things were just out of sync.

Since there’s no clear practical way within my sender to close the channel which is recommended in 99% of channel tutorials, it’s actually acceptable to do it in main via a goroutine when you have multiple workers reading a channel and the workers are unaware of which of them read the last value.

solution

I’ve wrapped the workers in their own sync.WaitGroup and used worker.Wait() to block the progam from exiting, allowing the works to “finish”. Independently I close() the channel when there's no more data to send, i.e. I block by waiting for the writers to finish by using their own wait group. The close gives the range loop a terminating case as it’ll end when the default value of the channel is returned, i.e. and EOF type return when the channel end is reached. A blocking rendezvous channel has no end, until it's closed.

The way I think about this is go has no way of knowing the length of an unbuffered channel as it’s being ranged over if you don’t know how many values will be pushed to it in parrellel, until you close it. Since closing it says read whatever is left until the terminating value or end. And the workers.Wait() blocks until that's complete.

example of the solved op
https://go.dev/play/p/NzKIYOqaGGD

example of reading a closed channel
https://go.dev/play/p/HYZofbz0Xc5

output

filling 0
filling 1
filling 2
filling 3
filling 4
filling 5
filling 6
filling 7
filling 8
filling 9
closed
empyting 0
empyting 1
empyting 2
empyting 3
empyting 4
empyting 5
empyting 6
empyting 7
empyting 8
empyting 9

huangapple
  • 本文由 发表于 2023年4月24日 23:50:09
  • 转载请务必保留本文链接:https://go.coder-hub.com/76093758.html
匿名

发表评论

匿名网友

:?: :razz: :sad: :evil: :!: :smile: :oops: :grin: :eek: :shock: :???: :cool: :lol: :mad: :twisted: :roll: :wink: :idea: :arrow: :neutral: :cry: :mrgreen:

确定