在缓冲区为空后关闭“worker”协程。

huangapple go评论102阅读模式
英文:

Shutdown "worker" go routine after buffer is empty

问题

我想让我的Go协程工作程序(在下面的代码中的ProcessToDo()函数)在关闭之前等待所有“排队”的工作完成。

工作程序有一个“待处理”通道(带缓冲),通过该通道将工作发送给它。它还有一个“完成”通道,用于告诉它开始关闭。文档中说,如果多个选择都满足条件,select语句会选择一个“伪随机值”...这意味着在所有缓冲的工作完成之前,关闭(返回)就被触发了。

在下面的代码示例中,我希望所有的20条消息都被打印出来...

package main

import (
	"time"
	"fmt"
)


func ProcessToDo(done chan struct{}, todo chan string) {
	for {
		select {
		case work, ok := <-todo:
			if !ok {
				fmt.Printf("Shutting down ProcessToDo - todo channel closed!\n")
				return
			}
			fmt.Printf("todo: %q\n", work)
			time.Sleep(100 * time.Millisecond)
		case _, ok := <-done:
			if ok {
				fmt.Printf("Shutting down ProcessToDo - done message received!\n")
			} else {
				fmt.Printf("Shutting down ProcessToDo - done channel closed!\n")
			}
			close(todo)
			return
		}
	}
}

func main() {

	done := make(chan struct{})
	todo := make(chan string, 100)

	go ProcessToDo(done, todo)

	for i := 0; i < 20; i++ {
		todo <- fmt.Sprintf("Message %02d", i)
	}

	fmt.Println("*** all messages queued ***")
	time.Sleep(1 * time.Second)
	close(done)
	time.Sleep(4 * time.Second)
}
英文:

I want my go routine worker (ProcessToDo() in the code below) to wait until all "queued" work is processed before shutting down.

The worker routine has a "to do" channel (buffered), through which work is sent to it. And it has a "done" channel to tell it to start shutdown. The documentation says that the select on the channels will pick a "pseudo-random value" if more than one of the selects are met... which means the shutdown (return) is being triggered before all the buffered work is completed.

In the code sample below, I want all 20 messages to print...

package main

import (
	&quot;time&quot;
	&quot;fmt&quot;
)


func ProcessToDo(done chan struct{}, todo chan string) {
	for {
		select {
		case work, ok := &lt;-todo:
			if !ok {
				fmt.Printf(&quot;Shutting down ProcessToDo - todo channel closed!\n&quot;)
				return
			}
			fmt.Printf(&quot;todo: %q\n&quot;, work)
			time.Sleep(100 * time.Millisecond)
		case _, ok := &lt;-done:
			if ok {
				fmt.Printf(&quot;Shutting down ProcessToDo - done message received!\n&quot;)
			} else {
				fmt.Printf(&quot;Shutting down ProcessToDo - done channel closed!\n&quot;)
			}
			close(todo)
			return
		}
	}
}

func main() {

	done := make(chan struct{})
	todo := make(chan string, 100)

	go ProcessToDo(done, todo)

	for i := 0; i &lt; 20; i++ {
		todo &lt;- fmt.Sprintf(&quot;Message %02d&quot;, i)
	}

	fmt.Println(&quot;*** all messages queued ***&quot;)
	time.Sleep(1 * time.Second)
	close(done)
	time.Sleep(4 * time.Second)
}

答案1

得分: 9

在你的情况下,done通道是完全不必要的,因为你可以通过关闭todo通道本身来发出关闭信号。

并且在通道上使用for range,它会迭代直到通道关闭且其缓冲区为空。

你应该有一个done通道,但只是为了让goroutine本身可以发出完成工作的信号,以便主goroutine可以继续或退出。

这个变体与你的代码等效,更简单,并且不需要time.Sleep()调用来等待其他goroutine(这将是错误和不确定的)。在Go Playground上尝试一下:

func ProcessToDo(done chan struct{}, todo chan string) {
    for work := range todo {
        fmt.Printf("todo: %q\n", work)
        time.Sleep(100 * time.Millisecond)
    }
    fmt.Printf("Shutting down ProcessToDo - todo channel closed!\n")
    done <- struct{}{} // Signal that we processed all jobs
}

func main() {
    done := make(chan struct{})
    todo := make(chan string, 100)

    go ProcessToDo(done, todo)

    for i := 0; i < 20; i++ {
        todo <- fmt.Sprintf("Message %02d", i)
    }

    fmt.Println("*** all messages queued ***")
    close(todo)
    <-done // Wait until the other goroutine finishes all jobs
}

