使用Golang – 在N个时间单位内执行X任务

huangapple go评论89阅读模式
英文:

Golang - to perform X task in N unit of time

问题

我正在尝试模拟一个程序,它需要在N秒内执行X个任务,并且丢弃其他的工作请求。

我尝试在一个无限循环中使用timerselect,当接收到<-timer.C的值时,我会继续执行受保护的切片中的所有数据的模拟任务,并重新设置timer的初始持续时间。

以下是代码:

type Pool struct {
    // 池中允许的最大项目数
    maxSize int
    // 用于保存数据的队列
    queue []interface{}
    // 池的时间窗口
    time time.Duration
    // 批处理的定时器
    timer *time.Timer
    // 通道
    ch chan interface{}
}

func NewPool(maxSize int, t int32) *Pool {
    p := &Pool{
        maxSize: maxSize,
        queue:   make([]interface{}, maxSize),
        time:    time.Duration(t * int32(time.Second)),
        timer:   time.NewTimer(time.Duration(t) * time.Second),
        ch:      make(chan interface{}),
    }
    go p.Schedule()
    return p
}

func (p *Pool) Add(ele interface{}) {
    p.ch <- ele
}

func (p *Pool) Schedule() {
    for {
        select {
        case <-p.timer.C:
            fmt.Println("时间到了")
            p.queue = make([]interface{}, 0)
            p.timer.Reset(p.time)
            p.flush()
        case data := <-p.ch:
            if len(p.queue) < p.maxSize {
                fmt.Println("添加中")
                p.queue = append(p.queue, data)
            }
            if !p.timer.Stop() {
                <-p.timer.C
            }
            p.queue = make([]interface{}, 0)
        }
    }
}

func (p *Pool) flush() {
    for _, t := range p.queue {
        fmt.Println("在这里模拟一些工作", t)
        time.Sleep(500 * time.Millisecond)
    }
}

func main() {
    p := NewPool(5, 20)
    for i := 0; i < 10000; i++ {
        p.Add("xyz " + fmt.Sprint(i))
    }
}

但是这个代码并没有按照预期工作,我在这里漏掉了一些东西,你能指导我使用哪种并发模式来满足这个需求吗?谢谢。

英文:

I am trying to simulate a program it need to perform X task in tasks in N seconds and discard other requests of work

I am trying to use timer with select in an infinite loop when the value on &lt;-timer.C is received, I am continuing the simulated task with all data in the slice which is guarded against a maxLimit and reseting the timer again to initial duration

Here is the code

     type Pool struct {
// The maximum number of items allowed in pool
maxSize int
//the queue to hold the data
queue []interface{}
// time window for this pool
time time.Duration
//timer for the batch
timer *time.Timer
//channel
ch chan interface{}
}
func NewPool(maxSize int, t int32) *Pool {
p := &amp;Pool{
maxSize: maxSize,
size:    0,
queue:   make([]interface{}, maxSize),
time:    time.Duration(t * int32(time.Second)),
timer:   time.NewTimer(time.Duration(t) * time.Second),
ch:      make(chan interface{}),
}
go p.Schedule()
return p
}
func (p *Pool) Add(ele interface{}) {
p.ch &lt;- ele
}
func (p *Pool) Schedule() {
for {
select {
case &lt;-p.timer.C:
fmt.Println(&quot;Time is over&quot;)
p.queue = make([]interface{}, 0)
p.timer.Reset(p.time)
p.flush()
case data := &lt;-p.ch:
if len(p.queue) &lt; p.maxSize {
fmt.Println(&quot;Addding&quot;)
p.queue = append(p.queue, data)
}
//p.flush()
if !p.timer.Stop() {
&lt;-p.timer.C
}
p.queue = make([]interface{}, 0)
}
}
}
func (p *Pool) flush() {
for _, t := range p.queue {
fmt.Println(&quot;simulate some work here&quot;, t)
time.Sleep(500 * time.Millisecond)
}
}
func main() {
p := NewPool(5, 20)
for i := 0; i &lt; 10000; i++ {
p.Add(&quot;xyz &quot; + fmt.Sprint(i))
}
}

