Golang使用互斥锁(mutex)来处理自定义并发映射时仍然存在数据竞争问题。

huangapple go评论79阅读模式
英文:

Golang data race even with mutex for custom concurrent maps

问题

这是我为了学习目的编写的一个简单的并发映射。

package concurrent_hashmap

import (
	"hash/fnv"
	"sync"
)

type ConcurrentMap struct {
	buckets     []ThreadSafeMap
	bucketCount uint32
}

type ThreadSafeMap struct {
	mapLock sync.RWMutex
	hashMap map[string]interface{}
}

func NewConcurrentMap(bucketSize uint32) *ConcurrentMap {
	var threadSafeMapInstance ThreadSafeMap
	var bucketOfThreadSafeMap []ThreadSafeMap

	for i := 0; i <= int(bucketSize); i++ {
		threadSafeMapInstance = ThreadSafeMap{sync.RWMutex{}, make(map[string]interface{})}
		bucketOfThreadSafeMap = append(bucketOfThreadSafeMap, threadSafeMapInstance)
	}

	return &ConcurrentMap{bucketOfThreadSafeMap, bucketSize}
}

func (cMap *ConcurrentMap) Put(key string, val interface{}) {
	bucketIndex := hash(key) % cMap.bucketCount
	bucket := cMap.buckets[bucketIndex]
	bucket.mapLock.Lock()
	bucket.hashMap[key] = val
	bucket.mapLock.Unlock()
}

// Helper
func hash(s string) uint32 {
	h := fnv.New32a()
	h.Write([]byte(s))
	return h.Sum32()
}

我正在尝试编写一个简单的基准测试,并发访问会出现以下错误:

fatal error: concurrent map writes

这是我的基准测试运行命令:go test -bench=. -race

package concurrent_hashmap

import (
	"testing"
	"runtime"
	"math/rand"
	"strconv"
	"sync"
)

// 并发访问无法正常工作
func BenchmarkMyFunc(b *testing.B) {
	var wg sync.WaitGroup

	runtime.GOMAXPROCS(runtime.NumCPU())

	my_map := NewConcurrentMap(uint32(4))
	for n := 0; n < b.N; n++ {
		go insert(my_map, wg)
	}
	wg.Wait()
}

func insert(my_map *ConcurrentMap, wg sync.WaitGroup) {
	wg.Add(1)
	var rand_int int
	for element_num := 0; element_num < 1000; element_num++ {
		rand_int = rand.Intn(100)
		my_map.Put(strconv.Itoa(rand_int), rand_int)
	}
	defer wg.Done()
}

// 这个可以正常工作
func BenchmarkMyFuncSynchronize(b *testing.B) {
	my_map := NewConcurrentMap(uint32(4))
	for n := 0; n < b.N; n++ {
		my_map.Put(strconv.Itoa(123), 123)
	}
}

WARNING: DATA RACE 表明 bucket.hashMap[key] = val 导致了问题,但我对为什么会出现问题感到困惑,因为我在写入时锁定了该逻辑。

我认为我可能遗漏了一些基本的东西,有人能指出我的错误吗?

谢谢。

编辑1:

不确定这是否有帮助,但是如果我不锁定任何东西,我的互斥锁看起来是这样的:

{{0 0} 0 0 0 0}

如果我锁定写入操作,它看起来是这样的:

{{1 0} 0 0 -1073741824 0}

不确定为什么我的 readerCount 是一个很小的负数。

编辑2:

我认为我找到了问题所在,但不确定为什么我必须这样编码。

问题在于:

type ThreadSafeMap struct {
    mapLock sync.RWMutex // 这是引起问题的地方
    hashMap map[string]interface{}
}

它应该是:

type ThreadSafeMap struct {
    mapLock *sync.RWMutex
    hashMap map[string]interface{}
}

另一个奇怪的事情是,如果我在锁定内部放置打印语句:

bucket.mapLock.Lock()
fmt.Println("start")
fmt.Println(bucket)
fmt.Println(bucketIndex)
fmt.Println(bucket.mapLock)
fmt.Println(&bucket.mapLock)
bucket.hashMap[key] = val
defer bucket.mapLock.Unlock()

以下打印输出是可能的:

start
start
{0x4212861c0 map[123:123]}
{0x4212241c0 map[123:123]}

这很奇怪,因为每个 start 的输出应该后面跟着 4 行 bucket 信息,因为不能连续出现 start,否则会表明多个线程正在访问锁内的行。

而且,由于我使 bucketIndex 是静态的,每个 bucket.mapLock 却具有不同的地址,这表明我甚至没有访问相同的锁。

但是,尽管存在上述奇怪的情况,将互斥锁更改为指针解决了我的问题。

我很想知道为什么我需要使用互斥锁的指针,以及为什么打印输出似乎表明多个线程正在访问锁,以及为什么每个锁都有不同的地址。

