如何通过消费者限制生产者并读取消息?

huangapple go评论79阅读模式
英文:

How to limited producer and read messages by consumers?

问题

我想用Go语言实现一个生产者-消费者模型(通过信号进行关闭)。

生产者不断在一个队列中生成消息,队列的限制为10个。一些消费者读取并处理这个通道。如果队列中的消息数量为0,生产者会再次生成10个消息。当接收到停止信号时,生产者停止生成新的消息,消费者处理通道中的所有内容。

我找到了一段代码,但无法确定它是否正常工作,因为发现了一些奇怪的事情:

  1. 为什么在程序停止后,队列中的并非所有消息都被处理,似乎有一部分数据丢失了。(在截图中,发送了15条消息,但只处理了5条)
  2. 如何正确地将队列限制为10条消息,即写入10条消息,等待队列计数器变为0时进行处理,然后再写入10条消息?
  3. 是否可以在停止信号后通知生产者,使其不再向通道生成新的消息?(在截图中,生产者成功写入了队列-12、13、14、15)

结果:

如何通过消费者限制生产者并读取消息?

代码示例:

package main

import (
	"context"
	"fmt"
	"math/rand"
	"os"
	"os/signal"
	"sync"
	"syscall"
	"time"
)

func main() {
	const nConsumers = 2

	in := make(chan int, 10)
	p := Producer{&in}
	c := Consumer{&in, make(chan int, nConsumers)}
	go p.Produce()
	ctx, cancelFunc := context.WithCancel(context.Background())
	go c.Consume(ctx)
	wg := &sync.WaitGroup{}
	wg.Add(nConsumers)
	for i := 1; i <= nConsumers; i++ {
		go c.Work(wg, i)
	}
	termChan := make(chan os.Signal, 1)
	signal.Notify(termChan, syscall.SIGINT, syscall.SIGTERM)

	<-termChan

	cancelFunc()
	wg.Wait()
}

type Consumer struct {
	in   *chan int
	jobs chan int
}

func (c Consumer) Work(wg *sync.WaitGroup, i int) {
	defer wg.Done()
	for job := range c.jobs {
		fmt.Printf("Worker #%d start job %d\n", i, job)
		time.Sleep(time.Millisecond * time.Duration(3000+rand.Intn(3000)))
		fmt.Printf("Worker #%d finish job %d\n", i, job)
	}
	fmt.Printf("Worker #%d interrupted\n", i)
}

func (c Consumer) Consume(ctx context.Context) {
	for {
		select {
		case job := <-*c.in:
			c.jobs <- job
		case <-ctx.Done():
			close(c.jobs)
			fmt.Println("Consumer close channel")
			return
		}
	}
}

type Producer struct {
	in *chan int
}

func (p Producer) Produce() {
	task := 1
	for {
		*p.in <- task
		fmt.Printf("Send value %d\n", task)
		task++
		time.Sleep(time.Millisecond * 500)
	}
}
英文:

I want get application producer-consumer(with shutdown by signals) with Go.

A producer constantly generate messages in a queue with a limit of 10.
Some consumers read and process this channel.
If the number of messages in the queue is 0, the producer again generates 10 messages.
When a stop signal is received, the producer stops generating new messages, and the consumers process everything that is in the channel.

I found a code, but can't understand if it works correctly because found strange things:

  1. Why, after stopping the program, not all messages from the queue were processed, it seems that part of the data was lost. (in the screenshot, 15 messages were sent, but 5 were processed)
  2. How to correctly limit the queue to a limit of 10 messages, that is, we must write 10 messages, wait for processing when the counter of queue becomes 0 and write 10 again?
  3. Is it possible inform the producer after stop signal so that he no longer generates new messages to the channel? (In the screenshot, the producer managed to write to the queue - 12,13,14,15)

Result:

如何通过消费者限制生产者并读取消息?

Code example:

