英文:
Golang - to perform X task in N unit of time
问题
我正在尝试模拟一个程序,它需要在N秒内执行X个任务,并且丢弃其他的工作请求。
我尝试在一个无限循环中使用timer
和select
,当接收到<-timer.C
的值时,我会继续执行受保护的切片中的所有数据的模拟任务,并重新设置timer
的初始持续时间。
以下是代码:
type Pool struct {
// 池中允许的最大项目数
maxSize int
// 用于保存数据的队列
queue []interface{}
// 池的时间窗口
time time.Duration
// 批处理的定时器
timer *time.Timer
// 通道
ch chan interface{}
}
func NewPool(maxSize int, t int32) *Pool {
p := &Pool{
maxSize: maxSize,
queue: make([]interface{}, maxSize),
time: time.Duration(t * int32(time.Second)),
timer: time.NewTimer(time.Duration(t) * time.Second),
ch: make(chan interface{}),
}
go p.Schedule()
return p
}
func (p *Pool) Add(ele interface{}) {
p.ch <- ele
}
func (p *Pool) Schedule() {
for {
select {
case <-p.timer.C:
fmt.Println("时间到了")
p.queue = make([]interface{}, 0)
p.timer.Reset(p.time)
p.flush()
case data := <-p.ch:
if len(p.queue) < p.maxSize {
fmt.Println("添加中")
p.queue = append(p.queue, data)
}
if !p.timer.Stop() {
<-p.timer.C
}
p.queue = make([]interface{}, 0)
}
}
}
func (p *Pool) flush() {
for _, t := range p.queue {
fmt.Println("在这里模拟一些工作", t)
time.Sleep(500 * time.Millisecond)
}
}
func main() {
p := NewPool(5, 20)
for i := 0; i < 10000; i++ {
p.Add("xyz " + fmt.Sprint(i))
}
}
但是这个代码并没有按照预期工作,我在这里漏掉了一些东西,你能指导我使用哪种并发模式来满足这个需求吗?谢谢。
英文:
I am trying to simulate a program it need to perform X task in tasks in N seconds and discard other requests of work
I am trying to use timer
with select in an infinite loop when the value on <-timer.C
is received, I am continuing the simulated task with all data in the slice which is guarded against a maxLimit
and reseting the timer
again to initial duration
Here is the code
type Pool struct {
// The maximum number of items allowed in pool
maxSize int
//the queue to hold the data
queue []interface{}
// time window for this pool
time time.Duration
//timer for the batch
timer *time.Timer
//channel
ch chan interface{}
}
func NewPool(maxSize int, t int32) *Pool {
p := &Pool{
maxSize: maxSize,
size: 0,
queue: make([]interface{}, maxSize),
time: time.Duration(t * int32(time.Second)),
timer: time.NewTimer(time.Duration(t) * time.Second),
ch: make(chan interface{}),
}
go p.Schedule()
return p
}
func (p *Pool) Add(ele interface{}) {
p.ch <- ele
}
func (p *Pool) Schedule() {
for {
select {
case <-p.timer.C:
fmt.Println("Time is over")
p.queue = make([]interface{}, 0)
p.timer.Reset(p.time)
p.flush()
case data := <-p.ch:
if len(p.queue) < p.maxSize {
fmt.Println("Addding")
p.queue = append(p.queue, data)
}
//p.flush()
if !p.timer.Stop() {
<-p.timer.C
}
p.queue = make([]interface{}, 0)
}
}
}
func (p *Pool) flush() {
for _, t := range p.queue {
fmt.Println("simulate some work here", t)
time.Sleep(500 * time.Millisecond)
}
}
func main() {
p := NewPool(5, 20)
for i := 0; i < 10000; i++ {
p.Add("xyz " + fmt.Sprint(i))
}
}
But this is not working as expected I am missing few things here, can you guide me to the concurrency pattern which I can use for this requirement, thanks
答案1
得分: 1
添加一些额外的输出可以帮助你定位问题:
case data := <-p.ch:
fmt.Println("Got Data", len(p.queue), p.maxSize)
if len(p.queue) < p.maxSize {
p.queue = append(p.queue, data)
}
if !p.timer.Stop() {
fmt.Println("Draining")
<-p.timer.C
}
p.queue = make([]interface{}, 0)
}
运行这个修改后的代码,输出结果如下:
Got Data 5 5
Got Data 0 5
Addding
Draining
因此,两条消息被处理了(第一条消息不会输出 Adding
,因为 len(p.queue)
是 5;这是因为你用大小为 5 的切片进行了初始化 - make([]interface{}, maxSize)
)。考虑一下当接收到一条消息时代码在做什么:
- 处理
queue
- 停止计时器
- 重新创建
queue
现在从 timer.Stop()
的文档 中来看:
>Stop 阻止计时器触发。如果调用停止了计时器,返回 true;如果计时器已经过期或已经停止,返回 false。
在第一次迭代中,这个操作是有效的(计时器被停止,如果需要,通道被清空)。然而,在停止计时器后,你没有重置计时器,所以在第二次迭代中,当你调用 p.timer.Stop()
时,计时器已经被停止。这意味着 Stop()
将返回 false
(计时器已经停止!),然后你尝试清空通道(但由于计时器已经停止,这将永远阻塞)。
如何修复这个问题取决于你的目标;我猜你可能想要重置计时器?如果不是的话,你可以尝试像这样修改代码:
if p.timer.C != nil && !p.timer.Stop() {
fmt.Println("Draining")
<-p.timer.C
}
p.timer.C = nil // 计时器已经停止(nil 通道将永远阻塞)
英文:
Adding some additional output can help you locate the issue:
case data := <-p.ch:
fmt.Println("Got Data", len(p.queue), p.maxSize)
if len(p.queue) < p.maxSize {
p.queue = append(p.queue, data)
}
if !p.timer.Stop() {
fmt.Println("Draining")
<-p.timer.C
}
p.queue = make([]interface{}, 0)
}
Running with that change the output is:
Got Data 5 5
Got Data 0 5
Addding
Draining
So two messages are being processed (the first will not output Adding
because len(p.queue)
is 5; this is because you initialise it with a size of five - make([]interface{}, maxSize)
). Consider what the code is doing when a message is received:
- Deal with the
queue
- Stop the timer
- Remake the
queue
Now from the docs for timer.Stop()
:
>Stop prevents the Timer from firing. It returns true if the call stops the timer, false if the timer has already expired or been stopped.
On the first iteration this works (the timer is stopped and, if necessary, the channel drained). However you do not reset the timer after stopping it so on the second iteration the timer is already stopped when you call p.timer.Stop()
. This means that Stop()
will return false
(the timer is already stopped!) and you try to drain the channel (but as the timer was already stopped this will block forever).
How you fix this depends upon your aim; I suspect that you meant to reset the timer? If not you could do something like this:
if p.timer.C != nil && !p.timer.Stop() {
fmt.Println("Draining")
<-p.timer.C
}
p.timer.C = nil // Timer has been stopped (nil channel will block forever)
通过集体智慧和协作来改善编程学习和解决问题的方式。致力于成为全球开发者共同参与的知识库,让每个人都能够通过互相帮助和分享经验来进步。
评论