还要注意,工作goroutine应该使用defer来信号完成,这样如果它以某种意外的方式返回或发生恐慌,主goroutine就不会被卡住。所以它应该像这样开始:

defer func() {
    done <- struct{}{} // Signal that we processed all jobs
}()

你还可以使用sync.WaitGroup来将主goroutine与工作goroutine同步(等待它)。实际上,如果你计划使用多个工作goroutine,这比从done通道中读取多个值更清晰。而且使用WaitGroup来信号完成更简单,因为它有一个Done()方法(一个函数调用),所以你不需要使用匿名函数:

defer wg.Done()

参见JimB的答案,其中包含使用WaitGroup的完整示例。

如果你想要使用多个工作goroutine并发处理作业(很可能是并行的),那么现在你只需要在代码中添加/更改以下内容:以真正启动多个工作goroutine:

for i := 0; i < 10; i++ {
    wg.Add(1)
    go ProcessToDo(todo)
}

在不改变其他任何内容的情况下,现在你有一个正确的并发应用程序,它使用10个并发的goroutine接收和处理你的作业。我们没有使用任何“丑陋”的time.Sleep()(我们只使用了一个来模拟慢速处理,而不是等待其他goroutine),也不需要任何额外的同步。

英文:

done channel in your case is completely unnecessary as you can signal the shutdown by closing the todo channel itself.

And use the for range on the channel which will iterate until the channel is closed and its buffer is empty.

You should have a done channel, but only so that the goroutine itself can signal that it finished work and so the main goroutine can continue or exit.

This variant is equivalent to yours, is much simpler and does not require time.Sleep() calls to wait other goroutines (which would be too erroneous and undeterministic anyway). Try it on the Go Playground:

func ProcessToDo(done chan struct{}, todo chan string) {
    for work := range todo {
		fmt.Printf(&quot;todo: %q\n&quot;, work)
		time.Sleep(100 * time.Millisecond)
	}
	fmt.Printf(&quot;Shutting down ProcessToDo - todo channel closed!\n&quot;)
	done &lt;- struct{}{} // Signal that we processed all jobs
}

func main() {
	done := make(chan struct{})
	todo := make(chan string, 100)

	go ProcessToDo(done, todo)

	for i := 0; i &lt; 20; i++ {
		todo &lt;- fmt.Sprintf(&quot;Message %02d&quot;, i)
	}

	fmt.Println(&quot;*** all messages queued ***&quot;)
	close(todo)
	&lt;-done // Wait until the other goroutine finishes all jobs
}

Also note that worker goroutines should signal completion using defer so the main goroutine won't get stuck waiting for the worker if it returns in some unexpected way, or panics. So it should rather start like this:

defer func() {
   	done &lt;- struct{}{} // Signal that we processed all jobs
}()

You can also use sync.WaitGroup to sync the main goroutine to the worker (to wait it up). In fact if you plan to use multiple worker goroutines, that is cleaner than to read multiple values from the done channel. Also it's simpler to signal the completion with WaitGroup as it has a Done() method (which is a function call) so you don't need an anonymous function:

defer wg.Done()

See JimB's anwser for the complete example with WaitGroup.

Using the for range is also idiomatic if you want to use multiple worker goroutines: channels are synchronized so you don't need any extra code that would synchronize access to the todo channel or the jobs received from it. And if you close the todo channel in the main(), that will properly signal all worker goroutines. But of course all queued jobs will be received and processed exactly once.

