在Go语言中,生产者/消费者模式最简洁的习语是什么?

huangapple go评论98阅读模式
英文:

What is the neatest idiom for producer/consumer in Go?

问题

我想做的是有一组生产者goroutine(其中一些可能会完成,也可能不会完成)和一个消费者routine。问题在于括号中的那个附加条件 - 我们不知道会返回答案的总数。

所以我想做的是这样的:

package main
     
import (
  "fmt"
  "math/rand"
)

func producer(c chan int) {
  // 可能会产生,也可能不会产生。
  success := rand.Float32() > 0.5
  if success {
    c <- rand.Int()
  }
}

func main() {
  c := make(chan int, 10)
  for i := 0; i < 10; i++ {
    go producer(c, signal)
  }

  // 如果我们包括一个close,那是错误的。通道将被关闭,但是生产者将尝试向其写入。运行时错误。
  close(c)

  // 如果我们不关闭,那也是错误的。所有的goroutine都将死锁,因为range关键字将寻找一个关闭。
  for num := range c {
    fmt.Printf("Producer produced: %d\n", num)
  }
  fmt.Println("All done.")
}

所以问题是,如果我关闭它是错误的,如果我不关闭它 - 还是错误的(请参见代码中的注释)。

现在,解决方案是一个带有所有生产者写入的带外信号通道:

package main
     
import (
  "fmt"
  "math/rand"
)

func producer(c chan int, signal chan bool) {
  success := rand.Float32() > 0.5
  if success {
    c <- rand.Int()
  }
  signal <- true
}

func main() {
  c := make(chan int, 10)
  signal := make(chan bool, 10)
  for i := 0; i < 10; i++ {
    go producer(c, signal)
  }

  // 这基本上是一个“join”。
  num_done := 0
  for num_done < 10 {
    <- signal
    num_done++
  }
  close(c)

  for num := range c {
    fmt.Printf("Producer produced: %d\n", num)
  }
  fmt.Println("All done.")
}

这完全符合我的要求!但对我来说,它似乎有点啰嗦。我的问题是:有没有任何惯用法/技巧可以让我以更简单的方式做类似的事情?

我在这里看了一下:http://golang.org/doc/codewalk/sharemem/
看起来complete通道(在main开始时初始化)在range中使用,但从未关闭。我不明白它是如何工作的。

如果有人有任何见解,我将非常感激。谢谢!

英文:

What I would like to do is have a set of producer goroutines (of which some may or may not complete) and a consumer routine. The issue is with that caveat in parentheses - we don't know the total number that will return an answer.

So what I want to do is this:

package main
 
import (
  &quot;fmt&quot;
  &quot;math/rand&quot;
)

func producer(c chan int) {
  // May or may not produce.
  success := rand.Float32() &gt; 0.5
  if success {
    c &lt;- rand.Int()
  }
}

func main() {
  c := make(chan int, 10)
  for i := 0; i &lt; 10; i++ {
    go producer(c, signal)
  }

  // If we include a close, then that&#39;s WRONG. Chan will be closed
  // but a producer will try to write to it. Runtime error.
  close(c)

  // If we don&#39;t close, then that&#39;s WRONG. All goroutines will
  // deadlock, since the range keyword will look for a close.
  for num := range c {
    fmt.Printf(&quot;Producer produced: %d\n&quot;, num)
  }
  fmt.Println(&quot;All done.&quot;)
}

So the issue is, if I close it's wrong, if I don't close - it's still wrong (see comments in code).

Now, the solution would be an out-of-band signal channel, that ALL producers write to:

package main
 
import (
  &quot;fmt&quot;
  &quot;math/rand&quot;
)

func producer(c chan int, signal chan bool) {
  success := rand.Float32() &gt; 0.5
  if success {
    c &lt;- rand.Int()
  }
  signal &lt;- true
}