英文:

Here is a simple concurrent map that I wrote for learning purpose

	package concurrent_hashmap

	import (
		&quot;hash/fnv&quot;
	    &quot;sync&quot;
	)

	type ConcurrentMap struct {
		buckets []ThreadSafeMap
		bucketCount uint32
	}

	type ThreadSafeMap struct {
		mapLock sync.RWMutex
		hashMap map[string]interface{}
	}

	func NewConcurrentMap(bucketSize uint32) *ConcurrentMap {
		var threadSafeMapInstance ThreadSafeMap
		var bucketOfThreadSafeMap []ThreadSafeMap

	    for i := 0; i &lt;= int(bucketSize); i++ {
	    	threadSafeMapInstance = ThreadSafeMap{sync.RWMutex{}, make(map[string]interface{})}
	    	bucketOfThreadSafeMap = append(bucketOfThreadSafeMap, threadSafeMapInstance)
	    }

		return &amp;ConcurrentMap{bucketOfThreadSafeMap, bucketSize}
	}

	func (cMap *ConcurrentMap) Put(key string, val interface{}) {
		bucketIndex := hash(key) % cMap.bucketCount
		bucket := cMap.buckets[bucketIndex]
		bucket.mapLock.Lock()
		bucket.hashMap[key] = val
		bucket.mapLock.Unlock()
	}

    // Helper
    func hash(s string) uint32 {
        h := fnv.New32a()
        h.Write([]byte(s))
        return h.Sum32()
    }

I am trying to write a simple benchmark and I find that synchronize access will work correctly but concurrent access will get

fatal error: concurrent map writes

Here is my benchmark run with go test -bench=. -race

package concurrent_hashmap

import (
    &quot;testing&quot;
    &quot;runtime&quot;
    &quot;math/rand&quot;
    &quot;strconv&quot;
    &quot;sync&quot;
)
// Concurrent does not work
func BenchmarkMyFunc(b *testing.B) {
    var wg sync.WaitGroup

    runtime.GOMAXPROCS(runtime.NumCPU())

    my_map := NewConcurrentMap(uint32(4))
    for n := 0; n &lt; b.N; n++ {
        go insert(my_map, wg)
    }
    wg.Wait()
}

func insert(my_map *ConcurrentMap, wg sync.WaitGroup) {
    wg.Add(1)
    var rand_int int
    for element_num := 0; element_num &lt; 1000; element_num++ {
        rand_int = rand.Intn(100)
        my_map.Put(strconv.Itoa(rand_int), rand_int)
    }
    defer wg.Done()
}

// This works
func BenchmarkMyFuncSynchronize(b *testing.B) {
    my_map := NewConcurrentMap(uint32(4))
    for n := 0; n &lt; b.N; n++ {
        my_map.Put(strconv.Itoa(123), 123)
    }
}

The WARNING: DATA RACE is saying that bucket.hashMap[key] = val is causing the problem, but I am confused on why that is possible, since I lock that logic whenever write is happening.

I think I am missing something basic, can someone point out my mistake?

Thanks

Edit1:

Not sure if this helps but here is what my mutex looks like if I don't lock anything

{{0 0} 0 0 0 0}

Here is what it looks like if I lock the write

{{1 0} 0 0 -1073741824 0}

Not sure why my readerCount is a low negative number

Edit:2

I think I find where the issue is at, but not sure why I have to code that way

The issue is

type ThreadSafeMap struct {
    mapLock sync.RWMutex // This is causing problem
    hashMap map[string]interface{}
}

it should be

type ThreadSafeMap struct {
    mapLock *sync.RWMutex
    hashMap map[string]interface{}
}

Another weird thing is that in Put if I put print statement inside lock

bucket.mapLock.Lock()
fmt.Println(&quot;start&quot;)
fmt.Println(bucket)
fmt.Println(bucketIndex)
fmt.Println(bucket.mapLock)
fmt.Println(&amp;bucket.mapLock)
bucket.hashMap[key] = val
defer bucket.mapLock.Unlock()

The following prints is possible

start
start
{0x4212861c0 map[123:123]}
{0x4212241c0 map[123:123]}

Its weird because each start printout should be follow with 4 lines of bucket info since you cannot have start back to back because that would indicate that multiple thread is access the line inside lock

Also for some reason each bucket.mapLock have different address even if I make the bucketIndex static, that indicate that I am not even accessing the same lock.

But despite the above weirdness changing mutex to pointer solves my problem

I would love to find out why I need pointers for mutex and why the prints seem to indicate multiple thread is accessing the lock and why each lock has different address.

答案1

得分: 1

问题出在以下语句:

bucket := cMap.buckets[bucketIndex]

