如何明确地清空一个通道?

huangapple go评论77阅读模式
英文:

How can I explicitly empty a channel?

问题

简短版本:
有没有一种方法可以在不重新创建或循环遍历的情况下清空一个Go通道?

原因:
我正在使用两个通道来发送和接收数据,并且我有一个额外的通道来表示需要重新连接。

现在,当传输已经重置/重新连接后,我想要"清空"额外的通道,以确保没有任何悬挂的其他重置请求会导致重新连接。

英文:

The short version:
Is there a way to empty a go channel without recreating it, or looping through it?

The why:
I'm using two channels to send and receive data, and I have an extra channel to signal that a reconnect is needed.

Now when the transport has been reset/reconnected I want to 'empty' the extra channel to make sure that there is not any lingering other reset requests which would cause the thing to reconnect again.

答案1

得分: 41

没有循环的情况下无法清空通道。如果没有并发的接收器,你可以使用以下简单的循环:

for len(ch) > 0 {
  <-ch
}

如果有并发的接收器,可以使用以下循环:

L:
for {
    select {
    case <-c:
    default:
       break L
    }
}
英文:

It is not possible to empty a channel without a loop. If you don't have any concurrent receivers, then you can use this simple loop:

for len(ch) &gt; 0 {
  &lt;-ch
}

If you do have concurrent receivers, then use the loop:

L:
for {
    select {
    case &lt;-c:
    default:
       break L
    }
}

答案2

得分: 7

你所描述的情况本质上是具有争议性的,因为可能存在合法的重新连接请求。与其试图排空通道,我建议跟踪时间。

在重新连接的通道上,发布时间。在重新连接完成后,记录时间。在消费重新连接通道时,丢弃任何早于上次重新连接的消息。

另一种更加同步的解决方案是将重新连接通道设置为布尔类型。发布“true”以进行重新连接。当重新连接完成时,发布“false”。然后消费该通道,直到找到“false”。

英文:

What you're describing is inherently racy, since there may be legitimate requests to reconnect on the channel. Rather than trying to drain the channel, I would recommend keeping track of timing.

On your reconnect channel, post the time. When finished with the reconnect, note the time. While consuming the reconnect channel, throw away any messages that are older than your last reconnect.

Another more lockstep solution to achieve this is to make the reconnect channel a bool. Post "true" to reconnect. When the reconnect completes, post "false". Then consume the channel until you find "false."

答案3

得分: 2

另一种方法是使用sync.Condatomic,大致如下:

type Server struct {
    s     chan int
    r     chan int
    c     *sync.Cond
    state uint32
}

const (
    sNormal       = 0
    sQuitting     = 1
    sReconnecting = 2
)

func New() *Server {
    s := &Server{
        s: make(chan int),
        r: make(chan int),
        c: sync.NewCond(&sync.Mutex{}),
    }
    go s.sender()
    // go s.receiver()
    return s
}
func (s *Server) sender() {
    //
    for {
        select {
        case data := <-s.s:
            //处理数据
        default:
            s.c.L.Lock()
        L:
            for {
                switch atomic.LoadUint32(&s.state) {
                case sNormal:
                    break L
                case sReconnecting:
                case sQuitting:
                    s.c.L.Unlock()
                    return
                }
                s.c.Wait()
            }
            s.c.L.Unlock()
        }
    }
}

//接收者的代码重复类似的逻辑

func (s *Server) Reconnect() {
    var cannotReconnect bool
    atomic.StoreUint32(&s.state, sReconnecting)
    //尝试重新连接
    if cannotReconnect {
        atomic.StoreUint32(&s.state, sQuitting)
    } else {
        atomic.StoreUint32(&s.state, sNormal)
    }
    s.c.Broadcast()
}

playground

英文:

Another approach is using sync.Cond and atomic, something along the lines of:

type Server struct {
s     chan int
r     chan int
c     *sync.Cond
state uint32
}
const (
sNormal       = 0
sQuitting     = 1
sReconnecting = 2
)
func New() *Server {
s := &amp;Server{
s: make(chan int),
r: make(chan int),
c: sync.NewCond(&amp;sync.Mutex{}),
}
go s.sender()
// go s.receiver()
return s
}
func (s *Server) sender() {
//
for {
select {
case data := &lt;-s.s:
//do stuff with data
default:
s.c.L.Lock()
L:
for {
switch atomic.LoadUint32(&amp;s.state) {
case sNormal:
break L
case sReconnecting:
case sQuitting:
s.c.L.Unlock()
return
}
s.c.Wait()
}
s.c.L.Unlock()
}
}
}
//repeat for receiver
func (s *Server) Reconnect() {
var cannotReconnect bool
atomic.StoreUint32(&amp;s.state, sReconnecting)
//keep trying to reconnect
if cannotReconnect {
atomic.StoreUint32(&amp;s.state, sQuitting)
} else {
atomic.StoreUint32(&amp;s.state, sNormal)
}
s.c.Broadcast()
}

<kbd>playground</kbd>

答案4

得分: 2

无法编辑Simon Fox的回答,所以在这里写:

  • 如果通道关闭,那么会导致无限循环:
    cause <-ch 立即返回错误。

更安全的版本(处理关闭的通道):

L:
for {
    select {
    case _, ok := <-ch:
        if !ok { //通道已关闭 //立即返回错误
            break L
        }
    default: //所有其他情况都未准备好:表示当前通道中没有任何内容
        break L
    }
}
英文:

can't edit Simon Fox 's answer, so write here:

  • if chan closed, that will got infinite loop:
    cause <-ch immediately return err.

<br>

More safer version (handle closed chan):

L:
	for {
		select {
		case _, ok := &lt;-ch:
			if !ok { //ch is closed //immediately return err
				break L
			}
		default: //all other case not-ready: means nothing in ch for now
			break L
		}
	}

答案5

得分: 0

听起来,你想要一个重置goroutine而不是一个重置通道。它将从侧面接收重置信号,并将其输出到接收器。当这个goroutine收到重新连接的请求时,它将将其传递给接收器。然后,它等待在第三个通道上接收来自接收器的确认回复,并丢弃在此期间收到的任何重新连接请求。所以总共有3个通道,1个输入通道,1个输出通道,1个确认通道。

英文:

It sounds like instead of a reset channel, you want a reset goroutine. It would have an input from the side sending the reset signal, and an output to the receiver. When this goroutine receives a request to reconnect, it passes it to the receiver. Then it waits to receive an acknowledgement back from the receiver on a third channel, throwing away any reconnect requests it receives in the meantime. So 3 channels total, 1 input, 1 output, 1 ack.

huangapple
  • 本文由 发表于 2014年10月1日 21:54:41
  • 转载请务必保留本文链接:https://go.coder-hub.com/26143091.html
匿名

发表评论

匿名网友

:?: :razz: :sad: :evil: :!: :smile: :oops: :grin: :eek: :shock: :???: :cool: :lol: :mad: :twisted: :roll: :wink: :idea: :arrow: :neutral: :cry: :mrgreen:

确定