But this is not working as expected I am missing few things here, can you guide me to the concurrency pattern which I can use for this requirement, thanks

答案1

得分: 1

添加一些额外的输出可以帮助你定位问题:

case data := <-p.ch:
	fmt.Println("Got Data", len(p.queue), p.maxSize)
	if len(p.queue) < p.maxSize {
		p.queue = append(p.queue, data)
	}
	if !p.timer.Stop() {
		fmt.Println("Draining")
		<-p.timer.C
	}
	p.queue = make([]interface{}, 0)
}

运行这个修改后的代码,输出结果如下:

Got Data 5 5
Got Data 0 5
Addding
Draining

因此,两条消息被处理了(第一条消息不会输出 Adding,因为 len(p.queue) 是 5;这是因为你用大小为 5 的切片进行了初始化 - make([]interface{}, maxSize))。考虑一下当接收到一条消息时代码在做什么:

  1. 处理 queue
  2. 停止计时器
  3. 重新创建 queue

现在从 timer.Stop() 的文档 中来看:

>Stop 阻止计时器触发。如果调用停止了计时器,返回 true;如果计时器已经过期或已经停止,返回 false。

在第一次迭代中,这个操作是有效的(计时器被停止,如果需要,通道被清空)。然而,在停止计时器后,你没有重置计时器,所以在第二次迭代中,当你调用 p.timer.Stop() 时,计时器已经被停止。这意味着 Stop() 将返回 false(计时器已经停止!),然后你尝试清空通道(但由于计时器已经停止,这将永远阻塞)。

如何修复这个问题取决于你的目标;我猜你可能想要重置计时器?如果不是的话,你可以尝试像这样修改代码:

if p.timer.C != nil && !p.timer.Stop() {
	fmt.Println("Draining")
	<-p.timer.C
}
p.timer.C = nil // 计时器已经停止(nil 通道将永远阻塞)
英文:

Adding some additional output can help you locate the issue:

case data := &lt;-p.ch:
fmt.Println(&quot;Got Data&quot;, len(p.queue), p.maxSize)
if len(p.queue) &lt; p.maxSize {
p.queue = append(p.queue, data)
}
if !p.timer.Stop() {
fmt.Println(&quot;Draining&quot;)
&lt;-p.timer.C
}
p.queue = make([]interface{}, 0)
}

Running with that change the output is:

Got Data 5 5
Got Data 0 5
Addding
Draining

So two messages are being processed (the first will not output Adding because len(p.queue) is 5; this is because you initialise it with a size of five - make([]interface{}, maxSize)). Consider what the code is doing when a message is received:

  1. Deal with the queue
  2. Stop the timer
  3. Remake the queue

Now from the docs for timer.Stop():

>Stop prevents the Timer from firing. It returns true if the call stops the timer, false if the timer has already expired or been stopped.

On the first iteration this works (the timer is stopped and, if necessary, the channel drained). However you do not reset the timer after stopping it so on the second iteration the timer is already stopped when you call p.timer.Stop(). This means that Stop() will return false (the timer is already stopped!) and you try to drain the channel (but as the timer was already stopped this will block forever).

How you fix this depends upon your aim; I suspect that you meant to reset the timer? If not you could do something like this:

if p.timer.C != nil &amp;&amp; !p.timer.Stop() {
fmt.Println(&quot;Draining&quot;)
&lt;-p.timer.C
}
p.timer.C = nil // Timer has been stopped (nil channel will block forever)

huangapple
  • 本文由 发表于 2022年11月16日 23:40:14
  • 转载请务必保留本文链接:https://go.coder-hub.com/74463309.html
匿名

发表评论

匿名网友

:?: :razz: :sad: :evil: :!: :smile: :oops: :grin: :eek: :shock: :???: :cool: :lol: :mad: :twisted: :roll: :wink: :idea: :arrow: :neutral: :cry: :mrgreen:

确定