如何在golang中向通道发送者发送退出信号?

huangapple go评论92阅读模式
英文:

How can I signal a channel sender to quit in golang?

问题

我正在使用Go语言中的通道来处理一种数据流水线。代码大致如下:

type Channels struct {
    inputs chan string
    errc   chan error
    quit   chan struct{}
}

func (c *Channels) doSomethingWithInput() {
    defer close(c.quit)
    defer close(c.errc)
    for input := range p.inputs {
        _, err := doSomethingThatSometimesErrors(input)
        if err != nil {
            c.errc <- err
            return
        }
    }
    doOneFinalThingThatCannotError()
    return
}

func (c *Channels) inputData(s string) {
    // 这个函数的实现是我的问题
}

func StartProcessing(c *Channels, data ...string) error {
    go c.doSomethingWithInput()
    go func() {
        defer close(c.inputs)
        for _, i := range data {
            select {
            case <-c.quit:
                break
            default:
            }
            inputData(i)
        }
    }()
    // 阻塞直到quit通道关闭。
    <-c.quit
    if err := <-c.errc; err != nil {
        return err
    }
    return nil
}

这似乎是一种合理的方式来在通道处理器之间传递退出信号,基于这篇关于Go并发模式的博文

我在使用这种模式时遇到的问题是inputData函数。将字符串添加到inputs通道需要等待doSomethingWithInput()读取通道,但它也可能出错。inputData需要尝试向inputs通道发送数据,但如果被告知退出,它需要放弃。我能做的最好的办法是这样的:

func (c *Channels) inputData(s string) {
    for {
        select {
        case <-c.quit:
            return
        case c.inputs <- s:
            return
        }
    }
}

基本上,"在选项之间来回切换,直到其中一个生效"。明确地说,我不认为这是一个糟糕的设计。只是感觉...有些浪费。我觉得可能有一些巧妙的方法。在Go中,当通道的消费者出错时,我该如何告诉通道的发送者退出呢?

英文:

I'm using channels in Go to process a data pipeline of sorts. The code looks something like this:

type Channels struct {
    inputs chan string
    errc   chan error
    quit   chan struct{}
}

func (c *Channels) doSomethingWithInput() {
    defer close(c.quit)
    defer close(c.errc)
    for input := range p.inputs {
        _, err := doSomethingThatSometimesErrors(input)
        if err != nil {
            c.errc &lt;- err
            return
        }
    }
    doOneFinalThingThatCannotError()
    return
}

func (c *Channels) inputData(s string) {
        // This function implementation is my question
}

func StartProcessing(c *Channels, data ...string) error {
    go c.doSomethingWithInput()
    go func() {
        defer close(c.inputs)
        for _, i := range data {
            select {
            case &lt;-c.quit:
                break
            default:
            }
            inputData(i)
            }
    }()
    // Block until the quit channel is closed.
    &lt;-c.quit
    if err := &lt;-c.errc; err != nil {
	    return err
    }
    return nil
}

This seems like a reasonable way to communicate a quit signal between channel processors and is based on this blog post about concurrency patterns in Go.

The thing I struggle with using this pattern is the inputData function. Adding strings to the input channel needs to wait for doSomethingWithInput() to read the channel, but it also might error. inputData needs to try and feed the inputs channel but give up if told to quit. The best I could do was this:

func (c *Channels) inputData(s string) {
	for {
		select {
		case &lt;-c.quit:
			return
		case c.inputs &lt;- s:
			return
		}
	}
}

Essentially, "oscillate between your options until one of them sticks." To be clear, I don't think it's a bad design. It just feels... wasteful. Like I'm missing something clever. How can I tell a channel sender to quit in Go when a channel consumer errors?

答案1

得分: 3

你的inputData()很好,这是正确的做法。

在你的用例中,你的通道消费者,也就是接收者,即doSomethingWithInput()应该对"quit"通道有控制权。目前情况下,如果发生错误,只需从doSomethingWithInput()返回,这将关闭quit通道并使发送者退出(将触发case <-quit:)。这实际上是一个巧妙的地方。

只需注意一下你的错误通道,它不应该是带缓冲的,并且在doSomethingWithInput()退出时不应该关闭它。你不能在之后读取它以收集错误。你需要在主函数中关闭它并初始化一些容量(例如make(chan int, 10)),或者为它创建一个消费者goroutine。你可能还想尝试使用select语句来尝试读取它:如果没有错误,你当前的错误检查代码将永远阻塞。

英文:

Your inputData() is fine, that's the way to do it.

In your use case, your channel consumer, the receiver, aka doSomethingWithInput() is the one which should have control over the "quit" channel. As it is, if an error occurs, just return from doSomethingWithInput(), which will in turn close the quit channel and make the sender(s) quit (will trigger case &lt;-quit:). That is in fact the clever bit.

Just watch out with your error channel that's not buffered and closed when doSomethingWithInput() exits. You cannot read it afterwards to collect errors. You need to close it in your main function and initialize it with some capacity (make(chan int, 10) for example), or create a consumer goroutine for it. You may also want to try reading it with a select statement: your error checking code, as it is, will block forever if there are no errors.

huangapple
  • 本文由 发表于 2014年12月16日 10:22:38
  • 转载请务必保留本文链接:https://go.coder-hub.com/27496523.html
匿名

发表评论

匿名网友

:?: :razz: :sad: :evil: :!: :smile: :oops: :grin: :eek: :shock: :???: :cool: :lol: :mad: :twisted: :roll: :wink: :idea: :arrow: :neutral: :cry: :mrgreen:

确定