

How to timeout a semaphore















A semaphore in Go is implemented with a channel:

An example is this:
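For reference, the usual buffered-channel form of that pattern can be sketched as follows. This is an illustrative sketch, not necessarily the example the question originally pointed to, and `acquire`/`release` are hypothetical helper names. Sending into the channel takes a slot, receiving gives one back, and the channel's capacity is the semaphore size.

```go
package main

import (
	"fmt"
	"sync"
)

// sem is a counting semaphore: its buffer capacity (5) is the number of
// holders allowed at the same time.
var sem = make(chan struct{}, 5)

// acquire takes a slot, blocking while all 5 are in use.
func acquire() { sem <- struct{}{} }

// release gives a slot back.
func release() { <-sem }

func main() {
	var wg sync.WaitGroup
	for i := 0; i < 10; i++ { // 10 processes compete for 5 slots
		wg.Add(1)
		go func(id int) {
			defer wg.Done()
			acquire()
			defer release() // runs before wg.Done (defers run last-in, first-out)
			fmt.Println("process", id, "holds a slot")
		}(i)
	}
	wg.Wait()
}
```

Note that nothing here reclaims a slot whose holder dies without calling `release`, which is exactly the gap the question is about.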


We have a few hundred servers, and there are shared resources that we want to limit access to. For a given resource, we want to use a semaphore so that at most 5 of those servers can access it concurrently. To do that, we are planning to use a lock server. When a machine accesses the resource, it will first register with the lock server, under a key, that it is accessing the resource. When it is done, it will send another request to the lock server saying that it's done, which releases the semaphore. This ensures that concurrent access to those resources never exceeds the maximum.

Problem: We want to handle this gracefully if something goes wrong.


How do you go about implementing a timeout on the semaphore?


Let's say I have a semaphore of size 5, and 10 processes try to acquire it simultaneously, so only 5 of them will get a slot.

Sometimes processes die without responding (the real reason is a bit complicated to explain, but basically a process sometimes fails to unlock), which causes a problem: that slot in the semaphore is now permanently locked.

So I would like to have a timeout on this. Here are some issues:

The processes run for anywhere from 2 seconds up to 60 minutes.

There are race conditions: if a slot times out and then the process tries to unlock it, the semaphore is released twice instead of once. The reverse can also happen: the process unlocks first, and then the timeout fires.

How do I take the suggested pattern posted above and turn it into a thread-safe semaphore with timeouts?


Score: 1









It's a little difficult to figure out what you're trying to accomplish, but from what I can tell, you're trying to have concurrent goroutines access a shared resource and handle it gracefully if something goes wrong. I have a couple of suggestions on how you could handle this.

  1. Use a WaitGroup from the sync package: http://golang.org/pkg/sync/#example_WaitGroup

With this strategy, you add to a counter before starting each new goroutine and use `defer` to make sure the goroutine removes itself from the counter (so whether it times out or returns for another reason, the counter is still decremented). Then you call `wg.Wait()` so that execution doesn't continue until all goroutines have returned. Here is an example: http://play.golang.org/p/wnm24TcBZg Note that without `wg.Wait()`, main would return and the program would terminate without waiting for the goroutines to finish.

  2. Use a time.Ticker to time out automatically: http://golang.org/pkg/time/#Ticker

This approach sets up a ticker that fires at a fixed interval, which you can use to drive time-based events. It has to run in a for loop that waits for each tick to arrive on the ticker's channel, as in this example: http://play.golang.org/p/IHeqmiFBSS

Again, I'm not entirely sure what you're trying to accomplish, but you may consider combining these two approaches. Then, if your process hangs in a loop, the ticker catches it and returns after a set amount of time, which runs the deferred `wg.Done()` so that the code waiting on it can move on. Hope this is at least a little helpful.


Score: 1






Since you are building a distributed lock service, I assume your lock server listens on a port, and when you accept() a connection, you loop waiting for commands in one goroutine per connection. That goroutine exits when the socket is dropped (i.e., when the remote node crashes).

So, assuming that is true, you can do a couple things.

  1. create a channel with a depth matching how many concurrent locks
  2. when you lock, send a message to the channel (it will block if full)
  3. when you unlock, just read a message from the channel
  4. you can "defer release()" (where release consumes a message if you have already locked)

Here's a rough working example of everything except the socket handling.
Hopefully it makes sense.

```go
package main

import (
	"fmt"
	"time"
)

// Locker pairs the shared semaphore channel with a flag recording
// whether this holder currently owns a slot.
type Locker struct {
	ch     chan int
	locked bool
}

func (l *Locker) lock() {
	l.ch <- 1 // blocks while all slots are taken
	l.locked = true
}

func (l *Locker) unlock() {
	if l.locked { // called directly or via defer, make sure we don't unlock if we don't have the lock
		l.locked = false // avoid unlocking twice if socket crashes after unlock
		<-l.ch
	}
}

func dostuff(name string, locker Locker) {
	locker.lock()
	defer locker.unlock()
	fmt.Println(name, "Doing stuff")
	time.Sleep(1 * time.Second)
}

func main() {
	ch := make(chan int, 2) // channel depth = max concurrent locks
	go dostuff("1", Locker{ch, false})
	go dostuff("2", Locker{ch, false})
	go dostuff("3", Locker{ch, false})
	go dostuff("4", Locker{ch, false})
	time.Sleep(4 * time.Second) // crude wait so main doesn't exit before the goroutines finish
}
```


Score: 1


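```go
// Needs: import "context", "time" and "golang.org/x/sync/semaphore".
sem := semaphore.NewWeighted(10)
ctx, cancel := context.WithTimeout(context.Background(), 100*time.Millisecond)
defer cancel() // always call cancel to release the context's resources
if err := sem.Acquire(ctx, 1); err != nil {
	// err != nil means the timeout expired before a slot became free
	return
}
defer sem.Release(1) // hold the slot until this function returns
```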


Score: 0



Some assumptions:

  • You need around 5 servers to get past the lock server at a time.
  • Accesses to that resource are fairly short and similar in length.

Use a quota server instead of a lock server. Replenish the quota (a simple counter) at a rate of 5 units per average (mean, 75th percentile, etc.) access/lock time. Only replenish the quota if it is below the maximum. That way, on average, you'll maintain about 5 concurrent accesses/locks.

Some advanced features:

  • If the shared resource can detect its own load, it could tell the quota server it can take more or fewer concurrent accesses.
  • The servers can ping the quota server when they are done. This is not required, but it frees up the resource sooner.
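Here is a minimal in-process model of the quota-server idea, assuming a single counter protected by a mutex. `Quota`, `Take`, `Refill`, and `Run` are hypothetical names, and a real quota server would expose them over the network. Refilling one unit every `avgAccess/max` gives a rate of `max` units per average access time. By Little's law (average concurrency = arrival rate × average duration), that keeps roughly `max` accesses in flight on average. Early "done" pings and timed refills can overlap, so the count is approximate, as the answer says.

```go
package main

import (
	"fmt"
	"sync"
	"time"
)

// Quota is a hypothetical model of the quota server: a counter that grants
// one access per unit and is topped up over time.
type Quota struct {
	mu    sync.Mutex
	avail int // units that can be handed out right now
	max   int // cap on avail (the target concurrency)
}

func NewQuota(max int) *Quota { return &Quota{avail: max, max: max} }

// Take grants one access if any quota is left.
func (q *Quota) Take() bool {
	q.mu.Lock()
	defer q.mu.Unlock()
	if q.avail == 0 {
		return false
	}
	q.avail--
	return true
}

// Refill adds one unit, but only while the counter is below max. An early
// "done" ping from a server could also call it to free the resource sooner.
func (q *Quota) Refill() {
	q.mu.Lock()
	defer q.mu.Unlock()
	if q.avail < q.max {
		q.avail++
	}
}

// Run refills one unit every avgAccess/max until stop is closed.
// Assumes max > 0 and avgAccess of at least max nanoseconds (NewTicker needs d > 0).
func (q *Quota) Run(avgAccess time.Duration, stop <-chan struct{}) {
	ticker := time.NewTicker(avgAccess / time.Duration(q.max))
	defer ticker.Stop()
	for {
		select {
		case <-ticker.C:
			q.Refill()
		case <-stop:
			return
		}
	}
}

func main() {
	q := NewQuota(5)
	granted := 0
	for i := 0; i < 8; i++ { // 8 servers ask at once
		if q.Take() {
			granted++
		}
	}
	fmt.Println("granted:", granted) // prints "granted: 5"

	stop := make(chan struct{})
	go q.Run(100*time.Millisecond, stop) // one unit every 20ms
	time.Sleep(70 * time.Millisecond)
	close(stop)
	fmt.Println("quota available again:", q.Take())
}
```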


Score: 0





Maybe this helps, but I think this implementation is too expensive.
I would appreciate any suggestions about the code.

```go
package main

import (
	"fmt"
	"math/rand"
	"time"
)

// Semaphore limits how many jobs run at once; a job's slot is reclaimed
// after dur even if the job has not finished yet.
type Semaphore struct {
	dur time.Duration
	ch  chan struct{}
}

func NewSemaphore(max int, dur time.Duration) *Semaphore {
	return &Semaphore{dur: dur, ch: make(chan struct{}, max)}
}

var start = time.Now()

// StartJob blocks until a slot is free, then runs job in the background.
// The slot is released when the job finishes or when dur elapses, whichever
// comes first. A timed-out job keeps running, so more than max jobs may
// briefly execute at the same time.
func (sem *Semaphore) StartJob(id int, job func()) {
	sem.ch <- struct{}{}
	go func() {
		done := make(chan struct{})
		go func() {
			fmt.Println("Job", id, "is started", time.Since(start))
			job()
			close(done) // never blocks, so this goroutine cannot get stuck after a timeout
		}()
		select {
		case <-time.After(sem.dur):
			fmt.Println("Timeout for job", id, time.Since(start))
		case <-done:
			fmt.Println("Job", id, "is finished", time.Since(start))
		}
		<-sem.ch // release the slot exactly once, from this goroutine only
	}()
}

func main() {
	// math/rand is seeded automatically since Go 1.20.
	sem := NewSemaphore(3, 3*time.Second)
	for i := 0; i < 10; i++ {
		id := i
		go sem.StartJob(id, func() {
			seconds := 2 + rand.Intn(5)
			fmt.Println("For job", id, "was allocated", seconds, "secs")
			time.Sleep(time.Duration(seconds) * time.Second)
		})
	}
	time.Sleep(30 * time.Second)
}
```

  • Posted on October 29, 2013 at 08:03:20
  • When reposting, please keep this link: https://go.coder-hub.com/19647344.html



