英文:
How to timeout a semaphore
问题
在Go语言中,可以使用通道来实现信号量(Semaphore):
以下是一个示例:
https://sites.google.com/site/gopatterns/concurrency/semaphores
背景:
我们有几百台服务器,存在一些共享资源,我们希望限制对这些资源的访问。因此,对于给定的资源,我们希望使用信号量来限制这些服务器的并发访问量为5个。为了实现这一点,我们计划使用一个锁服务器。当一台机器访问资源时,它首先会向锁服务器注册,以一个键表示它正在访问该资源。然后,在完成访问后,它会向锁服务器发送另一个请求,表示它已完成并释放信号量。这样可以确保我们将对这些资源的访问限制为最大并发访问数。
问题: 如果出现问题,如何优雅地处理?
问题: 如何在信号量上实现超时?
示例:
假设我有一个大小为5的信号量。同时有10个进程尝试在信号量中获取锁,因此只有5个进程能够获取锁。
有时,进程会在不响应的情况下终止(真正的原因有点复杂,但基本上有时进程可能无法解锁),这会导致问题,因为信号量中的一个空间现在永久被锁定。
因此,我希望在这个过程中设置一个超时。以下是一些问题:
这些进程的运行时间从2秒到60分钟不等。
我们存在一些竞态条件,因为如果超时后进程尝试解锁,那么我们就解锁了两次而不是一次。反之亦然,我们先解锁,然后超时。
如何将上述建议的模式转化为具有超时功能的线程安全信号量?
英文:
Semaphore in Go is implemented with a channel:
An example is this:
https://sites.google.com/site/gopatterns/concurrency/semaphores
Context:
We have a few hundred servers and there are shared resources that we want to limit access to. So for a given resource, we want to use a semaphore to limit access to only 5 concurrent access by those servers. In order to do that, we are planning to use a lock server. When a machine accesses the resource, it will first register with the lock server that it is accessing the resource by a key. And then when it is done, it will send another request to the lock server to say that its done and release the semaphore. This ensures that we limit access to those resources to a maximal number of concurrent access.
Problem: Want to handle this gracefully if something goes wrong.
Question:
How do you go about implementing a timeout on the semaphore?
Example:
Let's say I have a semaphore size of 5. There are simultaneously 10 processes trying to acquire a lock in the semaphore so in this case only 5 will acquire it.
Sometimes, processes will die without responding (the real reason is a bit complicated to explain, but basically sometimes the process might not unlock it) so that causes a problem as a space in the semaphore is now permanently locked.
So I would like to have a timeout on this. Here are some issues:
The processes will run from anywhere between 2 seconds up to 60 minutes.
We have some race conditions, because if it times out and then the process tries to unlock it, then we have unlocked the semaphore twice instead of once. And vice versa, we unlock it first and then it times out.
How do I take the suggested pattern posted above and turn this into a thread-safe semaphore with timeouts?
答案1
得分: 1
这是你要翻译的内容:
理解你想要实现的目标有点困难,但从我所了解的情况来看,你想要并发地让goroutine访问一个共享资源,并在出现问题时进行优雅处理。我有几个建议可以帮助你处理这个问题。
1)使用sync包中的WaitGroup:http://golang.org/pkg/sync/#example_WaitGroup
使用这种策略,你可以在每次调用新的goroutine之前增加一个计数器,并使用defer确保它从计数器中减去(这样无论是超时还是其他原因返回,都会从计数器中减去)。然后使用wg.Wait()
命令来确保在所有goroutine返回之前不会继续执行。这里有一个示例:http://play.golang.org/p/wnm24TcBZg 请注意,如果没有wg.Wait()
,它将在主函数返回并终止之前不会等待goroutine完成。
2)使用time.Ticker自动超时:http://golang.org/pkg/time/#Ticker
这种方法基本上会设置一个定时器,在设定的时间间隔后触发。你可以使用这个定时器来控制基于时间的事件。基本上,这个定时器需要在一个循环中运行,等待通道接收到一个tick,就像这个示例中所示:http://play.golang.org/p/IHeqmiFBSS
再次强调,我不完全确定你想要实现什么,但你可以考虑将这两种方法结合起来,这样如果你的进程超时并进入循环,定时器将捕捉到它,并在设定的时间后返回,并调用defer中的wg.Done()
函数,以便等待它的代码继续执行。希望这些建议对你有所帮助。
英文:
It's a little difficult to figure out what you're trying to accomplish, but from what I can tell, you're trying to have concurrent goroutines access a shared resource and handle it gracefully if something doesn't go well. I have a couple suggestions on how you could handle this.
- Use a WaitGroup from the sync package: http://golang.org/pkg/sync/#example_WaitGroup
With this strategy you basically add to a counter before each call to a new goroutine and use a defer to make sure it removes from the counter (so whether it times out or returns for another reason, it will still remove from the counter). Then you use a <code>wg.Wait()</code> command to make sure it doesn't go any further until all go routines have been returned. Here is an example: http://play.golang.org/p/wnm24TcBZg Note that without the <code>wg.Wait()</code> it will not wait on the go routines to finish before returning from main and terminating.
- Use a time.Ticker to auto time out: http://golang.org/pkg/time/#Ticker
This approach will basically set a timer which will fire off at a set interval. You can use this timer to control time based events. Basically this has to run in a for loop that waits for the channel to be fed a tick, like in this example: http://play.golang.org/p/IHeqmiFBSS
Again, not entirely sure what you're trying to accomplish, but you may consider combining these two approaches so that if your process times out and sits in a loop the ticker will catch it and return after a set amount of time and call the defer <code>wg.Done()</code> function so that the part of the code thats waiting on it moves on. Hope this was at least a little helpful.
答案2
得分: 1
由于您正在创建一个分布式锁服务,我假设您的锁服务器在一个端口上进行监听,并且当您接受一个连接时,会在每个连接的goroutine中循环等待命令。当套接字断开连接时,该goroutine会退出(例如:远程节点崩溃)。
所以,假设这是正确的,您可以做一些事情。
1)创建一个通道,其深度与并发锁的数量相匹配。
2)当您锁定时,向通道发送一条消息(如果通道已满,则会阻塞)。
3)当您解锁时,只需从通道中读取一条消息。
4)您可以使用“defer release()”(其中release会消耗一条消息,如果您已经锁定)。
这是一个粗略的工作示例,除了套接字部分之外。
希望它能有意义。
http://play.golang.org/p/DLOX7m8m6q
package main
import "fmt"
import "time"
type Locker struct {
ch chan int
locked bool
}
func (l *Locker) lock(){
l.ch <- 1
l.locked=true
}
func (l *Locker) unlock() {
if l.locked { // called directly or via defer, make sure we don't unlock if we don't have the lock
l.locked = false // avoid unlocking twice if socket crashes after unlock
<- l.ch
}
}
func dostuff(name string, locker Locker) {
locker.lock()
defer locker.unlock()
fmt.Println(name,"Doing stuff")
time.Sleep(1 * time.Second)
}
func main() {
ch := make(chan int, 2)
go dostuff("1",Locker{ch,false})
go dostuff("2",Locker{ch,false})
go dostuff("3",Locker{ch,false})
go dostuff("4",Locker{ch,false})
time.Sleep(4 * time.Second)
}
英文:
Since you are making a distributed lock service, I assume your lock server listens on a port, and when you accept() a connection you loop, waiting for commands in a goroutine per connection. And that goroutine exits when the socket is dropped (ie: remote node crash)
So, assuming that is true, you can do a couple things.
- create a channel with a depth matching how many concurrent locks
- when you lock, send a message to the channel (it will block if full)
- when you unlock, just read a message from the channel
- you can "defer release()" (where release consumes a message if you have already locked)
Here's a rough working example, all but the socket stuff.
Hopefully it makes sense.
http://play.golang.org/p/DLOX7m8m6q
package main
import "fmt"
import "time"
type Locker struct {
ch chan int
locked bool
}
func (l *Locker) lock(){
l.ch <- 1
l.locked=true
}
func (l *Locker) unlock() {
if l.locked { // called directly or via defer, make sure we don't unlock if we don't have the lock
l.locked = false // avoid unlocking twice if socket crashes after unlock
<- l.ch
}
}
func dostuff(name string, locker Locker) {
locker.lock()
defer locker.unlock()
fmt.Println(name,"Doing stuff")
time.Sleep(1 * time.Second)
}
func main() {
ch := make(chan int, 2)
go dostuff("1",Locker{ch,false})
go dostuff("2",Locker{ch,false})
go dostuff("3",Locker{ch,false})
go dostuff("4",Locker{ch,false})
time.Sleep(4 * time.Second)
}
答案3
得分: 1
ctx, cancel := context.WithTimeout(context.Background(), 100*time.Millisecond)
sem := semaphore.NewWeighted(int64(10))
if err := sem.Acquire(ctx, 1); err != nil {
// - 错误表示超时,否则表示锁定
}
英文:
ctx, cancel := context.WithTimeout(context.Background(), 100*time.Millisecond)
sem := semaphore.NewWeighted(int64(10))
if err := sem.Acquire(ctx, 1); err != nil {
// - error means timeout else lock
}
答案4
得分: 0
一些假设:
- 每次需要大约5个服务器才能通过锁服务器。
- 对该资源的访问时间较短且相似。
使用配额服务器而不是锁服务器。以平均(均值、75分位数等)访问/锁定时间的5倍速率补充配额(一个简单的计数器)。只有在配额小于最大值时才补充配额。这样平均而言,您将维持大约5个并发访问/锁定。
一些高级功能:
- 如果共享资源能够检测到自身的负载,它可以告诉配额服务器它可以承受更多或更少的并发访问。
- 当服务器完成任务时,可以向配额服务器发送ping请求。这不是必需的,但可以更早地释放资源。
英文:
Some assumptions:
- You need around 5 servers to get past the lock server at a time.
- Access to that resource are shortish and similar in length.
Use a quota server instead of a lock server. replenish the quota (a simple counter) at 5x the average (mean, 75th, etc) access/lock time. Only replenish the quota if it less than max. That way on average you'll maintain about 5 concurrent accesses/locks.
Some advanced features:
- If the shared resource can detect it's own load it could tell the quota server it can take more or fewer concurrent accesses.
- The servers can ping the quota server when they get done. This is not required, but frees up the resource sooner.
答案5
得分: 0
也许这可以帮助,但我认为这个实现太冗长了。我会欣赏对代码的任何建议。
package main
import (
"fmt"
"time"
"math/rand"
"strconv"
)
type Empty interface{}
type Semaphore struct {
dur time.Duration
ch chan Empty
}
func NewSemaphore(max int, dur time.Duration) (sem *Semaphore) {
sem = new(Semaphore)
sem.dur = dur
sem.ch = make(chan Empty, max)
return
}
type Timeout struct{}
type Work struct{}
var null Empty
var timeout Timeout
var work Work
var global = time.Now()
func (sem *Semaphore) StartJob(id int, job func()) {
sem.ch <- null
go func() {
ch := make(chan interface{})
go func() {
time.Sleep(sem.dur)
ch <- timeout
}()
go func() {
fmt.Println("Job ", strconv.Itoa(id), " is started", time.Since(global))
job()
ch <- work
}()
switch (<-ch).(type) {
case Timeout:
fmt.Println("Timeout for job ", strconv.Itoa(id), time.Since(global))
case Work:
fmt.Println("Job ", strconv.Itoa(id), " is finished", time.Since(global))
}
<-sem.ch
}()
}
func main() {
rand.Seed(time.Now().Unix())
sem := NewSemaphore(3, 3*time.Second)
for i := 0; i < 10; i++ {
id := i
go sem.StartJob(i, func() {
seconds := 2 + rand.Intn(5)
fmt.Println("For job ", strconv.Itoa(id), " was allocated ", seconds, " secs")
time.Sleep(time.Duration(seconds) * time.Second)
})
}
time.Sleep(30 * time.Second)
}
这是一个使用信号量实现的并发作业调度程序。它创建了一个Semaphore
结构体,用于控制并发作业的数量。StartJob
方法用于启动一个作业,并在作业完成或超时后打印相应的消息。main
函数创建了一个Semaphore
实例,并启动了10个作业。每个作业分配一个随机的执行时间,并在完成后打印相应的消息。整个程序运行30秒后结束。
英文:
Maybe this helps, but I think this implementation is too expansive
I will appreciate any suggestions about the code.
package main
import (
"fmt"
"time"
"math/rand"
"strconv"
)
type Empty interface{}
type Semaphore struct {
dur time.Duration
ch chan Empty
}
func NewSemaphore(max int, dur time.Duration) (sem *Semaphore) {
sem = new(Semaphore)
sem.dur = dur
sem.ch = make(chan Empty, max)
return
}
type Timeout struct{}
type Work struct{}
var null Empty
var timeout Timeout
var work Work
var global = time.Now()
func (sem *Semaphore) StartJob(id int, job func()) {
sem.ch <- null
go func() {
ch := make(chan interface{})
go func() {
time.Sleep(sem.dur)
ch <- timeout
}()
go func() {
fmt.Println("Job ", strconv.Itoa(id), " is started", time.Since(global))
job()
ch <- work
}()
switch (<-ch).(type) {
case Timeout:
fmt.Println("Timeout for job ", strconv.Itoa(id), time.Since(global))
case Work:
fmt.Println("Job ", strconv.Itoa(id), " is finished", time.Since(global))
}
<-sem.ch
}()
}
func main() {
rand.Seed(time.Now().Unix())
sem := NewSemaphore(3, 3*time.Second)
for i := 0; i < 10; i++ {
id := i
go sem.StartJob(i, func() {
seconds := 2 + rand.Intn(5)
fmt.Println("For job ", strconv.Itoa(id), " was allocated ", seconds, " secs")
time.Sleep(time.Duration(seconds) * time.Second)
})
}
time.Sleep(30 * time.Second)
}
通过集体智慧和协作来改善编程学习和解决问题的方式。致力于成为全球开发者共同参与的知识库,让每个人都能够通过互相帮助和分享经验来进步。
评论