How to limit a producer and read its messages with consumers?
Question
I want to build a producer-consumer application in Go (with shutdown on signals).
A producer constantly generates messages into a queue with a limit of 10.
Some consumers read and process this channel.
If the number of messages in the queue is 0, the producer again generates 10 messages.
When a stop signal is received, the producer stops generating new messages, and the consumers process everything that is in the channel.
I found some code, but I can't tell whether it works correctly, because I noticed some strange things:
- Why, after stopping the program, were not all messages from the queue processed? It seems that part of the data was lost. (In the screenshot, 15 messages were sent, but only 5 were processed.)
- How do I correctly limit the queue to 10 messages, that is, write 10 messages, wait until they are processed and the queue counter becomes 0, and then write 10 again?
- Is it possible to inform the producer after the stop signal so that it no longer generates new messages to the channel? (In the screenshot, the producer still managed to write 12, 13, 14 and 15 to the queue.)
Result: (screenshot not preserved)
Code example:
```go
package main

import (
	"context"
	"fmt"
	"math/rand"
	"os"
	"os/signal"
	"sync"
	"syscall"
	"time"
)

func main() {
	const nConsumers = 2
	in := make(chan int, 10) // the queue, limited to 10 buffered messages
	p := Producer{&in}
	c := Consumer{&in, make(chan int, nConsumers)}
	go p.Produce()
	ctx, cancelFunc := context.WithCancel(context.Background())
	go c.Consume(ctx)
	wg := &sync.WaitGroup{}
	wg.Add(nConsumers)
	for i := 1; i <= nConsumers; i++ {
		go c.Work(wg, i)
	}
	// Block until SIGINT or SIGTERM, then cancel ctx and wait for the workers.
	termChan := make(chan os.Signal, 1)
	signal.Notify(termChan, syscall.SIGINT, syscall.SIGTERM)
	<-termChan
	cancelFunc()
	wg.Wait()
}

type Consumer struct {
	in   *chan int
	jobs chan int
}

// Work processes the jobs forwarded by Consume until c.jobs is closed.
func (c Consumer) Work(wg *sync.WaitGroup, i int) {
	defer wg.Done()
	for job := range c.jobs {
		fmt.Printf("Worker #%d start job %d\n", i, job)
		time.Sleep(time.Millisecond * time.Duration(3000+rand.Intn(3000)))
		fmt.Printf("Worker #%d finish job %d\n", i, job)
	}
	fmt.Printf("Worker #%d interrupted\n", i)
}

// Consume forwards jobs from *c.in to c.jobs. On cancellation it closes
// c.jobs and returns; whatever is still buffered in *c.in is never read.
func (c Consumer) Consume(ctx context.Context) {
	for {
		select {
		case job := <-*c.in:
			c.jobs <- job
		case <-ctx.Done():
			close(c.jobs)
			fmt.Println("Consumer close channel")
			return
		}
	}
}

type Producer struct {
	in *chan int
}

// Produce cannot observe the stop signal, so it keeps sending until the
// buffer is full and the send blocks.
func (p Producer) Produce() {
	task := 1
	for {
		*p.in <- task
		fmt.Printf("Send value %d\n", task)
		task++
		time.Sleep(time.Millisecond * 500)
	}
}
```
Answer 1
Score: 1
> Why, after stopping the program, not all messages from the queue were processed, it seems that part of the data was lost.
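That's because when the `ctx` is done, `(Consumer).Consume` stops reading from the `in` channel, but the goroutine created by `go p.Produce()` still writes to the `in` channel. The jobs left in the `in` buffer are never forwarded to the workers, so they are lost when `main` returns after `wg.Wait()`.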
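Closing a channel is not what loses data; jobs are lost only when nobody receives them. The fix below relies on the fact that a closed buffered channel still delivers the values already in its buffer, and a `range` loop exits only once that buffer is empty. A minimal, self-contained sketch of this behavior (`drainAfterClose` is a hypothetical helper for illustration, not part of the answer's code):

```go
package main

import "fmt"

// drainAfterClose fills a buffered channel, closes it, and then collects
// everything a receiver can still read. Closing does not discard buffered
// values: range keeps receiving until the buffer is empty, then exits.
func drainAfterClose(n int) []int {
	in := make(chan int, n)
	for i := 1; i <= n; i++ {
		in <- i // buffered sends do not block while len(in) < cap(in)
	}
	close(in) // the producer's role: no more jobs will be sent

	var got []int
	for v := range in { // the worker's role: drain what is left
		got = append(got, v)
	}
	return got
}

func main() {
	fmt.Println(drainAfterClose(5)) // prints [1 2 3 4 5]
}
```

All five values sent before `close` are still received, which is why closing `in` from the producer (instead of closing a separate `jobs` channel from `Consume`) lets the workers finish every queued job.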
The demo below fixes this issue and simplifies the source code.

Notes:

- `Produce` stops when the `ctx` is done, and it closes the `in` channel. Because a closed channel still delivers its buffered values, the workers' `range` loops receive every job already in `in` before they exit.
- The `jobs` field is removed from `Consumer`, and the workers read from the `in` channel directly.
- The following requirement is ignored because it's weird:

  > If the number of messages in the queue is 0, the producer again generates 10 messages

  A common behavior is: when a job is produced and the `in` channel is not full, the job is sent to the `in` channel immediately; when the channel is full, the send operation blocks until a job is read from the `in` channel.
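The blocking behavior described in the last note can be observed with a non-blocking send (`select` with a `default` case). This is a small illustration only; `trySend` is a hypothetical helper name, and the capacity of 10 simply mirrors the question's queue:

```go
package main

import "fmt"

// trySend attempts a non-blocking send. It returns false when the buffer is
// full, which is exactly the case where a plain `ch <- v` would block until
// a receiver takes a value out.
func trySend(ch chan int, v int) bool {
	select {
	case ch <- v:
		return true
	default:
		return false
	}
}

func main() {
	in := make(chan int, 10) // same capacity as the queue in the question
	sent := 0
	for i := 1; i <= 11; i++ {
		if trySend(in, i) {
			sent++
		}
	}
	fmt.Println(sent, len(in), cap(in)) // prints 10 10 10: the 11th send would block

	<-in                         // a worker reads one job...
	fmt.Println(trySend(in, 11)) // ...so there is room again: prints true
}
```

A plain send in the producer therefore throttles it automatically: it can never get more than 10 jobs ahead of the workers.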
```go
package main

import (
	"context"
	"fmt"
	"math/rand"
	"os/signal"
	"sync"
	"syscall"
	"time"
)

func main() {
	const nConsumers = 2
	// ctx is cancelled when SIGINT or SIGTERM arrives.
	ctx, stop := signal.NotifyContext(context.Background(), syscall.SIGINT, syscall.SIGTERM)
	defer stop()
	in := make(chan int, 10)
	p := Producer{in}
	c := Consumer{in}
	go p.Produce(ctx)
	var wg sync.WaitGroup
	wg.Add(nConsumers)
	for i := 1; i <= nConsumers; i++ {
		go c.Work(&wg, i)
	}
	<-ctx.Done()
	fmt.Printf("\nGot end signal, waiting for %d jobs to finish\n", len(in))
	wg.Wait()
}

type Consumer struct {
	in chan int
}

// Work reads jobs directly from c.in; the loop ends only after the producer
// has closed c.in and every buffered job has been received.
func (c *Consumer) Work(wg *sync.WaitGroup, i int) {
	defer wg.Done()
	for job := range c.in {
		fmt.Printf("Worker #%d start job %d\n", i, job)
		time.Sleep(time.Millisecond * time.Duration(3000+rand.Intn(3000)))
		fmt.Printf("Worker #%d finish job %d\n", i, job)
	}
	fmt.Printf("Worker #%d interrupted\n", i)
}

type Producer struct {
	in chan int
}

// Produce sends jobs until ctx is done, then closes p.in so the workers
// drain the remaining jobs and exit. If both select cases are ready, Go
// picks one at random, so one more job may be sent after the signal.
func (p *Producer) Produce(ctx context.Context) {
	task := 1
	for {
		select {
		case p.in <- task:
			fmt.Printf("Send value %d\n", task)
			task++
			time.Sleep(time.Millisecond * 500)
		case <-ctx.Done():
			close(p.in)
			return
		}
	}
}
```