英文:
How can I signal a channel sender to quit in golang?
问题
我正在使用Go语言中的通道来处理一种数据流水线。代码大致如下:
type Channels struct {
inputs chan string
errc chan error
quit chan struct{}
}
func (c *Channels) doSomethingWithInput() {
defer close(c.quit)
defer close(c.errc)
for input := range p.inputs {
_, err := doSomethingThatSometimesErrors(input)
if err != nil {
c.errc <- err
return
}
}
doOneFinalThingThatCannotError()
return
}
func (c *Channels) inputData(s string) {
// 这个函数的实现是我的问题
}
func StartProcessing(c *Channels, data ...string) error {
go c.doSomethingWithInput()
go func() {
defer close(c.inputs)
for _, i := range data {
select {
case <-c.quit:
break
default:
}
inputData(i)
}
}()
// 阻塞直到quit通道关闭。
<-c.quit
if err := <-c.errc; err != nil {
return err
}
return nil
}
这似乎是一种合理的方式来在通道处理器之间传递退出信号,基于这篇关于Go并发模式的博文。
我在使用这种模式时遇到的问题是inputData
函数。将字符串添加到inputs
通道需要等待doSomethingWithInput()
读取通道,但它也可能出错。inputData
需要尝试向inputs
通道发送数据,但如果被告知退出,它需要放弃。我能做的最好的办法是这样的:
func (c *Channels) inputData(s string) {
for {
select {
case <-c.quit:
return
case c.inputs <- s:
return
}
}
}
基本上,"在选项之间来回切换,直到其中一个生效"。明确地说,我不认为这是一个糟糕的设计。只是感觉...有些浪费。我觉得可能有一些巧妙的方法。在Go中,当通道的消费者出错时,我该如何告诉通道的发送者退出呢?
英文:
I'm using channels in Go to process a data pipeline of sorts. The code looks something like this:
type Channels struct {
inputs chan string
errc chan error
quit chan struct{}
}
func (c *Channels) doSomethingWithInput() {
defer close(c.quit)
defer close(c.errc)
for input := range p.inputs {
_, err := doSomethingThatSometimesErrors(input)
if err != nil {
c.errc <- err
return
}
}
doOneFinalThingThatCannotError()
return
}
func (c *Channels) inputData(s string) {
// This function implementation is my question
}
func StartProcessing(c *Channels, data ...string) error {
go c.doSomethingWithInput()
go func() {
defer close(c.inputs)
for _, i := range data {
select {
case <-c.quit:
break
default:
}
inputData(i)
}
}()
// Block until the quit channel is closed.
<-c.quit
if err := <-c.errc; err != nil {
return err
}
return nil
}
This seems like a reasonable way to communicate a quit signal between channel processors and is based on this blog post about concurrency patterns in Go.
The thing I struggle with using this pattern is the inputData
function. Adding strings to the input
channel needs to wait for doSomethingWithInput()
to read the channel, but it also might error. inputData
needs to try and feed the inputs
channel but give up if told to quit. The best I could do was this:
func (c *Channels) inputData(s string) {
for {
select {
case <-c.quit:
return
case c.inputs <- s:
return
}
}
}
Essentially, "oscillate between your options until one of them sticks." To be clear, I don't think it's a bad design. It just feels... wasteful. Like I'm missing something clever. How can I tell a channel sender to quit in Go when a channel consumer errors?
答案1
得分: 3
你的inputData()很好,这是正确的做法。
在你的用例中,你的通道消费者,也就是接收者,即doSomethingWithInput()应该对"quit"通道有控制权。目前情况下,如果发生错误,只需从doSomethingWithInput()返回,这将关闭quit通道并使发送者退出(将触发case <-quit:
)。这实际上是一个巧妙的地方。
只需注意一下你的错误通道,它不应该是带缓冲的,并且在doSomethingWithInput()退出时不应该关闭它。你不能在之后读取它以收集错误。你需要在主函数中关闭它并初始化一些容量(例如make(chan int, 10)
),或者为它创建一个消费者goroutine。你可能还想尝试使用select语句来尝试读取它:如果没有错误,你当前的错误检查代码将永远阻塞。
英文:
Your inputData() is fine, that's the way to do it.
In your use case, your channel consumer, the receiver, aka doSomethingWithInput() is the one which should have control over the "quit" channel. As it is, if an error occurs, just return from doSomethingWithInput(), which will in turn close the quit channel and make the sender(s) quit (will trigger case <-quit:
). That is in fact the clever bit.
Just watch out with your error channel that's not buffered and closed when doSomethingWithInput() exits. You cannot read it afterwards to collect errors. You need to close it in your main function and initialize it with some capacity (make(chan int, 10)
for example), or create a consumer goroutine for it. You may also want to try reading it with a select statement: your error checking code, as it is, will block forever if there are no errors.
通过集体智慧和协作来改善编程学习和解决问题的方式。致力于成为全球开发者共同参与的知识库,让每个人都能够通过互相帮助和分享经验来进步。
评论