关闭长度未知的通道

huangapple go评论67阅读模式
英文:

Closing channel of unknown length

问题

我无法关闭通道,因为对其长度没有任何了解。

package main

import (
	"fmt"
	"time"
)

func gen(ch chan int) {
	var i int
	for {
		time.Sleep(time.Millisecond * 10)
		ch <- i
		i++
		// 当没有更多数据时(例如来自数据库或事件流)
		if i > 100 {
			break
		}
	}

	// 如何正确关闭通道?
	close(ch)
}

func receiver(ch chan int) {
	for i := range ch {
		fmt.Println("received:", i)
	}
}

func main() {
	ch := make(chan int)

	for i := 0; i < 10; i++ {
		go gen(ch)
	}

	receiver(ch)
}

它给我一个错误:

panic: send on closed channel

goroutine 8 [running]:
main.gen(0xc82001a0c0)
	/home/exu/src/github.com/exu/go-workshops/100-concurrency-channels/16-close-problem.go:12 +0x57
created by main.main
	/home/exu/src/github.com/exu/go-workshops/100-concurrency-channels/16-close-problem.go:35 +0xbd

goroutine 1 [panicwait]:
runtime.gopark(0x0, 0x0, 0x50b8e0, 0x9, 0x10, 0x1)
	/usr/lib/go/src/runtime/proc.go:185 +0x163
runtime.main()
	/usr/lib/go/src/runtime/proc.go:121 +0x2f4
runtime.goexit()
	/usr/lib/go/src/runtime/asm_amd64.s:1696 +0x1

goroutine 6 [sleep]:
time.Sleep(0x989680)
	/usr/lib/go/src/runtime/time.go:59 +0xf9
main.gen(0xc82001a0c0)
	/home/exu/src/github.com/exu/go-workshops/100-concurrency-channels/16-close-problem.go:11 +0x29
created by main.main
	/home/exu/src/github.com/exu/go-workshops/100-concurrency-channels/16-close-problem.go:33 +0x79

goroutine 7 [sleep]:
time.Sleep(0x989680)
	/usr/lib/go/src/runtime/time.go:59 +0xf9
main.gen(0xc82001a0c0)
	/home/exu/src/github.com/exu/go-workshops/100-concurrency-channels/16-close-problem.go:11 +0x29
created by main.main
	/home/exu/src/github.com/exu/go-workshops/100-concurrency-channels/16-close-problem.go:34 +0x9b
exit status 2

这是合乎逻辑的 - 第一个 goroutine 在第二个 goroutine 尝试发送数据时关闭了通道。在这种情况下,最好的关闭通道的方法是什么?

英文:

I'm not able to close channel when there is no knowledge about its
length

package main

import (
	&quot;fmt&quot;
	&quot;time&quot;
)

func gen(ch chan int) {
	var i int
	for {
		time.Sleep(time.Millisecond * 10)
		ch &lt;- i
		i++
        // when no more data (e.g. from db, or event stream)
		if i &gt; 100 {
			break
		}
	}

    // hot to close it properly?
	close(ch)
}

func receiver(ch chan int) {
	for i := range ch {
		fmt.Println(&quot;received:&quot;, i)
	}
}

func main() {
	ch := make(chan int)

	for i := 0; i &lt; 10; i++ {
		go gen(ch)
	}

	receiver(ch)
}

It gives me error

panic: send on closed channel

goroutine 8 [running]:
main.gen(0xc82001a0c0)
	/home/exu/src/github.com/exu/go-workshops/100-concurrency-channels/16-close-problem.go:12 +0x57
created by main.main
	/home/exu/src/github.com/exu/go-workshops/100-concurrency-channels/16-close-problem.go:35 +0xbd

goroutine 1 [panicwait]:
runtime.gopark(0x0, 0x0, 0x50b8e0, 0x9, 0x10, 0x1)
	/usr/lib/go/src/runtime/proc.go:185 +0x163
runtime.main()
	/usr/lib/go/src/runtime/proc.go:121 +0x2f4
runtime.goexit()
	/usr/lib/go/src/runtime/asm_amd64.s:1696 +0x1

goroutine 6 [sleep]:
time.Sleep(0x989680)
	/usr/lib/go/src/runtime/time.go:59 +0xf9