package main
import (
&quot;context&quot;
&quot;fmt&quot;
&quot;math/rand&quot;
&quot;os&quot;
&quot;os/signal&quot;
&quot;sync&quot;
&quot;syscall&quot;
&quot;time&quot;
)
func main() {
const nConsumers = 2
in := make(chan int, 10)
p := Producer{&amp;in}
c := Consumer{&amp;in, make(chan int, nConsumers)}
go p.Produce()
ctx, cancelFunc := context.WithCancel(context.Background())
go c.Consume(ctx)
wg := &amp;sync.WaitGroup{}
wg.Add(nConsumers)
for i := 1; i &lt;= nConsumers; i++ {
go c.Work(wg, i)
}
termChan := make(chan os.Signal, 1)
signal.Notify(termChan, syscall.SIGINT, syscall.SIGTERM)
&lt;-termChan
cancelFunc()
wg.Wait()
}
type Consumer struct {
in   *chan int
jobs chan int
}
func (c Consumer) Work(wg *sync.WaitGroup, i int) {
defer wg.Done()
for job := range c.jobs {
fmt.Printf(&quot;Worker #%d start job %d\n&quot;, i, job)
time.Sleep(time.Millisecond * time.Duration(3000+rand.Intn(3000)))
fmt.Printf(&quot;Worker #%d finish job %d\n&quot;, i, job)
}
fmt.Printf(&quot;Worker #%d interrupted\n&quot;, i)
}
func (c Consumer) Consume(ctx context.Context) {
for {
select {
case job := &lt;-*c.in:
c.jobs &lt;- job
case &lt;-ctx.Done():
close(c.jobs)
fmt.Println(&quot;Consumer close channel&quot;)
return
}
}
}
type Producer struct {
in *chan int
}
func (p Producer) Produce() {
task := 1
for {
*p.in &lt;- task
fmt.Printf(&quot;Send value %d\n&quot;, task)
task++
time.Sleep(time.Millisecond * 500)
}
}

答案1

得分: 1

这是因为当ctx完成时,(Consumer).Consume停止从in通道读取,但由go p.Produce()创建的goroutine仍然向in通道写入。

下面的演示修复了这个问题并简化了源代码。

注意

  1. Producectx完成时停止,并关闭in通道。

  2. Consumer中的jobs字段被移除,工作线程直接从in通道读取。

  3. 忽略了以下要求,因为它很奇怪。常见的行为是,当产生一个作业时,如果in通道不满,作业将立即发送到in通道;当通道满时,发送操作将阻塞,直到从in通道读取一个作业。

    如果队列中的消息数为0,则生产者再次生成10条消息

package main

import (
	"context"
	"fmt"
	"math/rand"
	"os/signal"
	"sync"
	"syscall"
	"time"
)

func main() {
	const nConsumers = 2

	ctx, stop := signal.NotifyContext(context.Background(), syscall.SIGINT, syscall.SIGTERM)
	defer stop()

	in := make(chan int, 10)
	p := Producer{in}
	c := Consumer{in}
	go p.Produce(ctx)

	var wg sync.WaitGroup
	wg.Add(nConsumers)
	for i := 1; i <= nConsumers; i++ {
		go c.Work(&wg, i)
	}

	<-ctx.Done()
	fmt.Printf("\n收到结束信号,等待%d个作业完成\n", len(in))
	wg.Wait()
}

type Consumer struct {
	in chan int
}

func (c *Consumer) Work(wg *sync.WaitGroup, i int) {
	defer wg.Done()
	for job := range c.in {
		fmt.Printf("工人 #%d 开始作业 %d\n", i, job)
		time.Sleep(time.Millisecond * time.Duration(3000+rand.Intn(3000)))
		fmt.Printf("工人 #%d 完成作业 %d\n", i, job)
	}
	fmt.Printf("工人 #%d 被中断\n", i)
}

type Producer struct {
	in chan int
}

func (p *Producer) Produce(ctx context.Context) {
	task := 1
	for {
		select {
		case p.in <- task:
			fmt.Printf("发送值 %d\n", task)
			task++
			time.Sleep(time.Millisecond * 500)
		case <-ctx.Done():
			close(p.in)
			return
		}
	}
}
英文:

> Why, after stopping the program, not all messages from the queue were processed, it seems that part of the data was lost.

That's because when the ctx is done, (Consumer).Consume stops reading from the in channel, but the goroutine created by go p.Produce() still writes to the in channel.

The demo below fixes this issue and simplify the source code.

Notes:

  1. Produce stops when the ctx is done. And it closes the in channel.

  2. The field jobs is removed from the Consumer and the workers read from the in channel directly.

  3. The following requirement is ignored because it's weird. A common behavior is when a job is produced, and the in channel is not full, the job will be sent to the in channel immediately; when it's full, the send operation will block until a job is read from the in channel.

    > If the number of messages in the queue is 0, the producer again generates 10 messages

package main

import (
	&quot;context&quot;
	&quot;fmt&quot;
	&quot;math/rand&quot;
	&quot;os/signal&quot;
	&quot;sync&quot;
	&quot;syscall&quot;
	&quot;time&quot;
)

func main() {
	const nConsumers = 2

	ctx, stop := signal.NotifyContext(context.Background(), syscall.SIGINT, syscall.SIGTERM)
	defer stop()

	in := make(chan int, 10)
	p := Producer{in}
	c := Consumer{in}
	go p.Produce(ctx)

	var wg sync.WaitGroup
	wg.Add(nConsumers)
	for i := 1; i &lt;= nConsumers; i++ {
		go c.Work(&amp;wg, i)
	}

	&lt;-ctx.Done()
	fmt.Printf(&quot;\nGot end signal, waiting for %d jobs to finish\n&quot;, len(in))
	wg.Wait()
}

type Consumer struct {
	in chan int
}

func (c *Consumer) Work(wg *sync.WaitGroup, i int) {
	defer wg.Done()
	for job := range c.in {
		fmt.Printf(&quot;Worker #%d start job %d\n&quot;, i, job)
		time.Sleep(time.Millisecond * time.Duration(3000+rand.Intn(3000)))
		fmt.Printf(&quot;Worker #%d finish job %d\n&quot;, i, job)
	}
	fmt.Printf(&quot;Worker #%d interrupted\n&quot;, i)
}

type Producer struct {
	in chan int
}

func (p *Producer) Produce(ctx context.Context) {
	task := 1
	for {
		select {
		case p.in &lt;- task:
			fmt.Printf(&quot;Send value %d\n&quot;, task)
			task++
			time.Sleep(time.Millisecond * 500)
		case &lt;-ctx.Done():
			close(p.in)
			return
		}
	}
}

huangapple
  • 本文由 发表于 2023年5月24日 03:22:02
  • 转载请务必保留本文链接:https://go.coder-hub.com/76318008.html
匿名

发表评论

匿名网友

:?: :razz: :sad: :evil: :!: :smile: :oops: :grin: :eek: :shock: :???: :cool: :lol: :mad: :twisted: :roll: :wink: :idea: :arrow: :neutral: :cry: :mrgreen:

确定