func main() {
  c := make(chan int, 10)
  signal := make(chan bool, 10)
  for i := 0; i &lt; 10; i++ {
    go producer(c, signal)
  }

  // This is basically a &#39;join&#39;.
  num_done := 0
  for num_done &lt; 10 {
    &lt;- signal
    num_done++
  }
  close(c)

  for num := range c {
    fmt.Printf(&quot;Producer produced: %d\n&quot;, num)
  }
  fmt.Println(&quot;All done.&quot;)
}

And that totally does what I want! But to me it seems like a mouthful. My question is: Is there any idiom/trick that lets me do something similar in an easier way?

I had a look here: http://golang.org/doc/codewalk/sharemem/
And it seems like the complete chan (initialised at the start of main) is used in a range but never closed. I do not understand how.

If anyone has any insights, I would greatly appreciate it. Cheers!


Edit: fls0815 has the answer, and has also answered the question of how the close-less channel range works.

My code above modifed to work (done before fls0815 kindly supplied code):

package main
    
import (
  &quot;fmt&quot;
  &quot;math/rand&quot;
  &quot;sync&quot;
)

var wg_prod sync.WaitGroup
var wg_cons sync.WaitGroup

func producer(c chan int) {
  success := rand.Float32() &gt; 0.5
  if success {
    c &lt;- rand.Int()
  }
  wg_prod.Done()
}

func main() {
  c := make(chan int, 10)
  wg_prod.Add(10)
  for i := 0; i &lt; 10; i++ {
    go producer(c)
  }

  wg_cons.Add(1)
  go func() {
    for num := range c {
      fmt.Printf(&quot;Producer produced: %d\n&quot;, num)
    }
    wg_cons.Done()
  } ()

  wg_prod.Wait()
  close(c)
  wg_cons.Wait()
  fmt.Println(&quot;All done.&quot;)
}

答案1

得分: 16

只有生产者才能关闭通道。一旦你的生产者开始工作,你可以通过调用迭代(range)结果通道的消费者来实现你的目标。在主线程中,你等待(参见sync.WaitGroup)直到你的消费者/生产者完成工作。生产者完成后,关闭结果通道,这将强制消费者退出(当通道关闭且没有缓冲项时,range将退出)。

示例代码:

package main

import (
    "log"
    "sync"
    "time"
    "math/rand"
    "runtime"
)

func consumer() {
    defer consumer_wg.Done()

    for item := range resultingChannel {
        log.Println("Consumed:", item)
    }
}

func producer() {
    defer producer_wg.Done()
    
    success := rand.Float32() > 0.5
    if success {
        resultingChannel <- rand.Int()
    }
}

var resultingChannel = make(chan int)
var producer_wg sync.WaitGroup
var consumer_wg sync.WaitGroup

func main() {
    rand.Seed(time.Now().Unix())

    for c := 0; c < runtime.NumCPU(); c++ {
        producer_wg.Add(1)    
        go producer()
    }

    for c := 0; c < runtime.NumCPU(); c++ {
        consumer_wg.Add(1)
        go consumer()
    }
    
    producer_wg.Wait()
    
    close(resultingChannel)
    
    consumer_wg.Wait()
}

我将close语句放在主函数中的原因是因为我们有多个生产者。在上面的示例中,关闭一个生产者的通道会导致你已经遇到的问题(在关闭的通道上写入;原因是可能还有一个生产者仍在产生数据)。通道应该只在没有剩余生产者时关闭(因此我建议只由生产者关闭通道)。这是Go中通道的构造方式。这里你可以找到有关关闭通道的更多信息。


关于sharemem示例:据我所见,这个示例通过不断将资源重新排队(从待处理 -> 完成 -> 待处理 -> 完成...等等)来无限运行。这是主函数末尾的迭代所做的。它接收已完成的资源,并使用Resource.Sleep()将它们重新排队到待处理状态。当没有已完成的资源时,它会等待并阻塞以等待新的资源完成。因此,没有必要关闭通道,因为它们一直在使用中。

英文:

Only producers should close channels. You could achieve your goal by invoking consumer(s) which iterates (range) over the resulting channel once your producers were started. In your main thread you wait (see sync.WaitGroup) until your consumers/producers finished their work. After producers finished you close the resulting channel which will force your consumers to exit (range will exit when channels are closed and no buffered item is left).