main.gen(0xc82001a0c0)
	/home/exu/src/github.com/exu/go-workshops/100-concurrency-channels/16-close-problem.go:11 +0x29
created by main.main
	/home/exu/src/github.com/exu/go-workshops/100-concurrency-channels/16-close-problem.go:33 +0x79

goroutine 7 [sleep]:
time.Sleep(0x989680)
	/usr/lib/go/src/runtime/time.go:59 +0xf9
main.gen(0xc82001a0c0)
	/home/exu/src/github.com/exu/go-workshops/100-concurrency-channels/16-close-problem.go:11 +0x29
created by main.main
	/home/exu/src/github.com/exu/go-workshops/100-concurrency-channels/16-close-problem.go:34 +0x9b
exit status 2

It's logical - first goroutine closing channel when the second one tries to send to it. What will be the best approach to close channel in this situation?

答案1

得分: 21

一旦一个通道关闭,就不能再向其发送值,否则会引发 panic。这就是你所经历的情况。

这是因为你启动了多个使用同一个通道的 goroutine,并且它们在通道上发送值。而且你在每个 goroutine 中都关闭了通道。由于它们没有同步,一旦第一个 goroutine 到达关闭通道的点,其他 goroutine 可能(而且它们会)继续向其发送值:panic!

你只能关闭通道一次(尝试关闭已经关闭的通道也会引发 panic)。而且你应该在所有发送值的 goroutine 完成时关闭它。为了做到这一点,你需要检测所有发送者 goroutine 何时完成。一种惯用的方法是使用 sync.WaitGroup

对于每个启动的发送者 goroutine,我们使用 WaitGroup.Add() 将 1 添加到 WaitGroup。每个完成发送值的 goroutine 可以通过调用 WaitGroup.Done() 来发出信号。最好将其作为延迟语句执行,这样如果你的 goroutine 突然终止(例如发生 panic),WaitGroup.Done() 仍然会被调用,并且不会让其他 goroutine 悬挂(等待一个永远不会到来的“缺失” WaitGroup.Done() 调用)。

WaitGroup.Wait() 将等待直到所有发送者 goroutine 完成,只有在此之后,它才会关闭通道。我们希望检测到这个“全局”完成事件,并在处理发送到通道的值时关闭通道,所以我们必须在它自己的 goroutine 中执行此操作。

接收者 goroutine 将一直运行,直到通道关闭,因为我们在通道上使用了 for ... range 结构。并且由于它在主 goroutine 中运行,程序将在所有值从通道中正确接收和处理之前不会退出。for ... range 结构会循环接收在关闭通道之前发送的所有值。

请注意,下面的解决方案也适用于有缓冲和无缓冲的通道,无需修改(尝试使用带缓冲的通道 ch := make(chan int, 100))。

正确的解决方案(在 Go Playground 上尝试):

func gen(ch chan int, wg *sync.WaitGroup) {
	defer wg.Done()
	var i int
	for {
		time.Sleep(time.Millisecond * 10)
		ch <- i
		i++
		// 当没有更多数据时(例如来自数据库或事件流)
		if i > 100 {
			break
		}
	}
}

func receiver(ch chan int) {
	for i := range ch {
		fmt.Println("received:", i)
	}
}

func main() {
	ch := make(chan int)
	wg := &sync.WaitGroup{}

	for i := 0; i < 10; i++ {
		wg.Add(1)
		go gen(ch, wg)
	}

	go func() {
		wg.Wait()
		close(ch)
	}()

	receiver(ch)
}

注意:

请注意,receiver(ch) 必须在主 goroutine 中运行,而等待 WaitGroup 并在自己的(非主)goroutine 中关闭通道的代码;而不是反过来。如果你交换这两者,可能会导致“提前退出”,也就是并非所有的值都能从通道中接收和处理。原因是当主 goroutine 完成时,Go 程序会退出(规范:程序执行)。它不会等待其他(非主)goroutine 完成。因此,如果等待和关闭通道在主 goroutine 中,关闭通道后程序可能在任何时刻退出,而不会等待在这种情况下循环接收通道中的值的其他 goroutine。

英文:

Once a channel is closed, you can't send further values on it else it panics. This is what you experience.

