Go:一个生产者多个消费者

huangapple go评论78阅读模式
英文:

Go: One producer many consumers

问题

所以我看到了很多在Go中实现一个消费者和多个生产者的方法 - 来自Go并发讲座的经典fanIn函数。

我想要的是一个fanOut函数。它接受一个通道作为参数,从该通道读取值,并返回一个通道切片,把每个值的副本写入其中的每个通道。

有没有一种正确/推荐的实现方式?

英文:

So I have seen a lot of ways of implementing one consumer and many producers in Go - the classic fanIn function from the Concurrency in Go talk.

What I want is a fanOut function. It takes as a parameter a channel it reads a value from and returns a slice of channels that it writes copies of this value to.

Is there a correct/recommended way of implementing this?

答案1

得分: 21

package main

import (
"fmt"
"time"
)

// producer starts a goroutine that sends the integers 0 through iters-1 on
// the returned channel, pausing one second after every send, and closes the
// channel when the loop is finished.
func producer(iters int) <-chan int {
	out := make(chan int)
	emit := func() {
		// Closing on the way out lets receivers that range over the channel stop.
		defer close(out)
		for n := 0; n < iters; n++ {
			out <- n
			time.Sleep(time.Second)
		}
	}
	go emit()
	return out
}

// consumer prints every value received from cin, one per line, and returns
// once cin has been closed and drained.
func consumer(cin <-chan int) {
	for {
		v, open := <-cin
		if !open {
			return
		}
		fmt.Println(v)
	}
}

// fanOut copies every value received from ch to each of size output channels
// and returns those channels. Each output is created with a buffer of lag
// elements. Once ch is closed and drained, every output channel is closed.
func fanOut(ch <-chan int, size, lag int) []chan int {
	outs := make([]chan int, size)
	for k := 0; k < size; k++ {
		// The buffer lets a receiver fall up to lag values behind before its
		// pending send holds up delivery to the remaining outputs.
		outs[k] = make(chan int, lag)
	}
	go func() {
		// Runs after the input is exhausted: close every output channel.
		defer func() {
			for k := range outs {
				close(outs[k])
			}
		}()
		for v := range ch {
			for k := range outs {
				outs[k] <- v
			}
		}
	}()
	return outs
}

// fanOutUnbuffered copies every value received from ch to each of size
// unbuffered output channels and returns those channels. Once ch is closed
// and drained, every output channel is closed.
func fanOutUnbuffered(ch <-chan int, size int) []chan int {
	cs := make([]chan int, size)
	for i := range cs {
		// Unbuffered: each send waits for its receiver, so a value is only
		// offered to the next output after the previous one has taken it.
		cs[i] = make(chan int)
	}
	go func() {
		for v := range ch {
			for _, c := range cs {
				c <- v
			}
		}
		// Close all fan-out channels when the input channel is exhausted.
		for _, c := range cs {
			close(c)
		}
	}()
	return cs
}

// main connects one producer to three consumers through fanOutUnbuffered.
func main() {
	c := producer(10)
	chans := fanOutUnbuffered(c, 3)
	// Two consumers run in the background; the third runs on the main
	// goroutine so the program keeps running while it consumes.
	go consumer(chans[0])
	go consumer(chans[1])
	consumer(chans[2])
}

英文:

You pretty much described the best way to do it, but here is a small code sample that does it.

Go playground: https://play.golang.org/p/jwdtDXVHJk

package main

import (
	"fmt"
	"time"
)

// producer starts a goroutine that sends the integers 0 through iters-1 on
// the returned channel, sleeping one second after each send, and closes the
// channel when the loop is finished.
func producer(iters int) <-chan int {
	c := make(chan int)
	go func() {
		for i := 0; i < iters; i++ {
			c <- i
			time.Sleep(1 * time.Second)
		}
		// Closing tells receivers that range over c that no more values follow.
		close(c)
	}()
	return c
}

// consumer prints every value received from cin, one per line, and returns
// once cin has been closed and drained.
func consumer(cin <-chan int) {
	for i := range cin {
		fmt.Println(i)
	}
}

// fanOut copies every value received from ch to each of size output channels
// and returns those channels. Each output is created with a buffer of lag
// elements. Once ch is closed and drained, every output channel is closed.
func fanOut(ch <-chan int, size, lag int) []chan int {
	cs := make([]chan int, size)
	for i := range cs {
		// The size of the channel's buffer controls how far behind the
		// receivers of the fan-out channels can lag the other channels.
		cs[i] = make(chan int, lag)
	}
	go func() {
		for i := range ch {
			for _, c := range cs {
				c <- i
			}
		}
		for _, c := range cs {
			// Close all our fan-out channels when the input channel is exhausted.
			close(c)
		}
	}()
	return cs
}

