英文:
Multiple concurrent dynamic locks and timeouts if failure to acquire locks
问题
我有一个使用案例,需要锁定函数的参数。
函数本身可以同时访问。
函数签名类似于:
func (m objectType) operate(key string) (bool) {
// 在“key”上获取锁(如果在X毫秒内无法获取锁,则返回false-例如:100毫秒)
// 操作
// 释放“key”上的锁
return true;
}
可以锁定的数据空间在数百万范围内(约1000万)。
对operate()的并发访问在数千范围内(1-5k)。
尽管可能在关键字的热点情况下存在竞争,但预期的争用是低的(因此需要锁)。
如何正确实现这一点?我探索了几个选项,使用并发哈希映射:
- sync.Map - 这适用于只有追加条目且读写比高的情况。因此在这里不适用。
- 分片哈希映射,其中每个分片由RWMutex锁定 - https://github.com/orcaman/concurrent-map - 虽然这可以工作,但并发性受到分片数量的限制,而不是实际键之间的争用。而且,它不能在出现对一部分键的大量争用时启用超时场景。
尽管超时是P1要求,但P0要求是通过细粒度锁定(如果可能)来提高吞吐量。
有没有好的方法来实现这一点?
英文:
I have a use case where I need to lock on arguments of a function.
The function itself can be accessed concurrently
Function signature is something like
func (m objectType) operate(key string) (bool) {
// get lock on "key" (return false if unable to get lock in X ms - eg: 100 ms)
// operate
// release lock on "key"
return true;
}
The data space which can be locked is in the range of millions (~10 million)
Concurrent access to operate() is in the range of thousands (1 - 5k)
Expected contention is low though possible in case of hotspots in key (hence the lock)
What is the right way to implement this ? Few options I explored using a concurrent hash map
- sync.Map - this is suited for cases with append only entries and high read ratio compared to writes. Hence not applicable here
- sharded hashmap where each shard is locked by RWMutex - https://github.com/orcaman/concurrent-map - While this would work, concurrency is limited by no of shards rather than actual contention between keys. Also doesn't enable the timeout scenarios when lot of contention happens for a subset of keys
Though timeout is a P1 requirement, the P0 requirement would be to increase throughput here by granular locking if possible.
Is there a good way to achieve this ?
答案1
得分: 2
我会使用一个带有缓冲通道的映射来实现:
- 要获取一个“互斥锁”,尝试向缓冲通道中填充一个值
- 执行工作
- 完成后,清空缓冲通道,以便另一个goroutine可以使用它
示例代码如下:
package main
import (
"fmt"
"sync"
"time"
)
type MutexMap struct {
mut sync.RWMutex // 处理chanMap的并发访问
chanMap map[int](chan bool) // 动态互斥锁映射
}
func NewMutextMap() *MutexMap {
var mut sync.RWMutex
return &MutexMap{
mut: mut,
chanMap: make(map[int](chan bool)),
}
}
// 获取锁,带有超时
func (mm *MutexMap) Lock(id int, timeout time.Duration) error {
// 获取全局锁以从映射中读取并获取通道
mm.mut.Lock()
if _, ok := mm.chanMap[id]; !ok {
mm.chanMap[id] = make(chan bool, 1)
}
ch := mm.chanMap[id]
mm.mut.Unlock()
// 尝试向缓冲通道写入,带有超时
select {
case ch <- true:
return nil
case <-time.After(timeout):
return fmt.Errorf("正在处理 %v 的操作超时", id)
}
}
// 释放锁
func (mm *MutexMap) Release(id int) {
mm.mut.Lock()
ch := mm.chanMap[id]
mm.mut.Unlock()
<-ch
}
func work(id int, mm *MutexMap) {
// 带超时获取锁
if err := mm.Lock(id, 100*time.Millisecond); err != nil {
fmt.Printf("错误:%s\n", err)
return
}
fmt.Printf("正在处理任务 %v\n", id)
// 做一些工作...
time.Sleep(time.Second)
fmt.Printf("完成任务 %v\n", id)
// 释放锁
mm.Release(id)
}
func main() {
mm := NewMutextMap()
var wg sync.WaitGroup
for i := 0; i < 50; i++ {
wg.Add(1)
id := i % 10
go func(id int, mm *MutexMap, wg *sync.WaitGroup) {
work(id, mm)
defer wg.Done()
}(id, mm, &wg)
}
wg.Wait()
}
编辑:不同版本,我们还处理了对chanMap本身的并发访问。
英文:
I would do it by using a map of buffered channels:
- to acquire a "mutex", try to fill a buffered channel with a value
- work
- when done, empty the buffered channel so that another goroutine can use it
Example:
package main
import (
"fmt"
"sync"
"time"
)
type MutexMap struct {
mut sync.RWMutex // handle concurrent access of chanMap
chanMap map[int](chan bool) // dynamic mutexes map
}
func NewMutextMap() *MutexMap {
var mut sync.RWMutex
return &MutexMap{
mut: mut,
chanMap: make(map[int](chan bool)),
}
}
// Acquire a lock, with timeout
func (mm *MutexMap) Lock(id int, timeout time.Duration) error {
// get global lock to read from map and get a channel
mm.mut.Lock()
if _, ok := mm.chanMap[id]; !ok {
mm.chanMap[id] = make(chan bool, 1)
}
ch := mm.chanMap[id]
mm.mut.Unlock()
// try to write to buffered channel, with timeout
select {
case ch <- true:
return nil
case <-time.After(timeout):
return fmt.Errorf("working on %v just timed out", id)
}
}
// release lock
func (mm *MutexMap) Release(id int) {
mm.mut.Lock()
ch := mm.chanMap[id]
mm.mut.Unlock()
<-ch
}
func work(id int, mm *MutexMap) {
// acquire lock with timeout
if err := mm.Lock(id, 100*time.Millisecond); err != nil {
fmt.Printf("ERROR: %s\n", err)
return
}
fmt.Printf("working on task %v\n", id)
// do some work...
time.Sleep(time.Second)
fmt.Printf("done working on %v\n", id)
// release lock
mm.Release(id)
}
func main() {
mm := NewMutextMap()
var wg sync.WaitGroup
for i := 0; i < 50; i++ {
wg.Add(1)
id := i % 10
go func(id int, mm *MutexMap, wg *sync.WaitGroup) {
work(id, mm)
defer wg.Done()
}(id, mm, &wg)
}
wg.Wait()
}
EDIT: different version, where we also handle the concurrent access to the chanMap itself
通过集体智慧和协作来改善编程学习和解决问题的方式。致力于成为全球开发者共同参与的知识库,让每个人都能够通过互相帮助和分享经验来进步。
评论