This is because you start multiple goroutines that use the same channel and they send values on it. And you close the channel in each of it. And since they are not synchronized, once the first goroutine reaches the point where it closes it, others may (and they will) still continue to send values on it: panic!

You can close the channel only once (attempting to close an already closed channel also panics). And you should do it when all the goroutines that send values on it are done. In order to do this, you need to detect when all the sender goroutines are done. An idiomatic way to detect this is to use sync.WaitGroup.

For each started sender goroutine we add 1 to the WaitGroup using WaitGroup.Add(). And each goroutine that is done sending the values can signal this by calling WaitGroup.Done(). Best to do this as a deferred statement, so if your goroutine would terminate abruptly (e.g. panics), WaitGroup.Done() would still be called, and would not leave other goroutines hanging (waiting for an absolution - a "missing" WaitGroup.Done() call that would never come...).

And WaitGroup.Wait() will wait until all sender goroutines are done, and only after this and only once will it close the channel. We want to detect this "global" done event and close the channel while processing the values sent on it is in progress, so we have to do this in its own goroutine.

The receiver goroutine will run until the channel is closed since we used the for ... range construct on the channel. And since it runs in the main goroutine, the program will not exit until all the values are properly received and processed from the channel. The for ... range construct loops until all the values are received that were sent before the channel was closed.

Note that the solution below works with buffered and unbuffered channel too without modification (try using a buffered channel with ch := make(chan int, 100)).

Correct solution (try it on the Go Playground):

func gen(ch chan int, wg *sync.WaitGroup) {
	defer wg.Done()
	var i int
	for {
		time.Sleep(time.Millisecond * 10)
		ch &lt;- i
		i++
		// when no more data (e.g. from db, or event stream)
		if i &gt; 100 {
			break
		}
	}
}

func receiver(ch chan int) {
	for i := range ch {
		fmt.Println(&quot;received:&quot;, i)
	}
}

func main() {
	ch := make(chan int)
	wg := &amp;sync.WaitGroup{}

	for i := 0; i &lt; 10; i++ {
		wg.Add(1)
		go gen(ch, wg)
	}

	go func() {
		wg.Wait()
		close(ch)
	}()

	receiver(ch)
}

Note:

Note that it's important that receiver(ch) runs in the main goroutine, and the code what waits for the WaitGroup and closes the channel in its own (non-main) goroutine; and not the other way around. If you would switch these 2, it might cause an "early exit", that is not all values might be received and processed from the channel. The reason for this is because a Go program exits when the main goroutine finishes (spec: Program execution). It does not wait for other (non-main) goroutines to finish. So if waiting and closing the channel would be in the main goroutine, after closing the channel the program could exit at any moment, not waiting for the other goroutine that in this case would loop to receive values from the channel.

答案2

得分: -1

使用Go通道的一个通用原则是,不要从接收方关闭通道,也不要在通道有多个并发发送方时关闭通道。

每个通道最终都会被垃圾回收,所以即使不显式关闭通道,它也会被垃圾回收。唯一的区别是,未关闭的通道可能在几个循环后才会被gc回收。

然而,如果可能的话,最好关闭通道。请参考以下链接以获取详细说明。

文章这个这个展示了在1:N、N:1或M:N(发送方:接收方)情况下关闭通道的各种方法。

英文:

"One general principle of using Go channels is don't close a channel from the receiver side and don't close a channel if the channel has multiple concurrent senders."

Every channel will be GCed eventually once it is marked for cleanup, so it is okay to leave channel un-closed the only difference it will make is that that channel will be available for gc after a few cycles maybe if not closed explicitly.

Nevertheless it is always good if you can close the channel off. Please go through the following links for detailed explaination.

Articles this and this shows various ways to close a channel in case of 1:N, N:1 or M:N (senders:receivers)

huangapple
  • 本文由 发表于 2015年12月15日 15:24:40
  • 转载请务必保留本文链接:https://go.coder-hub.com/34283255.html
匿名

发表评论

匿名网友

:?: :razz: :sad: :evil: :!: :smile: :oops: :grin: :eek: :shock: :???: :cool: :lol: :mad: :twisted: :roll: :wink: :idea: :arrow: :neutral: :cry: :mrgreen:

确定