Example code:

package main

import (
	&quot;log&quot;
	&quot;sync&quot;
	&quot;time&quot;
	&quot;math/rand&quot;
	&quot;runtime&quot;
)

func consumer() {
	defer consumer_wg.Done()

	for item := range resultingChannel {
		log.Println(&quot;Consumed:&quot;, item)
	}
}

func producer() {
	defer producer_wg.Done()
	
	success := rand.Float32() &gt; 0.5
	if success {
    	resultingChannel &lt;- rand.Int()
  	}
}

var resultingChannel = make(chan int)
var producer_wg sync.WaitGroup
var consumer_wg sync.WaitGroup

func main() {
	rand.Seed(time.Now().Unix())

	for c := 0; c &lt; runtime.NumCPU(); c++ {
		producer_wg.Add(1)	
		go producer()
	}

	for c := 0; c &lt; runtime.NumCPU(); c++ {
		consumer_wg.Add(1)
		go consumer()
	}
	
	producer_wg.Wait()
	
	close(resultingChannel)
	
	consumer_wg.Wait()
}

The reason I put the close-statement into the main function is because we have more than one producer. Closing the channel in one producer in the example above would lead to the problem you already ran into (writing on closed channels; the reason is that there could one producer left who still produces data). Channels should only be closed when there is no producer left (therefore my suggestion on closing the channel only by the producer). This is how channels are constructed in Go. Here you'll find some more information on closing channels.


Related to the sharemem example: AFAICS this example runs endless by re-queuing the Resources again and again (from pending -> complete -> pending -> complete... and so on). This is what the iteration at the end of the main-func does. It receives the completed Resources and re-queues them using Resource.Sleep() to pending. When there is no completed Resource it waits and blocks for new Resources being completed. Therefore there is no need to close the channels because they are in use all the time.

答案2

得分: 1

这里是使用Go语言中基本的同步通道来解决问题的一个解决方案。没有缓冲通道,没有关闭通道,也没有等待组。

这个解决方案与你的“冗长”解决方案并没有太大的差别,抱歉让你失望,它也没有更小。它将消费者放在它自己的goroutine中,以便消费者可以在生产者生产数字时消费它们。它还区分了生产的“尝试”可以以成功或失败结束。如果生产失败,尝试立即结束。如果成功,则尝试直到数字被消费才结束。

package main

import (
	"fmt"
	"math/rand"
)

func producer(c chan int, fail chan bool) {
	if success := rand.Float32() > 0.5; success {
		c <- rand.Int()
	} else {
		fail <- true
	}
}

func consumer(c chan int, success chan bool) {
	for {
		num := <-c
		fmt.Printf("Producer produced: %d\n", num)
		success <- true
	}
}

func main() {
	const nTries = 10
	c := make(chan int)
	done := make(chan bool)
	for i := 0; i < nTries; i++ {
		go producer(c, done)
	}
	go consumer(c, done)

	for i := 0; i < nTries; i++ {
		<-done
	}
	fmt.Println("All done.")
}
英文:

There are always lots of ways to solve these problems. Here's a solution using the simple synchronous channels that are fundamental in Go. No buffered channels, no closing channels, no WaitGroups.

It's really not that far from your "mouthful" solution, and--sorry to disappoint--not that much smaller. It does put the consumer in it's own goroutine, so that the consumer can consume numbers as the producer produces them. It also makes the distinction that a production "try" can end in either success or failure. If production fails, the try is done immediately. If it succeeds, the try is not done until the number is consumed.

package main

import (
	&quot;fmt&quot;
	&quot;math/rand&quot;
)

func producer(c chan int, fail chan bool) {
	if success := rand.Float32() &gt; 0.5; success {
		c &lt;- rand.Int()
	} else {
		fail &lt;- true
	}
}

func consumer(c chan int, success chan bool) {
	for {
		num := &lt;-c
		fmt.Printf(&quot;Producer produced: %d\n&quot;, num)
		success &lt;- true
	}
}

