Go:一个通道多个监听器

huangapple go评论85阅读模式
英文:

Go: one channel with multiple listeners

问题

我对Go还不太熟悉,如果主题有误请谅解,但我希望你能理解我的问题。我想通过通道将事件传递给不同的Go协程进行处理。以下是一些示例代码:

type Event struct {
    Host    string
    Command string
    Output  string
}

var (
    incoming = make(chan Event)
)

func processEmail(ticker *time.Ticker) {
    for {
        select {
        case t := <-ticker.C:
            fmt.Println("Email Tick at", t)
        case e := <-incoming:
            fmt.Println("EMAIL GOT AN EVENT!")
            fmt.Println(e)
        }
    }
}

func processPagerDuty(ticker *time.Ticker) {
    for {
        select {
        case t := <-ticker.C:
            fmt.Println("Pagerduty Tick at", t)
        case e := <-incoming:
            fmt.Println("PAGERDUTY GOT AN EVENT!")
            fmt.Println(e)
        }
    }
}

func main() {

    err := gcfg.ReadFileInto(&cfg, "dispatch-api.cfg")
    if err != nil {
        fmt.Printf("Error loading the config")
    }

    ticker := time.NewTicker(time.Second * 10)
    go processEmail(ticker)

    ticker := time.NewTicker(time.Second * 1)
    go processPagerDuty(ticker)
}

func eventAdd(r render.Render, params martini.Params, req *http.Request) {

    // create an event now
    e := Event{Host: "web01-east.domain.com", Command: "foo", Output: "bar"}
    incoming <- e
}

所以计时器事件正常工作。当我发出一个API调用来创建一个事件时,我只从processEmail函数中得到输出。无论哪个Go协程首先调用,都会通过通道获取事件。

有没有办法让两个函数都获取到该事件?

英文:

I'm pretty new to Go so sorry if the topic is wrong but I hope you understand my question. I want to process events to different go routines via a channel. Here is some sample code

type Event struct {
Host string
Command string
Output string
}
var (
incoming        = make(chan Event)
)
func processEmail(ticker* time.Ticker) {
for {
select {
case t := &lt;-ticker.C:
fmt.Println(&quot;Email Tick at&quot;, t)
case e := &lt;-incoming:
fmt.Println(&quot;EMAIL GOT AN EVENT!&quot;)
fmt.Println(e)
}
}
}
func processPagerDuty(ticker* time.Ticker) {
for {
select {
case t := &lt;-ticker.C:
fmt.Println(&quot;Pagerduty Tick at&quot;, t)
case e := &lt;-incoming:
fmt.Println(&quot;PAGERDUTY GOT AN EVENT!&quot;)
fmt.Println(e)
}
}
}
func main() {
err := gcfg.ReadFileInto(&amp;cfg, &quot;dispatch-api.cfg&quot;)
if err != nil {
fmt.Printf(&quot;Error loading the config&quot;)
}
ticker := time.NewTicker(time.Second * 10)
go processEmail(ticker)
ticker := time.NewTicker(time.Second * 1)
go processPagerDuty(ticker)
}
func eventAdd(r render.Render, params martini.Params, req *http.Request) {
// create an event now
e := Event{Host: &quot;web01-east.domain.com&quot;, Command: &quot;foo&quot;, Output: &quot;bar&quot;}
incoming &lt;- e
}

So the ticker events work just create. When I issue an API call to create an event I just get output from the processEmail function. Its whatever go routine is called first will get the event over the channel.

Is there a way for both functions to get that event?

答案1

得分: 17

你可以使用fan infan out(来自Rob Pike的演讲):

package main

func main() {
    // feeders - feeder1, feeder2和feeder3用于将数据汇入一个通道
    go func() {
        for {
            select {
            case v1 := <-feeder1:
                mainChannel <- v1
            case v2 := <-feeder2:
                mainChannel <- v2
            case v3 := <-feeder3:
                mainChannel <- v3
            }
        }
    }()

    // dispatchers - 实际上不是fan out,而是将数据分发
    go func() {
        for {
            v := <-mainChannel

            // 使用这个来防止goroutine泄漏
            // (即当一个消费者被卡住时)
            done := make(chan bool)

            go func() {
                consumer1 <- v
                done <- true
            }()
            go func() {
                consumer2 <- v
                done <- true
            }()
            go func() {
                consumer3 <- v
                done <- true
            }()

            <-done
            <-done
            <-done
        }
    }()

    // 或者进行fan out(当只有一个消费者处理数据就足够时)
    go func() {
        for {
            v := <-mainChannel
            select {
            case consumer1 <- v:
            case consumer2 <- v:
            case consumer3 <- v:
            }
        }
    }()

    // 消费者(你的逻辑)
    go func() { <-consumer1 /* 使用值 */ }()
    go func() { <-consumer2 /* 使用值 */ }()
    go func() { <-consumer3 /* 使用值 */ }()
}