// fanOutUnbuffered copies every value received from ch to each of size
// unbuffered output channels and returns those channels. Once ch is closed
// and drained, every output channel is closed.
func fanOutUnbuffered(ch <-chan int, size int) []chan int {
	cs := make([]chan int, size)
	for i := range cs {
		// No buffer: every send waits until the receiver on that output
		// has taken the value.
		cs[i] = make(chan int)
	}
	go func() {
		for i := range ch {
			for _, c := range cs {
				c <- i
			}
		}
		for _, c := range cs {
			// Close all our fan-out channels when the input channel is exhausted.
			close(c)
		}
	}()
	return cs
}

func main() {
	c := producer(10)
	chans := fanOutUnbuffered(c, 3)
	go consumer(chans[0])
	go consumer(chans[1])
	consumer(chans[2])
}

The important part to note is how we close the output channels once the input channel has been exhausted. Also, if one of the output channels blocks on a send, it will hold up the sends on the other output channels. We control the amount of lag by setting the buffer size of the channels.

答案2

得分: 2

这个解决方案有点牵强,但对我来说有效:

package main

import (
	"fmt"
	"time"
	"crypto/rand"
	"encoding/binary"
)

// handleNewChannels publishes the current list of channels on arrchangen:
// first an empty list, then an extended list after every channel received
// on intchangen. It never returns.
func handleNewChannels(arrchangen chan [](chan uint32),
	intchangen chan (chan uint32)) {
	registered := []chan uint32{}
	for {
		// Publish the list as it stands, then wait for the next registration.
		arrchangen <- registered
		registered = append(registered, <-intchangen)
	}
}

// sendToChannels generates one random uint32 per second and sends it to every
// channel in the most recent list received from arrchangen. It blocks until
// the first list arrives and never returns.
func sendToChannels(arrchangen chan [](chan uint32)) {
	tick := time.Tick(1 * time.Second)
	currarr := <-arrchangen
	for {
		select {
		case <-tick:
			var n uint32
			// Skip this tick instead of broadcasting a meaningless zero if
			// the random source could not be read.
			if err := binary.Read(rand.Reader, binary.LittleEndian, &n); err != nil {
				fmt.Println("Could not generate a value: ", err)
				continue
			}
			for _, c := range currarr {
				c <- n
			}
			// Only report when at least one channel received the value.
			if len(currarr) > 0 {
				fmt.Println("Sent generated ", n)
			}
		case newarr := <-arrchangen:
			currarr = newarr
		}
	}
}
// handleChannel prints every value received from tchan and returns when
// tchan is closed, rather than spinning on the zero values a closed channel
// yields.
func handleChannel(tchan chan uint32) {
	for val := range tchan {
		fmt.Println("Got the value ", val)
	}
}

// createChannels creates a new unbuffered channel every five seconds, sends
// it on intchangen, and starts a handleChannel goroutine that reads from it.
// It never returns.
func createChannels(intchangen chan (chan uint32)) {
	for range time.Tick(5 * time.Second) {
		fmt.Println("Creating new channel! ")
		fresh := make(chan uint32)
		intchangen <- fresh
		go handleChannel(fresh)
	}
}

// main starts the registry and the sender in the background, then creates
// channels on the main goroutine.
func main() {
	var (
		snapshots     = make(chan [](chan uint32))
		registrations = make(chan (chan uint32))
	)
	go handleNewChannels(snapshots, registrations)
	go sendToChannels(snapshots)
	createChannels(registrations)
}
英文:

The solution below is a bit contrived, but it works for me:

package main

import (
	"fmt"
	"time"
	"crypto/rand"
	"encoding/binary"
)

// handleNewChannels publishes the current list of channels on arrchangen:
// first an empty list, then an extended list after every channel received
// on intchangen. It never returns.
func handleNewChannels(arrchangen chan [](chan uint32),
	intchangen chan (chan uint32)) {
	currarr := []chan uint32{}
	arrchangen <- currarr
	for {
		newchan := <-intchangen
		currarr = append(currarr, newchan)
		arrchangen <- currarr
	}
}

// sendToChannels generates one random uint32 per second and sends it to every
// channel in the most recent list received from arrchangen. It blocks until
// the first list arrives and never returns.
func sendToChannels(arrchangen chan [](chan uint32)) {
	tick := time.Tick(1 * time.Second)
	currarr := <-arrchangen
	for {
		select {
		case <-tick:
			var n uint32
			// Do not broadcast a value that was never filled in.
			err := binary.Read(rand.Reader, binary.LittleEndian, &n)
			if err != nil {
				fmt.Println("Could not generate a value: ", err)
				continue
			}
			sent := false
			for i := 0; i < len(currarr); i++ {
				currarr[i] <- n
				sent = true
			}
			if sent {
				fmt.Println("Sent generated ", n)
			}
		case newarr := <-arrchangen:
			currarr = newarr
		}
	}
}
// handleChannel prints every value received from tchan. It returns when
// tchan is closed; a bare receive loop would keep printing zero values.
func handleChannel(tchan chan uint32) {
	for val := range tchan {
		fmt.Println("Got the value ", val)
	}
}