func main() {
	const nTries = 10
	c := make(chan int)
	done := make(chan bool)
	for i := 0; i &lt; nTries; i++ {
		go producer(c, done)
	}
	go consumer(c, done)

	for i := 0; i &lt; nTries; i++ {
		&lt;-done
	}
	fmt.Println(&quot;All done.&quot;)
}

答案3

得分: 0

我正在添加这个原因是因为现有的答案没有澄清一些事情。首先,代码演示中的range循环只是一个无限事件循环,用于不断重新检查和更新相同的url列表。

其次,一个通道本身就是Go中惯用的生产者-消费者队列。异步缓冲区的大小决定了生产者在受到背压之前可以生产多少产品。将N = 0设置为了看到生产者和消费者之间的同步,没有人领先或落后。目前,N = 10将允许生产者在阻塞之前生产多达10个产品。

最后,Go中有一些很好的惯用法来编写通信顺序进程(例如,为您启动go例程的函数,使用for/select模式进行通信和接受控制命令)。我认为WaitGroups很笨拙,希望看到惯用的示例。

package main

import (
    "fmt"
    "time"
)

type control int
const  (
    sleep control = iota
    die // receiver will close the control chan in response to die, to ack.
)

func (cmd control) String() string {
    switch cmd {
    case sleep: return "sleep"
    case die: return "die"
    }
    return fmt.Sprintf("%d",cmd)
}

func ProduceTo(writechan chan<- int, ctrl chan control, done chan bool) {
    var product int
    go func() {
        for {
            select {
        case writechan <- product:
            fmt.Printf("Producer produced %v\n", product)
            product++
        case cmd:= <- ctrl:
            fmt.Printf("Producer got control cmd: %v\n", cmd)
            switch cmd {
            case sleep:
                fmt.Printf("Producer sleeping 2 sec.\n")
                time.Sleep(2000 * time.Millisecond)
            case die:
                fmt.Printf("Producer dies.\n")
                close(done)
                return
            }
            }
        }
    }()
}

func ConsumeFrom(readchan <-chan int, ctrl chan control, done chan bool) {
    go func() {
        var product int
        for {
            select {
            case product = <-readchan:
                fmt.Printf("Consumer consumed %v\n", product)
            case cmd:= <- ctrl:
                fmt.Printf("Consumer got control cmd: %v\n", cmd)
                switch cmd {
                case sleep:
                    fmt.Printf("Consumer sleeping 2 sec.\n")
                    time.Sleep(2000 * time.Millisecond)
                case die:
                    fmt.Printf("Consumer dies.\n")
                    close(done)
                    return
                }

            }
        }
    }()
}

func main() {

    N := 10
    q := make(chan int, N)

    prodCtrl := make(chan control)
    consCtrl := make(chan control)

    prodDone := make(chan bool)
    consDone := make(chan bool)


    ProduceTo(q, prodCtrl, prodDone)
    ConsumeFrom(q, consCtrl, consDone)

    // wait for a moment, to let them produce and consume
    timer := time.NewTimer(10 * time.Millisecond)
    <-timer.C

    // tell producer to pause
    fmt.Printf("telling producer to pause\n")
    prodCtrl <- sleep

    // wait for a second
    timer = time.NewTimer(1 * time.Second)
    <-timer.C

    // tell consumer to pause
    fmt.Printf("telling consumer to pause\n")
    consCtrl <- sleep


    // tell them both to finish
    prodCtrl <- die
    consCtrl <- die

    // wait for that to actually happen
    <-prodDone
    <-consDone
}
英文:

I'm adding this because the extant answers don't make a couple things clear. First, the range loop in the codewalk example is just an infinite event loop, there to keep re-checking and updating the same url list forever.

Next, a channel, all by itself, already is the idiomatic consumer-producer queue in Go. The size of the async buffer backing the channel determines how much producers can produce before getting backpressure. Set N = 0 below to see lock-step producer consumer without anyone racing ahead or getting behind. As it is, N = 10 will let the producer produce up to 10 products before blocking.