bucket 现在包含该索引处 ThreadSafeMap 的副本。由于 sync.RWMutex 存储为值,因此在赋值时会进行复制。但是,map 会持有对底层数据结构的引用,因此传递的是指针的副本或同一个 map。该代码在写入单个 map 时锁定了锁的副本,这导致了问题。

这就是为什么当你将 sync.RWMutex 更改为 *sync.RWMutex 时,你不会遇到任何问题。最好将引用存储在 map 中,如示例所示。

以下是翻译好的代码:

package concurrent_hashmap

import (
	"hash/fnv"
	"sync"
)

type ConcurrentMap struct {
	buckets     []*ThreadSafeMap
	bucketCount uint32
}

type ThreadSafeMap struct {
	mapLock sync.RWMutex
	hashMap map[string]interface{}
}

func NewConcurrentMap(bucketSize uint32) *ConcurrentMap {
	var threadSafeMapInstance *ThreadSafeMap
	var bucketOfThreadSafeMap []*ThreadSafeMap

	for i := 0; i <= int(bucketSize); i++ {
		threadSafeMapInstance = &ThreadSafeMap{sync.RWMutex{}, make(map[string]interface{})}
		bucketOfThreadSafeMap = append(bucketOfThreadSafeMap, threadSafeMapInstance)
	}

	return &ConcurrentMap{bucketOfThreadSafeMap, bucketSize}
}

func (cMap *ConcurrentMap) Put(key string, val interface{}) {
	bucketIndex := hash(key) % cMap.bucketCount
	bucket := cMap.buckets[bucketIndex]
	bucket.mapLock.Lock()
	bucket.hashMap[key] = val
	bucket.mapLock.Unlock()
}

// Helper
func hash(s string) uint32 {
	h := fnv.New32a()
	h.Write([]byte(s))
	return h.Sum32()
}

可以通过修改 Put 函数来验证这种情况,如下所示:

func (cMap *ConcurrentMap) Put(key string, val interface{}) {
    //fmt.Println("index", key)
    bucketIndex := 1
    bucket := cMap.buckets[bucketIndex]
    fmt.Printf("%p %p\n", &(bucket.mapLock), bucket.hashMap)
}
英文:

The problem is with the statement

bucket := cMap.buckets[bucketIndex]

bucket now contains copy of the ThreadSafeMap at that index. As sync.RWMutex is stored as value, a copy of it is made while assigning. But map maps hold references to an underlying data structure, so the copy of the pointer or the same map is passed. The code locks a copy of the lock while writing to a single map, which cause the problem.

Thats why you don't face any problem when you change sync.RWMutex to *sync.RWMutex. It's better to store reference to structure in map as shown.

package concurrent_hashmap

import (
	&quot;hash/fnv&quot;
	&quot;sync&quot;
)

type ConcurrentMap struct {
	buckets     []*ThreadSafeMap
	bucketCount uint32
}

type ThreadSafeMap struct {
	mapLock sync.RWMutex
	hashMap map[string]interface{}
}

func NewConcurrentMap(bucketSize uint32) *ConcurrentMap {
	var threadSafeMapInstance *ThreadSafeMap
	var bucketOfThreadSafeMap []*ThreadSafeMap

	for i := 0; i &lt;= int(bucketSize); i++ {
		threadSafeMapInstance = &amp;ThreadSafeMap{sync.RWMutex{}, make(map[string]interface{})}
		bucketOfThreadSafeMap = append(bucketOfThreadSafeMap, threadSafeMapInstance)
	}

	return &amp;ConcurrentMap{bucketOfThreadSafeMap, bucketSize}
}

func (cMap *ConcurrentMap) Put(key string, val interface{}) {
	bucketIndex := hash(key) % cMap.bucketCount
	bucket := cMap.buckets[bucketIndex]
	bucket.mapLock.Lock()
	bucket.hashMap[key] = val
	bucket.mapLock.Unlock()
}

// Helper
func hash(s string) uint32 {
	h := fnv.New32a()
	h.Write([]byte(s))
	return h.Sum32()
}

It's possible to validate the scenario by modifying the function Put as follows

func (cMap *ConcurrentMap) Put(key string, val interface{}) {
    //fmt.Println(&quot;index&quot;, key)
    bucketIndex := 1
    bucket := cMap.buckets[bucketIndex]
    fmt.Printf(&quot;%p %p\n&quot;, &amp;(bucket.mapLock), bucket.hashMap)
}

huangapple
  • 本文由 发表于 2017年8月21日 01:03:35
  • 转载请务必保留本文链接:https://go.coder-hub.com/45784722.html
匿名

发表评论

匿名网友

:?: :razz: :sad: :evil: :!: :smile: :oops: :grin: :eek: :shock: :???: :cool: :lol: :mad: :twisted: :roll: :wink: :idea: :arrow: :neutral: :cry: :mrgreen:

确定