Golang: throttle (time delay) function is not working in goroutine (works fine in main thread)


Question

So I am writing a utility to query an API at work, and they throttle to 20 calls every 10 seconds. Easy, I thought: I'll just throttle my calls so that at least 0.5 seconds have passed since the last call. My throttle utility worked fine until I tried to use goroutines.

Right now I am using a struct/method combo:

func (c *CTKAPI) Throttle() {
    if c.Debug {
        fmt.Println("\t\t\tEntering Throttle()")
    }
    for { //in case something else makes a call while we're sleeping, we need to re-check
        if t := time.Now().Sub(c.LastCallTime); t < c.ThrottleTime {
            if c.Debug {
                fmt.Printf("\t\t\tThrottle: Sleeping %v\n", c.ThrottleTime-t)
            }
            time.Sleep(c.ThrottleTime - t)
        } else {
            if c.Debug {
                fmt.Println("\t\t\tThrottle: Released.")
            }
            break
        }
    }
    c.LastCallTime = time.Now()
    if c.Debug {
        fmt.Println("\t\t\tExiting Throttle()")
    }
}

And then I call whatever.Throttle() before each call in each goroutine, to make sure I've waited at least half a second before launching the next call.

But that seems to be unreliable and gives unpredictable results. Is there a more elegant way of throttling concurrent requests?

  • Mike

Answer 1

Score: 1

Because you're introducing a data race: multiple goroutines are accessing and changing c.LastCallTime concurrently.

You could use time.Tick instead, or make c.LastCallTime an int64 (e.g. c.LastCallTime = time.Now().Unix()) and use atomic.LoadInt64/atomic.StoreInt64 to check and update it.


Answer 2

Score: 1

There is actually a much easier way to do this: create a time ticker.

package main

import (
	"fmt"
	"sync"
	"time"
)

func main() {
	rateLimit := time.Tick(500 * time.Millisecond)
	<-rateLimit

	var wg sync.WaitGroup
	for i := 0; i < 10; i++ {
		wg.Add(1)
		go func(i int) {
			<-rateLimit
			fmt.Println("Hello", i)
			wg.Done()
		}(i)
	}
	wg.Wait()
}

Answer 3

Score: 0

Answering my own question, as I happened across a few solutions late last night.
The simplest answer for me was to use a sync.Mutex to lock and unlock the throttle function, making sure I wasn't accidentally hitting it from two goroutines at the same time. Another option would have been to move the throttling service into its own goroutine (thus eliminating concurrent calls) and communicate throttled/OK over channels, but for this application the Mutex was the cleaner solution.
Here is a simplified version of the working code, for those looking for a similar solution:

package main

import (
    "fmt"
    "time"
    "sync"
)

type tStruct struct {
    delay time.Duration
    last time.Time
    lock sync.Mutex //this will be our locking variable
}

func (t *tStruct) createT() *tStruct {
    return &tStruct {
        delay: 500*time.Millisecond,
        last: time.Now(),
    }
}

func (t *tStruct) throttle(th int) {
    //we lock our function, and any other routine calling this function will block.
    t.lock.Lock()
    //and we'll defer an unlock, so when we exit the throttle, we'll be ready for another call.
    defer t.lock.Unlock()
    fmt.Printf("\tThread %v Entering Throttle Check.\n", th)
    defer fmt.Printf("\tThread %v Leaving Throttle Check.\n", th)
    for {
        p := time.Now().Sub(t.last)
        if p < t.delay {
            fmt.Printf("\tThread %v Sleeping %v.\n", th, t.delay-p)
            time.Sleep(t.delay-p)
        } else {
            fmt.Printf("\tThread %v No longer Throttled.\n", th)
            t.last = time.Now()
            break
        }
    }
}

func (t *tStruct) worker(rch <-chan string, sch chan<- string, th int) {
    fmt.Printf("Thread %v starting up.\n", th)
    defer fmt.Printf("Thread %v Dead.\n", th)
    sch <- "READY"
    for {
        r := <-rch
        fmt.Printf("Thread %v received %v\n", th, r)
        switch r {
            case "STOP":
                fmt.Printf("Thread %v returning.\n", th)
                sch <- "QUITTING"
                return
            default:
                fmt.Printf("Thread %v processing %v.\n", th, r)
                t.throttle(th)
                fmt.Printf("Thread %v done with %v.\n", th, r)
                sch <- "OK"
        }
    }
}

func main() {
    ts := tStruct{}
    ts.delay = 500*time.Millisecond
    ts.last = time.Now()
    sch := make(chan string)
    rch := make(chan string)
    tC := 3
    tA := 0

    fmt.Println("Starting Threads")
    for i:=1; i<(tC+1); i++ {
        go ts.worker(sch, rch, i)
        r := <-rch
        if r=="READY" {
            tA++
        } else {
            fmt.Println("ERROR not READY")
        }
    }

    fmt.Println("Feeding All Threads")
    for i:=1; i<(tC+1); i++ {
        sch <- "WORK"
    }

    fmt.Println("Listening for threads")
    for tA > 0{
        r := <-rch
        switch r {
            case "QUITTING":
                tA--
                fmt.Println("main received QUITTING")
                continue
            case "OK":
                fmt.Println("main received OK")
                sch <- "STOP"
                continue
            default:
                fmt.Println("Shouldn't be here!!!")
        }
    }
}

Hope this helps!


Answer 4

Score: 0

Your new code is better. As mentioned in another answer, you had a race. Go has a built-in race detector (go build -race); it's an amazing tool, and with a good unit test it would have found the race for you.

I believe one of your initial assumptions is flawed, though. By pacing all API calls you eliminate any chance of bursting: in your scheme every API call takes a latency hit, even when it might not have to. Unless you are sure that every API call is going to hit the throttle, there is a better way.

Start a time.NewTicker at 10 seconds and initialize a counter to 0. Increment the counter for each API request. If the counter reaches 20, put the goroutine to sleep until the ticker fires. When the ticker fires, reset the counter and wake the sleeping goroutines.

I've always wanted to code an API rate limiter, so I coded one up; you can see it here:
https://github.com/tildeleb/limiter/blob/master/limiter.go

Except for the example, it's untested. For any feedback, please create an issue on GitHub.


huangapple
  • Posted on 2015-02-06 10:23:59
  • Please keep this link when reposting: https://go.coder-hub.com/28357556.html