Last, there are some nice idioms for writing communicating sequential processees in Go (e.g. functions that start go routines for you, and using the for/select pattern to communicate and accept control commands). I think of WaitGroups as clumsy, and would like to see idiomatic examples instead.

package main
import (
&quot;fmt&quot;
&quot;time&quot;
)
type control int
const  (
sleep control = iota
die // receiver will close the control chan in response to die, to ack.
)
func (cmd control) String() string {
switch cmd {
case sleep: return &quot;sleep&quot;
case die: return &quot;die&quot;
}
return fmt.Sprintf(&quot;%d&quot;,cmd)
}
func ProduceTo(writechan chan&lt;- int, ctrl chan control, done chan bool) {
var product int
go func() {
for {
select {
case writechan &lt;- product:
fmt.Printf(&quot;Producer produced %v\n&quot;, product)
product++
case cmd:= &lt;- ctrl:
fmt.Printf(&quot;Producer got control cmd: %v\n&quot;, cmd)
switch cmd {
case sleep:
fmt.Printf(&quot;Producer sleeping 2 sec.\n&quot;)
time.Sleep(2000 * time.Millisecond)
case die:
fmt.Printf(&quot;Producer dies.\n&quot;)
close(done)
return
}
}
}
}()
}
func ConsumeFrom(readchan &lt;-chan int, ctrl chan control, done chan bool) {
go func() {
var product int
for {
select {
case product = &lt;-readchan:
fmt.Printf(&quot;Consumer consumed %v\n&quot;, product)
case cmd:= &lt;- ctrl:
fmt.Printf(&quot;Consumer got control cmd: %v\n&quot;, cmd)
switch cmd {
case sleep:
fmt.Printf(&quot;Consumer sleeping 2 sec.\n&quot;)
time.Sleep(2000 * time.Millisecond)
case die:
fmt.Printf(&quot;Consumer dies.\n&quot;)
close(done)
return
}
}
}
}()
}
func main() {
N := 10
q := make(chan int, N)
prodCtrl := make(chan control)
consCtrl := make(chan control)
prodDone := make(chan bool)
consDone := make(chan bool)
ProduceTo(q, prodCtrl, prodDone)
ConsumeFrom(q, consCtrl, consDone)
// wait for a moment, to let them produce and consume
timer := time.NewTimer(10 * time.Millisecond)
&lt;-timer.C
// tell producer to pause
fmt.Printf(&quot;telling producer to pause\n&quot;)
prodCtrl &lt;- sleep
// wait for a second
timer = time.NewTimer(1 * time.Second)
&lt;-timer.C
// tell consumer to pause
fmt.Printf(&quot;telling consumer to pause\n&quot;)
consCtrl &lt;- sleep
// tell them both to finish
prodCtrl &lt;- die
consCtrl &lt;- die
// wait for that to actually happen
&lt;-prodDone
&lt;-consDone
}

答案4

得分: 0

你可以使用简单的非缓冲通道和生成器模式配合使用fanIn函数。

在生成器模式中,每个生产者返回一个通道,并负责关闭它。然后,fanIn函数遍历这些通道,并将它们返回的值转发到一个单独的通道中。

当然,问题是当每个通道关闭时,fanIn函数会转发通道类型(int)的零值。

你可以通过使用通道类型的零值作为哨兵值来解决这个问题,并且只在fanIn通道的结果不是零值时使用它们。

这里是一个例子:

package main
import (
"fmt"
"math/rand"
)
const offset = 1
func producer() chan int {
cout := make(chan int)
go func() {
defer close(cout)
// 可能会产生或者不产生。
success := rand.Float32() > 0.5
if success {
cout <- rand.Int() + offset
}
}()
return cout
}
func fanIn(cin []chan int) chan int {
cout := make(chan int)
go func() {
defer close(cout)
for _, c := range cin {
cout <- <-c
}
}()
return cout
}
func main() {
chans := make([]chan int, 0)
for i := 0; i < 10; i++ {
chans = append(chans, producer())
}
for num := range fanIn(chans) {
if num > offset {
fmt.Printf("Producer produced: %d\n", num)
}
}
fmt.Println("All done.")
}
英文:

