How to gc a map of mutexes in Go?

Question

I am making a cache wrapper around a database. To account for possibly slow database calls, I was thinking of a mutex per key (pseudo Go code):

mutexes = map[string]*sync.Mutex // instance variable

mutexes[key].Lock()
defer mutexes[key].Unlock()

if value, ok := cache.find(key); ok {
   return value
}
value = databaseCall(key)
cache.save(key, value)
return value

However, I don't want my map to grow too much. My cache is an LRU, and I want it to have a fixed size for other reasons not mentioned here. I would like to do something like:

delete(mutexes, key)

once all the locks on the key have been released, but that doesn't look thread-safe to me. How should I do it?

Note: I found this question, but it has no answer:
https://stackoverflow.com/questions/40560007/in-go-can-we-synchronize-each-key-of-a-map-using-a-lock-per-key

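Answer 1

Score: 3

A map of mutexes is an efficient way to accomplish this; however, the map itself must also be synchronized. A reference count can be used to keep track of entries in concurrent use and to remove them when they are no longer needed. Here is a working map of mutexes, complete with a test and a benchmark.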

mapofmu.go:

// Package mapofmu provides locking per-key.
// For example, you can acquire a lock for a specific user ID and all other requests for that user ID
// will block until that entry is unlocked (effectively your work load will be run serially per-user ID),
// and yet have work for separate user IDs happen concurrently.
package mapofmu

import (
	"fmt"
	"sync"
)

// M wraps a map of mutexes. Each key locks separately.
type M struct {
	ml sync.Mutex              // lock for entry map
	ma map[interface{}]*mentry // entry map
}

type mentry struct {
	m   *M          // point back to M, so we can synchronize removing this mentry when cnt==0
	el  sync.Mutex  // entry-specific lock
	cnt int         // reference count
	key interface{} // key in ma
}

// Unlocker provides an Unlock method to release the lock.
type Unlocker interface {
	Unlock()
}

// New returns an initialized M.
func New() *M {
	return &M{ma: make(map[interface{}]*mentry)}
}

// Lock acquires a lock corresponding to this key.
// This method will never return nil and Unlock() must be called
// to release the lock when done.
func (m *M) Lock(key interface{}) Unlocker {

	// read or create entry for this key atomically
	m.ml.Lock()
	e, ok := m.ma[key]
	if !ok {
		e = &mentry{m: m, key: key}
		m.ma[key] = e
	}
	e.cnt++ // ref count
	m.ml.Unlock()

	// acquire the per-key lock; blocks while another goroutine holds this key
	e.el.Lock()

	return e
}

// Unlock releases the lock for this entry.
func (me *mentry) Unlock() {

	m := me.m

	// decrement and if needed remove entry atomically
	m.ml.Lock()
	e, ok := m.ma[me.key]
	if !ok { // entry must exist
		m.ml.Unlock()
		panic(fmt.Errorf("Unlock requested for key=%v but no entry found", me.key))
	}
	e.cnt--        // ref count
	if e.cnt < 1 { // if it hits zero then we own it and remove from map
		delete(m.ma, me.key)
	}
	m.ml.Unlock()

	// now that map stuff is handled, we unlock and let
	// anything else waiting on this key through
	e.el.Unlock()

}

mapofmu_test.go:

package mapofmu

import (
	"math/rand"
	"strconv"
	"strings"
	"sync"
	"testing"
	"time"
)

func TestM(t *testing.T) {

	r := rand.New(rand.NewSource(42))

	m := New()

	keyCount := 20
	iCount := 10000
	out := make(chan string, iCount*2)

	// run a bunch of concurrent requests for various keys,
	// the idea is to have a lot of lock contention
	var wg sync.WaitGroup
	wg.Add(iCount)
	for i := 0; i < iCount; i++ {
		go func(rn int) {
			defer wg.Done()
			key := strconv.Itoa(rn)

			// you can prove the test works by commenting the locking out and seeing it fail
			l := m.Lock(key)
			defer l.Unlock()

			out <- key + " A"
			time.Sleep(time.Microsecond) // make 'em wait a mo'
			out <- key + " B"
		}(r.Intn(keyCount))
	}
	wg.Wait()
	close(out)

	// verify the map is empty now
	if l := len(m.ma); l != 0 {
		t.Errorf("unexpected map length at test end: %v", l)
	}

	// confirm that the output always produced the correct sequence
	outLists := make([][]string, keyCount)
	for s := range out {
		sParts := strings.Fields(s)
		kn, err := strconv.Atoi(sParts[0])
		if err != nil {
			t.Fatal(err)
		}
		outLists[kn] = append(outLists[kn], sParts[1])
	}
	for kn := 0; kn < keyCount; kn++ {
		l := outLists[kn] // list of output for this particular key
		for i := 0; i < len(l); i += 2 {
			if l[i] != "A" || l[i+1] != "B" {
				t.Errorf("For key=%v and i=%v got unexpected values %v and %v", kn, i, l[i], l[i+1])
				break
			}
		}
	}
	if t.Failed() {
		t.Logf("Failed, outLists: %#v", outLists)
	}

}

func BenchmarkM(b *testing.B) {

	m := New()

	b.ResetTimer()
	for i := 0; i < b.N; i++ {
		// run uncontended lock/unlock - should be quite fast
		m.Lock(i).Unlock()
	}

}

(UPDATE: The golang.org/x/sync/singleflight package provides similar functionality: https://pkg.go.dev/golang.org/x/sync/singleflight)

Answer 2

Score: 2

I wrote a simple similar implementation: mapmutex

But instead of a map of mutexes, this implementation uses a single mutex to guard the map, and each item in the map acts as a 'lock'. The map itself is just a plain, ordinary map.

Posted on December 2, 2016, 19:39:23