type payload int

var (
    feeder1 = make(chan payload)
    feeder2 = make(chan payload)
    feeder3 = make(chan payload)

    mainChannel = make(chan payload)

    consumer1 = make(chan payload)
    consumer2 = make(chan payload)
    consumer3 = make(chan payload)
)

这段代码展示了如何使用fan infan out模式。其中,feeder1feeder2feeder3将数据汇入mainChannel通道,consumer1consumer2consumer3mainChannel通道接收数据进行处理。通过使用goroutine和通道,可以实现并发的数据处理。

英文:

You can use fan in and fan out (from Rob Pike's speech):

package main
func main() {
// feeders - feeder1, feeder2 and feeder3 are used to fan in
// data into one channel
go func() {
for {
select {
case v1 := &lt;-feeder1:
mainChannel &lt;- v1
case v2 := &lt;-feeder2:
mainChannel &lt;- v2
case v3 := &lt;-feeder3:
mainChannel &lt;- v3
}
}
}()
// dispatchers - not actually fan out rather dispatching data
go func() {
for {
v := &lt;-mainChannel
// use this to prevent leaking goroutines
// (i.e. when one consumer got stuck)
done := make(chan bool)
go func() {
consumer1 &lt;- v
done &lt;- true
}()
go func() {
consumer2 &lt;- v
done &lt;- true
}()
go func() {
consumer3 &lt;- v
done &lt;- true
}()
&lt;-done
&lt;-done
&lt;-done
}
}()
// or fan out (when processing the data by just one consumer is enough)
go func() {
for {
v := &lt;-mainChannel
select {
case consumer1 &lt;- v:
case consumer2 &lt;- v:
case consumer3 &lt;- v:
}
}
}()
// consumers(your logic)
go func() { &lt;-consumer1 /* using the value */ }()
go func() { &lt;-consumer2 /* using the value */ }()
go func() { &lt;-consumer3 /* using the value */ }()
}
type payload int
var (
feeder1 = make(chan payload)
feeder2 = make(chan payload)
feeder3 = make(chan payload)
mainChannel = make(chan payload)
consumer1 = make(chan payload)
consumer2 = make(chan payload)
consumer3 = make(chan payload)
)

答案2

得分: 8

通道是一种点对点的通信方法,而不是广播通信方法,所以不能同时获取事件的两个功能,除非进行特殊处理。

你可以为每个goroutine创建单独的通道,并将消息发送到每个通道中。这可能是最简单的解决方案。

或者,你可以让一个goroutine发出信号给下一个goroutine。

据我所知,Go语言有两种机制可以进行广播信号。一种是关闭通道。但这只能工作一次。

另一种是使用sync.Cond锁。这些锁的使用有一定的技巧,但可以让多个goroutine被单个事件唤醒。

如果我是你,我会选择第一种选项,将事件发送到两个不同的通道。这似乎很好地解决了这个问题。

英文:

Channels are a point to point communication method, not a broadcast communication method, so no, you can't get both functions to get the event without doing something special.

You could have separate channels for both goroutines and send the message into each. This is probably the simplest solution.

Or alternatively you could get one goroutine to signal the next one.

Go has two mechanisms for doing broadcast signalling as far as I know. One is closing a channel. This only works a single time though.

The other is to use a sync.Cond lock. These are moderately tricky to use, but will allow you to have multiple goroutines woken up by a single event.

If I was you, I'd go for the first option, send the event to two different channels. That seems to map the problem quite well.

huangapple
  • 本文由 发表于 2015年2月15日 22:23:27
  • 转载请务必保留本文链接:https://go.coder-hub.com/28527038.html
匿名

发表评论

匿名网友

:?: :razz: :sad: :evil: :!: :smile: :oops: :grin: :eek: :shock: :???: :cool: :lol: :mad: :twisted: :roll: :wink: :idea: :arrow: :neutral: :cry: :mrgreen:

确定