You can use simple unbuffered channels without wait groups if you use the generator pattern with a fanIn function.

In the generator pattern, each producer returns a channel and is responsible for closing it. A fanIn function then iterates over these channels and forwards the values returned on them down a single channel that it returns.

The problem of course, is that the fanIn function forwards the zero value of the channel type (int) when each channel is closed.

You can work around it by using the zero value of your channel type as a sentinel value and only using the results from the fanIn channel if they are not the zero value.

Here's an example:

package main
import (
&quot;fmt&quot;
&quot;math/rand&quot;
)
const offset = 1
func producer() chan int {
cout := make(chan int)
go func() {
defer close(cout)
// May or may not produce.
success := rand.Float32() &gt; 0.5
if success {
cout &lt;- rand.Int() + offset
}
}()
return cout
}
func fanIn(cin []chan int) chan int {
cout := make(chan int)
go func() {
defer close(cout)
for _, c := range cin {
cout &lt;- &lt;-c
}
}()
return cout
}
func main() {
chans := make([]chan int, 0)
for i := 0; i &lt; 10; i++ {
chans = append(chans, producer())
}
for num := range fanIn(chans) {
if num &gt; offset {
fmt.Printf(&quot;Producer produced: %d\n&quot;, num)
}
}
fmt.Println(&quot;All done.&quot;)
}

答案5

得分: 0

生产者-消费者是一种常见的模式,我编写了一个名为prosumer的库,用于方便处理通道通信。例如:

func main() {
	maxLoop := 10
	var wg sync.WaitGroup
	wg.Add(maxLoop)
	defer wg.Wait()
	
	consumer := func(ls []interface{}) error {
		fmt.Printf("获取到 %+v \n", ls)
		wg.Add(-len(ls))
		return nil
	}

	conf := prosumer.DefaultConfig(prosumer.Consumer(consumer))
	c := prosumer.NewCoordinator(conf)
	c.Start()
    defer c.Close(true)

	for i := 0; i < maxLoop; i++ {
		fmt.Printf("尝试放入 %v\n", i)
		discarded, err := c.Put(i)
		if err != nil {
			fmt.Errorf("因错误 %v 丢弃元素 %+v", err, discarded)
			wg.Add(-len(discarded))
		}
		time.Sleep(time.Second)
	}

}

close函数有一个名为graceful的参数,表示是否排空底层通道。

英文:

producer-consumer is such a common pattern that I write a library prosumer for convenience with dealing with chan communication carefully. Eg:

func main() {
	maxLoop := 10
	var wg sync.WaitGroup
	wg.Add(maxLoop)
	defer wg.Wait()
	
	consumer := func(ls []interface{}) error {
		fmt.Printf(&quot;get %+v \n&quot;, ls)
		wg.Add(-len(ls))
		return nil
	}

	conf := prosumer.DefaultConfig(prosumer.Consumer(consumer))
	c := prosumer.NewCoordinator(conf)
	c.Start()
    defer c.Close(true)

	for i := 0; i &lt; maxLoop; i++ {
		fmt.Printf(&quot;try put %v\n&quot;, i)
		discarded, err := c.Put(i)
		if err != nil {
			fmt.Errorf(&quot;discarded elements %+v for err %v&quot;, discarded, err)
			wg.Add(-len(discarded))
		}
		time.Sleep(time.Second)
	}

}

close has a param called graceful, which means whether drain the underlying chan.

huangapple
  • 本文由 发表于 2012年6月18日 08:38:35
  • 转载请务必保留本文链接:https://go.coder-hub.com/11075876.html
匿名

发表评论

匿名网友

:?: :razz: :sad: :evil: :!: :smile: :oops: :grin: :eek: :shock: :???: :cool: :lol: :mad: :twisted: :roll: :wink: :idea: :arrow: :neutral: :cry: :mrgreen:

确定