英文:
periodically flushing channel in golang
问题
我需要定期清空通道的内容。
我使用len()来做到这一点,我想知道是否有更好的方法来做到这一点。
package main
import (
"fmt"
"math/rand"
"time"
)
func main() {
commch := make(chan int, 100)
go fillchan(commch)
drainchan(commch)
}
func fillchan(commch chan int) {
for {
select {
case <-time.Tick(30 * time.Millisecond):
commch <- rand.Int()
}
}
}
func drainchan(commch chan int) {
for {
chanlen := len(commch) // 获取通道中的条目数
time.Sleep(1 * time.Second)
for i := 0; i <= chanlen; i++ { // 根据chanlen清空它们
fmt.Printf("chan len: %s num: %s\n", chanlen, <-commch)
}
}
}
编辑1: 看起来这是更好的方法
http://play.golang.org/p/4Kp8VwO4yl
package main
import (
"fmt"
"math/rand"
"time"
)
func main() {
commch := make(chan int, 1000)
go fillchan(commch)
for {
select {
case <-time.Tick(1000 * time.Millisecond):
drainchan(commch)
}
}
}
func fillchan(commch chan int) {
for {
select {
case <-time.Tick(300 * time.Millisecond):
commch <- rand.Int()
}
}
}
func drainchan(commch chan int) {
for {
select {
case e := <-commch:
fmt.Printf("%s\n",e)
default:
return
}
}
}
编辑2: 移除了select,使用time.Tick防止内存泄漏
http://play.golang.org/p/WybAhRE3u4
package main
import (
"fmt"
"math/rand"
"time"
)
func main() {
commch := make(chan int, 1000)
go fillchan(commch)
for _ = range time.Tick(1000 * time.Millisecond) {
drainchan(commch)
}
}
func fillchan(commch chan int) {
for _ = range time.Tick(300 * time.Millisecond) {
commch <- rand.Int()
}
}
func drainchan(commch chan int) {
for {
select {
case e := <-commch:
fmt.Printf("%s\n", e)
default:
return
}
}
}
英文:
I need to periodically flush contents of a channel.
I did this with len() and I am wondering if there is some better way to do this.
http://play.golang.org/p/YzaI_2c_-F
package main
import (
"fmt"
"math/rand"
"time"
)
func main() {
commch := make(chan int, 100)
go fillchan(commch)
drainchan(commch)
}
func fillchan(commch chan int) {
for {
select {
case <-time.Tick(30 * time.Millisecond):
commch <- rand.Int()
}
}
}
func drainchan(commch chan int) {
for {
chanlen := len(commch) // get number of entries in channel
time.Sleep(1 * time.Second)
for i := 0; i <= chanlen; i++ { //flush them based on chanlen
fmt.Printf("chan len: %s num: %s\n", chanlen, <-commch)
}
}
}
EDIT 1: seems like this is better way to do this
http://play.golang.org/p/4Kp8VwO4yl
package main
import (
"fmt"
"math/rand"
"time"
)
func main() {
commch := make(chan int, 1000)
go fillchan(commch)
for {
select {
case <-time.Tick(1000 * time.Millisecond):
drainchan(commch)
}
}
}
func fillchan(commch chan int) {
for {
select {
case <-time.Tick(300 * time.Millisecond):
commch <- rand.Int()
}
}
}
func drainchan(commch chan int) {
for {
select {
case e := <-commch:
fmt.Printf("%s\n",e)
default:
return
}
}
}
EDIT 2: removed select, prevented memory leak with time.Tick
http://play.golang.org/p/WybAhRE3u4
package main
import (
"fmt"
"math/rand"
"time"
)
func main() {
commch := make(chan int, 1000)
go fillchan(commch)
for _ = range time.Tick(1000 * time.Millisecond) {
drainchan(commch)
}
}
func fillchan(commch chan int) {
for _ = range time.Tick(300 * time.Millisecond) {
commch <- rand.Int()
}
}
func drainchan(commch chan int) {
for {
select {
case e := <-commch:
fmt.Printf("%s\n", e)
default:
return
}
}
}
答案1
得分: 8
需要清空通道内容的需求是不寻常的。通道不提供此功能 - 但是您可以创建一个行为类似的goroutine(...如果您真的想要的话)。
通常,您会考虑一个在一个通道上输入并在另一个通道上输出的goroutine;两个通道都携带相同的数据类型。原则上,您可以使用这种方式模拟所有缓冲通道;对于其客户端,该goroutine的行为类似于普通的缓冲通道,因为它传递接收到的内容。
在goroutine中添加第三个通道,并与输入之间进行select操作。这将允许您在没有竞争条件的情况下触发缓冲区的清空。简单。
现在有三个连接到goroutine的通道 - 两个输入和一个输出。因此,当您设计将使用它的内容时,您可以思考清空数据的语义。
**一个相关的例子浮现在脑海中。**考虑一个具有一个输入和一个输出通道的goroutine。它提供了一个固定大小的覆盖缓冲区,即使输出通道被阻塞,它也始终可以从输入通道读取。这也需要一个带有默认情况的select,但不需要第三个通道。覆盖缓冲区有一个明确的用例:当通道和goroutine被连接到循环中时,死锁可能非常常见。覆盖缓冲区作为死锁的一种候选解决方案非常有用,因为某些数据在延迟时是无用的 - 例如,当应用程序太忙无法响应时,您可以丢弃GUI中的鼠标事件。
英文:
The need to flush away the contents of a channel would be unusual. Channels don't provide this feature - but you can make a goroutine that will behave that way (...if you really do want to).
Typically, you would be thinking more about a goroutine that inputs on one channel and outputs on another; both channels carry the same data type. You could in principle model all buffered channels this way; to its clients, the goroutine behaves like an ordinary buffered channel because it passes on what it receives.
Add a third channel into the goroutine, combined with a select between it and the input. This will allow you to trigger the emptying of the buffer without race conditions creeping in. Simple.
Now there are three channels connected to the goroutine - two inputs and an output. So, when you design the things that will use it, you can reason about what the semantics of flushing that data are.
A relative springs to mind. Consider a goroutine with one input and one output channel. It provides an overwriting buffer of fixed size, i.e. one that is always ready to read from its input channel, even when the output channel is blocked. This will also need a select with a default case, but no third channel is needed. Overwriting buffers have a clear use case: when channels and goroutines are wired into loops, deadlock can be quite likely. Overwriting buffers come in handy as one candidate solution for deadlocks because some data is useless when it's late - for example, you could for example throw away mice events in a GUI when the application is too busy to respond to them.
答案2
得分: 2
BatchingChannel
来自https://github.com/eapache/channels,我认为它可以满足你的需求。
英文:
The BatchingChannel
from https://github.com/eapache/channels does what you need I think.
答案3
得分: 1
当我遇到这个问题时,我是如何处理的。
func (s *ReadStream) drain() {
go func() {
for b := range s.chan {
blackhole(b)
}
}()
}
func blackhole(b []byte) {}
在可能被阻塞的情况下,context.Context似乎不是正确的选择:
for {
select {
case <-ctx.Done():
return
default:
send <- getData()
}
}
如果send已满,在我们接收到完成信号之前,我们将受到外部goroutine的控制。如果你确定消费者会一直读取直到通道关闭,那么这是可以的,但如果这些消费者可能遇到错误条件并返回,那么你只能寄希望于这一点。在这种特定情况下,我喜欢用一个内部的quit chan和waitgroup来替换context,然后提供一个公共的Kill()方法。当然,前提是我绝对确定可以丢弃数据。
func (s *ReadStream) Kill() {
s.quit <- struct{}{}
s.drain() // 确保goroutine看到取消信号
s.wg.Wait() // 等待goroutine看到取消信号
s.close()
}
英文:
When I come across this issue, this is how I handle it.
func (s *ReadStream) drain() {
go func() {
for b := range s.chan {
blackhole(b)
}
}()
}
func blackhole(b []byte) {}
In situations where your select may be blocked context.Context seems like the wrong choice:
for {
select {
case <-ctx.Done():
return
default:
send<-getData()
}
}
If send is full, we are at the mercy of an outside goroutine before we can receive the done signal. This is ok if you're sure the consumers are going to read until the channel closes, but if those consumers can experience error conditions and return, hope is all you can do. In this specific situation, I'm a fan of replacing the context with an internal quit chan and waitgroup, and then provide a public Kill() method. As long as I'm absolutely sure I'm ok throwing away the data, of course.
func (s *ReadStream) Kill() {
s.quit<-struct{}{}
s.drain() // ensure goroutine sees the cancel
s.wg.Wait() // wait for goroutine to see the cancel
s.close()
}
通过集体智慧和协作来改善编程学习和解决问题的方式。致力于成为全球开发者共同参与的知识库,让每个人都能够通过互相帮助和分享经验来进步。
评论