func createChannels(intchangen chan (chan uint32)) {
	othertick := time.Tick(5 * time.Second)
	for {
		&lt;-othertick
		fmt.Println(&quot;Creating new channel! &quot;)
		newchan := make(chan uint32)
		intchangen &lt;- newchan
		go handleChannel(newchan)
	}
}

// main runs the channel registry and the sender as goroutines and creates
// channels on the main goroutine.
func main() {
	lists, added := make(chan [](chan uint32)), make(chan (chan uint32))

	go handleNewChannels(lists, added)
	go sendToChannels(lists)

	createChannels(added)
}

答案3

得分: 0

首先,查看相关问题https://stackoverflow.com/questions/11075876/what-is-the-neatest-idiom-for-producer-consumer-in-go和https://stackoverflow.com/questions/16035127/one-thread-showing-interest-in-another-thread-consumer-producer。另外,可以参考生产者-消费者问题。关于并发性,请参阅如何在Google Go中实现并发性

英文:

First, see the related questions https://stackoverflow.com/questions/11075876/what-is-the-neatest-idiom-for-producer-consumer-in-go and https://stackoverflow.com/questions/16035127/one-thread-showing-interest-in-another-thread-consumer-producer. Also, take a look at the producer-consumer problem. For concurrency, see how to achieve concurrency in Google Go.

答案4

得分: 0

我们可以处理多个消费者,而不需要为每个消费者复制通道数据。

Go playground: https://play.golang.org/p/yOKindnqiZv

package main

import (
    "fmt"
    "sync"
)

// data is the message that is passed from consumer to consumer over a
// single shared channel.
type data struct {
	msg       string // payload shown to every consumer
	consumers int    // number of consumers that still have to receive the message
}

// main sends a single message into a shared channel and lets every consumer
// receive it in turn: each consumer decrements the remaining-consumer count
// and puts the message back on the channel until the count reaches zero.
func main() {
	const consumerCount = 3 // number of consumers to start

	var wg sync.WaitGroup
	ch := make(chan *data) // per the original note, a buffered channel is fine too

	// Producer: publish the one shared message.
	go func() {
		ch <- &data{
			msg:       "hello everyone!",
			consumers: consumerCount,
		}
	}()

	consumer := func(idx int) {
		defer wg.Done()
		obj := <-ch
		fmt.Printf("consumer %d received data %v\n", idx, obj)
		obj.consumers--
		if obj.consumers <= 0 {
			fmt.Printf("last receiver: %d\n", idx)
			return
		}
		ch <- obj // forward to others
	}

	wg.Add(consumerCount)
	for i := 1; i <= consumerCount; i++ {
		go consumer(i)
	}
	wg.Wait()
}
英文:

We can handle multiple consumers without making a copy of the channel data for each consumer.

Go playground: https://play.golang.org/p/yOKindnqiZv

package main

import (
    "fmt"
    "sync"
)

// data carries one message together with a count of the consumers that have
// yet to see it.
type data struct {
	// msg is the text delivered to each consumer.
	msg string
	// consumers is decremented by every consumer that receives the message.
	consumers int
}

func main() {
    ch := make(chan *data) // both block or non-block are ok
    var wg sync.WaitGroup
    consumerCount := 3 // specify no. of consumers

    producer := func() {
        obj := &amp;data {
            msg: &quot;hello everyone!&quot;,
            consumers: consumerCount,
        }
        ch &lt;- obj
    }
    consumer := func(idx int) {
        defer wg.Done()
        obj := &lt;-ch
        fmt.Printf(&quot;consumer %d received data %v\n&quot;, idx, obj)
        obj.consumers--
        if obj.consumers &gt; 0 {
            ch &lt;- obj // forward to others
        } else {
            fmt.Printf(&quot;last receiver: %d\n&quot;, idx)
        }
    }

    go producer()
    for i:=1; i&lt;=consumerCount; i++ {
        wg.Add(1)
        go consumer(i)
    }

    wg.Wait()
}

huangapple
  • 本文由 发表于 2013年6月5日 09:43:25
  • 转载请务必保留本文链接:https://go.coder-hub.com/16930251.html
匿名

发表评论

匿名网友

:?: :razz: :sad: :evil: :!: :smile: :oops: :grin: :eek: :shock: :???: :cool: :lol: :mad: :twisted: :roll: :wink: :idea: :arrow: :neutral: :cry: :mrgreen:

确定