多个并发动态锁和超时,如果无法获取锁

huangapple go评论89阅读模式
英文:

Multiple concurrent dynamic locks and timeouts if failure to acquire locks

问题

我有一个使用案例,需要锁定函数的参数。

函数本身可以同时访问。

函数签名类似于:

func (m objectType) operate(key string) (bool) {
    // 在“key”上获取锁(如果在X毫秒内无法获取锁,则返回false-例如:100毫秒)
    // 操作
    // 释放“key”上的锁
    return true;
}

可以锁定的数据空间在数百万范围内(约1000万)。

对operate()的并发访问在数千范围内(1-5k)。

尽管可能在关键字的热点情况下存在竞争,但预期的争用是低的(因此需要锁)。

如何正确实现这一点?我探索了几个选项,使用并发哈希映射:

  1. sync.Map - 这适用于只有追加条目且读写比高的情况。因此在这里不适用。
  2. 分片哈希映射,其中每个分片由RWMutex锁定 - https://github.com/orcaman/concurrent-map - 虽然这可以工作,但并发性受到分片数量的限制,而不是实际键之间的争用。而且,它不能在出现对一部分键的大量争用时启用超时场景。

尽管超时是P1要求,但P0要求是通过细粒度锁定(如果可能)来提高吞吐量。

有没有好的方法来实现这一点?

英文:

I have a use case where I need to lock on arguments of a function.

The function itself can be accessed concurrently

Function signature is something like

func (m objectType) operate(key string) (bool) {
    // get lock on "key" (return false if unable to get lock in X ms - eg: 100 ms)
	// operate
    // release lock on "key"
    return true;
}

The data space which can be locked is in the range of millions (~10 million)

Concurrent access to operate() is in the range of thousands (1 - 5k)

Expected contention is low though possible in case of hotspots in key (hence the lock)

What is the right way to implement this ? Few options I explored using a concurrent hash map

  1. sync.Map - this is suited for cases with append only entries and high read ratio compared to writes. Hence not applicable here
  2. sharded hashmap where each shard is locked by RWMutex - https://github.com/orcaman/concurrent-map - While this would work, concurrency is limited by no of shards rather than actual contention between keys. Also doesn't enable the timeout scenarios when lot of contention happens for a subset of keys

Though timeout is a P1 requirement, the P0 requirement would be to increase throughput here by granular locking if possible.

Is there a good way to achieve this ?

答案1

得分: 2

我会使用一个带有缓冲通道的映射来实现:

  • 要获取一个“互斥锁”,尝试向缓冲通道中填充一个值
  • 执行工作
  • 完成后,清空缓冲通道,以便另一个goroutine可以使用它

示例代码如下:

package main

import (
	"fmt"
	"sync"
	"time"
)

type MutexMap struct {
	mut     sync.RWMutex        // 处理chanMap的并发访问
	chanMap map[int](chan bool) // 动态互斥锁映射
}

func NewMutextMap() *MutexMap {
	var mut sync.RWMutex
	return &MutexMap{
		mut:     mut,
		chanMap: make(map[int](chan bool)),
	}
}

// 获取锁,带有超时
func (mm *MutexMap) Lock(id int, timeout time.Duration) error {
	// 获取全局锁以从映射中读取并获取通道
	mm.mut.Lock()
	if _, ok := mm.chanMap[id]; !ok {
		mm.chanMap[id] = make(chan bool, 1)
	}
	ch := mm.chanMap[id]
	mm.mut.Unlock()

	// 尝试向缓冲通道写入,带有超时
	select {
	case ch <- true:
		return nil
	case <-time.After(timeout):
		return fmt.Errorf("正在处理 %v 的操作超时", id)
	}
}

// 释放锁
func (mm *MutexMap) Release(id int) {
	mm.mut.Lock()
	ch := mm.chanMap[id]
	mm.mut.Unlock()
	<-ch
}

func work(id int, mm *MutexMap) {
	// 带超时获取锁
	if err := mm.Lock(id, 100*time.Millisecond); err != nil {
		fmt.Printf("错误:%s\n", err)
		return
	}
	fmt.Printf("正在处理任务 %v\n", id)
	// 做一些工作...
	time.Sleep(time.Second)
	fmt.Printf("完成任务 %v\n", id)

	// 释放锁
	mm.Release(id)
}

func main() {
	mm := NewMutextMap()
	var wg sync.WaitGroup
	for i := 0; i < 50; i++ {
		wg.Add(1)
		id := i % 10
		go func(id int, mm *MutexMap, wg *sync.WaitGroup) {
			work(id, mm)
			defer wg.Done()
		}(id, mm, &wg)
	}
	wg.Wait()
}

编辑:不同版本,我们还处理了对chanMap本身的并发访问。

英文:

I would do it by using a map of buffered channels:

  • to acquire a "mutex", try to fill a buffered channel with a value
  • work
  • when done, empty the buffered channel so that another goroutine can use it

Example:

package main

import (
	&quot;fmt&quot;
	&quot;sync&quot;
	&quot;time&quot;
)

type MutexMap struct {
	mut     sync.RWMutex        // handle concurrent access of chanMap
	chanMap map[int](chan bool) // dynamic mutexes map
}

func NewMutextMap() *MutexMap {
	var mut sync.RWMutex
	return &amp;MutexMap{
		mut:     mut,
		chanMap: make(map[int](chan bool)),
	}
}

// Acquire a lock, with timeout
func (mm *MutexMap) Lock(id int, timeout time.Duration) error {
	// get global lock to read from map and get a channel
	mm.mut.Lock()
	if _, ok := mm.chanMap[id]; !ok {
		mm.chanMap[id] = make(chan bool, 1)
	}
	ch := mm.chanMap[id]
	mm.mut.Unlock()

	// try to write to buffered channel, with timeout
	select {
	case ch &lt;- true:
		return nil
	case &lt;-time.After(timeout):
		return fmt.Errorf(&quot;working on %v just timed out&quot;, id)
	}
}

// release lock
func (mm *MutexMap) Release(id int) {
	mm.mut.Lock()
	ch := mm.chanMap[id]
	mm.mut.Unlock()
	&lt;-ch
}

func work(id int, mm *MutexMap) {
	// acquire lock with timeout
	if err := mm.Lock(id, 100*time.Millisecond); err != nil {
		fmt.Printf(&quot;ERROR: %s\n&quot;, err)
		return
	}
	fmt.Printf(&quot;working on task %v\n&quot;, id)
	// do some work...
	time.Sleep(time.Second)
	fmt.Printf(&quot;done working on %v\n&quot;, id)

	// release lock
	mm.Release(id)
}

func main() {
	mm := NewMutextMap()
	var wg sync.WaitGroup
	for i := 0; i &lt; 50; i++ {
		wg.Add(1)
		id := i % 10
		go func(id int, mm *MutexMap, wg *sync.WaitGroup) {
			work(id, mm)
			defer wg.Done()
		}(id, mm, &amp;wg)
	}
	wg.Wait()
}

EDIT: different version, where we also handle the concurrent access to the chanMap itself

huangapple
  • 本文由 发表于 2021年8月18日 14:34:05
  • 转载请务必保留本文链接:https://go.coder-hub.com/68827543.html
匿名

发表评论

匿名网友

:?: :razz: :sad: :evil: :!: :smile: :oops: :grin: :eek: :shock: :???: :cool: :lol: :mad: :twisted: :roll: :wink: :idea: :arrow: :neutral: :cry: :mrgreen:

确定