Now taking the variant that uses WaitGroup to make the main goroutine to wait for the worker (JimB's answer): What if you want more than 1 worker goroutine; to process your jobs concurrently (and most likely parallel)?

The only thing you need to add / change in your code is this: to really start multiple of them:

for i := 0; i &lt; 10; i++ {
    wg.Add(1)
    go ProcessToDo(todo)
}

Without changing anything else, you now have a correct, concurrent application which receives and processes your jobs using 10 concurrent goroutines. And we haven't used any "ugly" time.Sleep() (we used one but only to simulate slow processing, not to wait other goroutines), and you don't need any extra synchronization.

答案2

得分: 5

通常情况下,让一个通道的消费者关闭它是一个不好的主意,因为在关闭的通道上发送数据会导致 panic。

在这种情况下,如果你不想在所有消息都发送完之前中断消费者,只需使用 for...range 循环,并在完成后关闭通道。你还需要一个信号,比如 WaitGroup 来等待 goroutine 完成(而不是使用 time.Sleep)。

var wg sync.WaitGroup

func ProcessToDo(todo chan string) {
    defer wg.Done()
    for work := range todo {
        fmt.Printf("todo: %q\n", work)
        time.Sleep(100 * time.Millisecond)
    }
    fmt.Printf("Shutting down ProcessToDo - todo channel closed!\n")
}

func main() {
    todo := make(chan string, 100)
    wg.Add(1)
    go ProcessToDo(todo)

    for i := 0; i < 20; i++ {
        todo <- fmt.Sprintf("Message %02d", i)
    }

    fmt.Println("*** all messages queued ***")
    close(todo)
    wg.Wait()
}

链接:http://play.golang.org/p/r97vRPsxEb

英文:

It's usually a bad idea to have a consumer of a channel close it, since sending on a closed channel is a panic.

In this case, if you never want to interrupt the consumer before all messages have been sent, just use a for...range loop and close the channel when you're done. You will also need a signal like a WaitGroup to wait for the goroutine to finish (rather than using time.Sleep)

http://play.golang.org/p/r97vRPsxEb

var wg sync.WaitGroup

func ProcessToDo(todo chan string) {
	defer wg.Done()
	for work := range todo {
		fmt.Printf(&quot;todo: %q\n&quot;, work)
		time.Sleep(100 * time.Millisecond)

	}
	fmt.Printf(&quot;Shutting down ProcessToDo - todo channel closed!\n&quot;)

}

func main() {
	todo := make(chan string, 100)
	wg.Add(1)
	go ProcessToDo(todo)

	for i := 0; i &lt; 20; i++ {
		todo &lt;- fmt.Sprintf(&quot;Message %02d&quot;, i)
	}

	fmt.Println(&quot;*** all messages queued ***&quot;)
	close(todo)
	wg.Wait()
}

答案3

得分: 0

我认为对于这个特定的例子,接受的答案是相当有效的。然而,为了回答问题"在缓冲区为空时关闭“worker”协程",有一个更优雅的解决方案。

当缓冲区为空时,工作协程可以直接返回,而不需要通过关闭通道来发出信号。

如果工作协程需要处理的任务数量是未知的,这种方法尤其有用。

请在这里查看代码示例:https://play.golang.org/p/LZ1y0eIRMeS

package main

import (
	"fmt"
	"time"
	"math/rand"
)

func main() {
	rand.Seed(time.Now().UnixNano())
	ch := make(chan interface{}, 10)
	
	go worker(ch)
	for i := 1; i <= rand.Intn(9) + 1; i++ {
		ch <- i
	}
	
	blocker := make(chan interface{})
	<-blocker
}

func worker(ch chan interface{}){
	for {
		select {
		case msg := <- ch:
			fmt.Println("msg: ", msg)
		default:
			fmt.Println("exiting worker")
			return
		}
	}		
}
英文:

I think the accepted answer is pretty valid for this specific example. However to answer the question "Shutdown “worker” go routine after buffer is empty" - a more elegant solution is possible.

The worker can just return when the buffer is empty without needing to signal by closing the channel.

This is especially useful if the number of tasks that the worker needs to process in not known.

Check it out here: https://play.golang.org/p/LZ1y0eIRMeS

package main

import (
	&quot;fmt&quot;
	&quot;time&quot;
	&quot;math/rand&quot;
)

func main() {
	rand.Seed(time.Now().UnixNano())
	ch := make(chan interface{}, 10)
	
	go worker(ch)
	for i := 1; i &lt;= rand.Intn(9) + 1; i++ {
    		ch &lt;- i
	}
	
	blocker := make(chan interface{})
	&lt;-blocker
}

func worker(ch chan interface{}){	
	for {
		select {
		case msg := &lt;- ch:
			fmt.Println(&quot;msg: &quot;, msg)
		default:
			fmt.Println(&quot;exiting worker&quot;)
			return
		}
	}		
}

huangapple
  • 本文由 发表于 2015年9月4日 02:34:35
  • 转载请务必保留本文链接:https://go.coder-hub.com/32383063.html
匿名

发表评论

匿名网友

:?: :razz: :sad: :evil: :!: :smile: :oops: :grin: :eek: :shock: :???: :cool: :lol: :mad: :twisted: :roll: :wink: :idea: :arrow: :neutral: :cry: :mrgreen:

确定