英文:
How to check a channel is closed or not without reading it?
问题
这是一个由@Jimt编写的Go语言中的工作者和控制器模式的很好的例子,以回答“在golang中有没有一种优雅的方式来暂停和恢复任何其他goroutine?”。
package main
import (
"fmt"
"runtime"
"sync"
"time"
)
// 可能的工作者状态。
const (
Stopped = 0
Paused = 1
Running = 2
)
// 最大工作者数量。
const WorkerCount = 1000
func main() {
// 启动工作者。
var wg sync.WaitGroup
wg.Add(WorkerCount + 1)
workers := make([]chan int, WorkerCount)
for i := range workers {
workers[i] = make(chan int)
go func(i int) {
worker(i, workers[i])
wg.Done()
}(i)
}
// 启动控制器例程。
go func() {
controller(workers)
wg.Done()
}()
// 等待所有goroutine完成。
wg.Wait()
}
func worker(id int, ws <-chan int) {
state := Paused // 开始处于暂停状态。
for {
select {
case state = <-ws:
switch state {
case Stopped:
fmt.Printf("Worker %d: Stopped\n", id)
return
case Running:
fmt.Printf("Worker %d: Running\n", id)
case Paused:
fmt.Printf("Worker %d: Paused\n", id)
}
default:
// 我们使用runtime.Gosched()来防止在这种情况下发生死锁。
// 如果在这里执行了工作并让出给调度器,则不需要它。
runtime.Gosched()
if state == Paused {
break
}
// 在这里执行实际的工作。
}
}
}
// 控制器处理所有工作者的当前状态。它们可以被指示运行、暂停或完全停止。
func controller(workers []chan int) {
// 启动工作者
for i := range workers {
workers[i] <- Running
}
// 暂停工作者。
<-time.After(1e9)
for i := range workers {
workers[i] <- Paused
}
// 恢复工作者。
<-time.After(1e9)
for i := range workers {
workers[i] <- Running
}
// 关闭工作者。
<-time.After(1e9)
for i := range workers {
close(workers[i])
}
}
但是这段代码也有一个问题:如果你想在worker()
退出时从workers
中删除一个工作者通道,会发生死锁。
如果你使用close(workers[i])
,下一次控制器写入它将会导致恐慌,因为Go不能向关闭的通道写入。如果你使用一些互斥锁来保护它,那么它将在workers[i] <- Running
上被阻塞,因为worker
没有从通道中读取任何内容,写入将被阻塞,互斥锁将导致死锁。你也可以给通道一个更大的缓冲区作为解决方法,但这还不够好。
所以我认为解决这个问题的最好方法是在worker()
退出时关闭通道,如果控制器发现通道已关闭,它将跳过它并什么都不做。但是我找不到在这种情况下如何检查通道是否已经关闭。如果我尝试在控制器中读取通道,控制器可能会被阻塞。所以我现在非常困惑。
PS:我尝试过恢复引发的恐慌,但它会关闭引发恐慌的goroutine。在这种情况下,它将是控制器,所以没有用。
尽管如此,我认为在Go的下一个版本中实现这个功能对Go团队来说是有用的。
英文:
This is a good example of workers & controller mode in Go written by @Jimt, in answer to
"Is there some elegant way to pause & resume any other goroutine in golang?"
package main
import (
"fmt"
"runtime"
"sync"
"time"
)
// Possible worker states.
const (
Stopped = 0
Paused = 1
Running = 2
)
// Maximum number of workers.
const WorkerCount = 1000
func main() {
// Launch workers.
var wg sync.WaitGroup
wg.Add(WorkerCount + 1)
workers := make([]chan int, WorkerCount)
for i := range workers {
workers[i] = make(chan int)
go func(i int) {
worker(i, workers[i])
wg.Done()
}(i)
}
// Launch controller routine.
go func() {
controller(workers)
wg.Done()
}()
// Wait for all goroutines to finish.
wg.Wait()
}
func worker(id int, ws <-chan int) {
state := Paused // Begin in the paused state.
for {
select {
case state = <-ws:
switch state {
case Stopped:
fmt.Printf("Worker %d: Stopped\n", id)
return
case Running:
fmt.Printf("Worker %d: Running\n", id)
case Paused:
fmt.Printf("Worker %d: Paused\n", id)
}
default:
// We use runtime.Gosched() to prevent a deadlock in this case.
// It will not be needed of work is performed here which yields
// to the scheduler.
runtime.Gosched()
if state == Paused {
break
}
// Do actual work here.
}
}
}
// controller handles the current state of all workers. They can be
// instructed to be either running, paused or stopped entirely.
func controller(workers []chan int) {
// Start workers
for i := range workers {
workers[i] <- Running
}
// Pause workers.
<-time.After(1e9)
for i := range workers {
workers[i] <- Paused
}
// Unpause workers.
<-time.After(1e9)
for i := range workers {
workers[i] <- Running
}
// Shutdown workers.
<-time.After(1e9)
for i := range workers {
close(workers[i])
}
}
But this code also has an issue: If you want to remove a worker channel in workers
when worker()
exits, dead lock happens.
If you close(workers[i])
, next time controller writes into it will cause a panic since go can't write into a closed channel. If you use some mutex to protect it, then it will be stuck on workers[i] <- Running
since the worker
is not reading anything from the channel and write will be blocked, and mutex will cause a dead lock. You can also give a bigger buffer to channel as a work-around, but it's not good enough.
So I think the best way to solve this is worker()
close channel when exits, if the controller finds a channel closed, it will jump over it and do nothing. But I can't find how to check a channel is already closed or not in this situation. If I try to read the channel in controller, the controller might be blocked. So I'm very confused for now.
PS: Recovering the raised panic is what I have tried, but it will close goroutine which raised panic. In this case it will be controller so it's no use.
Still, I think it's useful for Go team to implement this function in next version of Go.
答案1
得分: 97
没有办法编写一个安全的应用程序,其中需要知道一个通道是否打开而不与其进行交互。
做你想做的事情的最好方法是使用两个通道 - 一个用于工作,一个用于指示改变状态的愿望(以及如果这个状态改变很重要的话,完成这个状态改变)。
通道是廉价的。复杂的设计过载语义是不明确的。
[另外]
<-time.After(1e9)
是一个非常令人困惑和不明显的方式来写
time.Sleep(time.Second)
保持简单,每个人(包括你)都能理解。
英文:
There's no way to write a safe application where you need to know whether a channel is open without interacting with it.
The best way to do what you're wanting to do is with two channels -- one for the work and one to indicate a desire to change state (as well as the completion of that state change if that's important).
Channels are cheap. Complex design overloading semantics isn't.
[also]
<-time.After(1e9)
is a really confusing and non-obvious way to write
time.Sleep(time.Second)
Keep things simple and everyone (including you) can understand them.
答案2
得分: 96
以一种巧妙的方式,可以通过恢复引发的恐慌来完成对尝试写入的通道的操作。但是,你无法在不从通道中读取数据的情况下检查读取通道是否关闭。
你可以:
- 最终从中读取“true”值(
v <- c
) - 读取“true”值和“未关闭”指示器(
v, ok <- c
) - 读取零值和“关闭”指示器(
v, ok <- c
)(示例) - 在通道读取中永远阻塞(
v <- c
)
只有最后一种情况在技术上不从通道中读取数据,但这没有什么用处。
英文:
In a hacky way it can be done for channels which one attempts to write to by recovering the raised panic. But you cannot check if a read channel is closed without reading from it.
Either you will
- eventually read the "true" value from it (
v <- c
) - read the "true" value and 'not closed' indicator (
v, ok <- c
) - read a zero value and the 'closed' indicator (
v, ok <- c
) (example) - will block in the channel read forever (
v <- c
)
Only the last one technically doesn't read from the channel, but that's of little use.
答案3
得分: 9
我知道这个答案来得很晚,我写了这个解决方案,黑客Go运行时,它不安全,可能会崩溃:
import (
"unsafe"
"reflect"
)
func isChanClosed(ch interface{}) bool {
if reflect.TypeOf(ch).Kind() != reflect.Chan {
panic("only channels!")
}
// 获取接口值指针,从cgo_export中
// typedef struct { void *t; void *v; } GoInterface;
// 然后获取通道真实指针
cptr := *(*uintptr)(unsafe.Pointer(
unsafe.Pointer(uintptr(unsafe.Pointer(&ch)) + unsafe.Sizeof(uint(0))),
))
// 如果chan.closed > 0,此函数将返回true
// 参见https://github.com/golang/go/blob/master/src/runtime/chan.go上的hchan
// 类型hchan struct {
// qcount uint // 队列中的总数据
// dataqsiz uint // 循环队列的大小
// buf unsafe.Pointer // 指向一个包含dataqsiz个元素的数组
// elemsize uint16
// closed uint32
// **
cptr += unsafe.Sizeof(uint(0))*2
cptr += unsafe.Sizeof(unsafe.Pointer(uintptr(0)))
cptr += unsafe.Sizeof(uint16(0))
return *(*uint32)(unsafe.Pointer(cptr)) > 0
}
英文:
I know this answer is so late, I have wrote this solution, Hacking Go run-time, It's not safety, It may crashes:
import (
"unsafe"
"reflect"
)
func isChanClosed(ch interface{}) bool {
if reflect.TypeOf(ch).Kind() != reflect.Chan {
panic("only channels!")
}
// get interface value pointer, from cgo_export
// typedef struct { void *t; void *v; } GoInterface;
// then get channel real pointer
cptr := *(*uintptr)(unsafe.Pointer(
unsafe.Pointer(uintptr(unsafe.Pointer(&ch)) + unsafe.Sizeof(uint(0))),
))
// this function will return true if chan.closed > 0
// see hchan on https://github.com/golang/go/blob/master/src/runtime/chan.go
// type hchan struct {
// qcount uint // total data in the queue
// dataqsiz uint // size of the circular queue
// buf unsafe.Pointer // points to an array of dataqsiz elements
// elemsize uint16
// closed uint32
// **
cptr += unsafe.Sizeof(uint(0))*2
cptr += unsafe.Sizeof(unsafe.Pointer(uintptr(0)))
cptr += unsafe.Sizeof(uint16(0))
return *(*uint32)(unsafe.Pointer(cptr)) > 0
}
答案4
得分: 3
好的,你可以使用default
分支来检测它,因为一个关闭的通道会被选中,例如:下面的代码将选择default
,channel
,channel
,第一个选择不会被阻塞。
func main() {
ch := make(chan int)
go func() {
select {
case <-ch:
log.Printf("1.channel")
default:
log.Printf("1.default")
}
select {
case <-ch:
log.Printf("2.channel")
}
close(ch)
select {
case <-ch:
log.Printf("3.channel")
default:
log.Printf("3.default")
}
}()
time.Sleep(time.Second)
ch <- 1
time.Sleep(time.Second)
}
输出结果为:
2018/05/24 08:00:00 1.default
2018/05/24 08:00:01 2.channel
2018/05/24 08:00:01 3.channel
注意,参考@Angad在这个答案下的评论:
如果你使用的是带缓冲的通道,并且其中包含未读数据,这种方法将不起作用。
英文:
Well, you can use default
branch to detect it, for a closed channel will be selected, for example: the following code will select default
, channel
, channel
, the first select is not blocked.
func main() {
ch := make(chan int)
go func() {
select {
case <-ch:
log.Printf("1.channel")
default:
log.Printf("1.default")
}
select {
case <-ch:
log.Printf("2.channel")
}
close(ch)
select {
case <-ch:
log.Printf("3.channel")
default:
log.Printf("3.default")
}
}()
time.Sleep(time.Second)
ch <- 1
time.Sleep(time.Second)
}
Prints
2018/05/24 08:00:00 1.default
2018/05/24 08:00:01 2.channel
2018/05/24 08:00:01 3.channel
Note, refer to comment by @Angad under this answer:
> It doesn't work if you're using a Buffered Channel and it contains
> unread data
答案5
得分: 2
我经常在多个并发goroutine中遇到这个问题。
这可能是一个好的模式,但是我为我的工作线程定义了一个带有退出通道和工作线程状态字段的结构体:
type Worker struct {
data chan struct
quit chan bool
stopped bool
}
然后,您可以让控制器调用工作线程的停止函数:
func (w *Worker) Stop() {
w.quit <- true
w.stopped = true
}
func (w *Worker) eventloop() {
for {
if w.Stopped {
return
}
select {
case d := <-w.data:
//做一些事情
if w.Stopped {
return
}
case <-w.quit:
return
}
}
}
这样可以很好地让您的工作线程停止,而不会出现任何挂起或生成错误的情况,尤其是在容器中运行时。
英文:
I have had this problem frequently with multiple concurrent goroutines.
It may or may not be a good pattern, but I define a a struct for my workers with a quit channel and field for the worker state:
type Worker struct {
data chan struct
quit chan bool
stopped bool
}
Then you can have a controller call a stop function for the worker:
func (w *Worker) Stop() {
w.quit <- true
w.stopped = true
}
func (w *Worker) eventloop() {
for {
if w.Stopped {
return
}
select {
case d := <-w.data:
//DO something
if w.Stopped {
return
}
case <-w.quit:
return
}
}
}
This gives you a pretty good way to get a clean stop on your workers without anything hanging or generating errors, which is especially good when running in a container.
答案6
得分: 1
你可以将通道设置为nil,除了关闭它之外。这样你就可以检查它是否为nil。
在playground中的示例:
https://play.golang.org/p/v0f3d4DisCz
编辑:
实际上,这是一个不好的解决方案,如下一个示例所示,
因为在函数中将通道设置为nil会导致它出错:
https://play.golang.org/p/YVE2-LV9TOp
英文:
You could set your channel to nil in addition to closing it. That way you can check if it is nil.
example in the playground:
https://play.golang.org/p/v0f3d4DisCz
edit:
This is actually a bad solution as demonstrated in the next example,
because setting the channel to nil in a function would break it:
https://play.golang.org/p/YVE2-LV9TOp
答案7
得分: 0
ch1 := make(chan int)
ch2 := make(chan int)
go func(){
for i:=0; i<10; i++{
ch1 <- i
}
close(ch1)
}()
go func(){
for i:=10; i<15; i++{
ch2 <- i
}
close(ch2)
}()
ok1, ok2 := false, false
v := 0
for{
ok1, ok2 = true, true
select{
case v,ok1 = <-ch1:
if ok1 {fmt.Println(v)}
default:
}
select{
case v,ok2 = <-ch2:
if ok2 {fmt.Println(v)}
default:
}
if !ok1 && !ok2{return}
}
英文:
ch1 := make(chan int)
ch2 := make(chan int)
go func(){
for i:=0; i<10; i++{
ch1 <- i
}
close(ch1)
}()
go func(){
for i:=10; i<15; i++{
ch2 <- i
}
close(ch2)
}()
ok1, ok2 := false, false
v := 0
for{
ok1, ok2 = true, true
select{
case v,ok1 = <-ch1:
if ok1 {fmt.Println(v)}
default:
}
select{
case v,ok2 = <-ch2:
if ok2 {fmt.Println(v)}
default:
}
if !ok1 && !ok2{return}
}
}
答案8
得分: -3
从文档中:
可以使用内置函数close关闭通道。接收操作符的多值赋值形式报告在通道关闭之前是否发送了接收到的值。
https://golang.org/ref/spec#Receive_operator
Golang in Action的示例展示了这种情况:
// 这个示例程序演示了如何使用无缓冲通道模拟两个goroutine之间的网球比赛。
package main
import (
"fmt"
"math/rand"
"sync"
"time"
)
// wg用于等待程序完成。
var wg sync.WaitGroup
func init() {
rand.Seed(time.Now().UnixNano())
}
// main是所有Go程序的入口点。
func main() {
// 创建一个无缓冲通道。
court := make(chan int)
// 添加两个计数,每个goroutine一个。
wg.Add(2)
// 启动两个选手。
go player("Nadal", court)
go player("Djokovic", court)
// 开始比赛。
court <- 1
// 等待比赛结束。
wg.Wait()
}
// player模拟一个人打网球的游戏。
func player(name string, court chan int) {
// 调度Done的调用告诉main我们已经完成了。
defer wg.Done()
for {
// 等待球被击回给我们。
ball, ok := <-court
fmt.Printf("ok %t\n", ok)
if !ok {
// 如果通道被关闭,我们赢了。
fmt.Printf("Player %s 赢了\n", name)
return
}
// 选择一个随机数,看看我们是否错过了球。
n := rand.Intn(100)
if n%13 == 0 {
fmt.Printf("Player %s 错过了\n", name)
// 关闭通道以表示我们输了。
close(court)
return
}
// 显示并递增一次击球计数。
fmt.Printf("Player %s 击中 %d\n", name, ball)
ball++
// 将球击回给对方选手。
court <- ball
}
}
英文:
From the documentation:
A channel may be closed with the built-in function close. The multi-valued assignment form of the receive operator reports whether a received value was sent before the channel was closed.
https://golang.org/ref/spec#Receive_operator
Example by Golang in Action shows this case:
// This sample program demonstrates how to use an unbuffered
// channel to simulate a game of tennis between two goroutines.
package main
import (
"fmt"
"math/rand"
"sync"
"time"
)
// wg is used to wait for the program to finish.
var wg sync.WaitGroup
func init() {
rand.Seed(time.Now().UnixNano())
}
// main is the entry point for all Go programs.
func main() {
// Create an unbuffered channel.
court := make(chan int)
// Add a count of two, one for each goroutine.
wg.Add(2)
// Launch two players.
go player("Nadal", court)
go player("Djokovic", court)
// Start the set.
court <- 1
// Wait for the game to finish.
wg.Wait()
}
// player simulates a person playing the game of tennis.
func player(name string, court chan int) {
// Schedule the call to Done to tell main we are done.
defer wg.Done()
for {
// Wait for the ball to be hit back to us.
ball, ok := <-court
fmt.Printf("ok %t\n", ok)
if !ok {
// If the channel was closed we won.
fmt.Printf("Player %s Won\n", name)
return
}
// Pick a random number and see if we miss the ball.
n := rand.Intn(100)
if n%13 == 0 {
fmt.Printf("Player %s Missed\n", name)
// Close the channel to signal we lost.
close(court)
return
}
// Display and then increment the hit count by one.
fmt.Printf("Player %s Hit %d\n", name, ball)
ball++
// Hit the ball back to the opposing player.
court <- ball
}
}
答案9
得分: -6
首先,检查通道是否有元素会更容易,这将确保通道是活动的。
func isChanClosed(ch chan interface{}) bool {
if len(ch) == 0 {
select {
case _, ok := <-ch:
return !ok
}
}
return false
}
英文:
it's easier to check first if the channel has elements, that would ensure the channel is alive.
func isChanClosed(ch chan interface{}) bool {
if len(ch) == 0 {
select {
case _, ok := <-ch:
return !ok
}
}
return false
}
答案10
得分: -9
如果你听这个频道,你总是可以发现频道已经关闭。
case state, opened := <-ws:
if !opened {
// 频道已关闭
// 返回或进行一些最终工作
}
switch state {
case Stopped:
但请记住,你不能两次关闭一个频道。这会引发恐慌。
英文:
If you listen this channel you always can findout that channel was closed.
case state, opened := <-ws:
if !opened {
// channel was closed
// return or made some final work
}
switch state {
case Stopped:
But remember, you can not close one channel two times. This will raise panic.
通过集体智慧和协作来改善编程学习和解决问题的方式。致力于成为全球开发者共同参与的知识库,让每个人都能够通过互相帮助和分享经验来进步。
评论