英文:
Golang data race even with mutex for custom concurrent maps
问题
这是我为了学习目的编写的一个简单的并发映射。
package concurrent_hashmap
import (
"hash/fnv"
"sync"
)
type ConcurrentMap struct {
buckets []ThreadSafeMap
bucketCount uint32
}
type ThreadSafeMap struct {
mapLock sync.RWMutex
hashMap map[string]interface{}
}
func NewConcurrentMap(bucketSize uint32) *ConcurrentMap {
var threadSafeMapInstance ThreadSafeMap
var bucketOfThreadSafeMap []ThreadSafeMap
for i := 0; i <= int(bucketSize); i++ {
threadSafeMapInstance = ThreadSafeMap{sync.RWMutex{}, make(map[string]interface{})}
bucketOfThreadSafeMap = append(bucketOfThreadSafeMap, threadSafeMapInstance)
}
return &ConcurrentMap{bucketOfThreadSafeMap, bucketSize}
}
func (cMap *ConcurrentMap) Put(key string, val interface{}) {
bucketIndex := hash(key) % cMap.bucketCount
bucket := cMap.buckets[bucketIndex]
bucket.mapLock.Lock()
bucket.hashMap[key] = val
bucket.mapLock.Unlock()
}
// Helper
func hash(s string) uint32 {
h := fnv.New32a()
h.Write([]byte(s))
return h.Sum32()
}
我正在尝试编写一个简单的基准测试,并发访问会出现以下错误:
fatal error: concurrent map writes
这是我的基准测试运行命令:go test -bench=. -race
package concurrent_hashmap
import (
"testing"
"runtime"
"math/rand"
"strconv"
"sync"
)
// 并发访问无法正常工作
func BenchmarkMyFunc(b *testing.B) {
var wg sync.WaitGroup
runtime.GOMAXPROCS(runtime.NumCPU())
my_map := NewConcurrentMap(uint32(4))
for n := 0; n < b.N; n++ {
go insert(my_map, wg)
}
wg.Wait()
}
func insert(my_map *ConcurrentMap, wg sync.WaitGroup) {
wg.Add(1)
var rand_int int
for element_num := 0; element_num < 1000; element_num++ {
rand_int = rand.Intn(100)
my_map.Put(strconv.Itoa(rand_int), rand_int)
}
defer wg.Done()
}
// 这个可以正常工作
func BenchmarkMyFuncSynchronize(b *testing.B) {
my_map := NewConcurrentMap(uint32(4))
for n := 0; n < b.N; n++ {
my_map.Put(strconv.Itoa(123), 123)
}
}
WARNING: DATA RACE
表明 bucket.hashMap[key] = val
导致了问题,但我对为什么会出现问题感到困惑,因为我在写入时锁定了该逻辑。
我认为我可能遗漏了一些基本的东西,有人能指出我的错误吗?
谢谢。
编辑1:
不确定这是否有帮助,但是如果我不锁定任何东西,我的互斥锁看起来是这样的:
{{0 0} 0 0 0 0}
如果我锁定写入操作,它看起来是这样的:
{{1 0} 0 0 -1073741824 0}
不确定为什么我的 readerCount 是一个很小的负数。
编辑2:
我认为我找到了问题所在,但不确定为什么我必须这样编码。
问题在于:
type ThreadSafeMap struct {
mapLock sync.RWMutex // 这是引起问题的地方
hashMap map[string]interface{}
}
它应该是:
type ThreadSafeMap struct {
mapLock *sync.RWMutex
hashMap map[string]interface{}
}
另一个奇怪的事情是,如果我在锁定内部放置打印语句:
bucket.mapLock.Lock()
fmt.Println("start")
fmt.Println(bucket)
fmt.Println(bucketIndex)
fmt.Println(bucket.mapLock)
fmt.Println(&bucket.mapLock)
bucket.hashMap[key] = val
defer bucket.mapLock.Unlock()
以下打印输出是可能的:
start
start
{0x4212861c0 map[123:123]}
{0x4212241c0 map[123:123]}
这很奇怪,因为每个 start
的输出应该后面跟着 4 行 bucket 信息,因为不能连续出现 start
,否则会表明多个线程正在访问锁内的行。
而且,由于我使 bucketIndex
是静态的,每个 bucket.mapLock
却具有不同的地址,这表明我甚至没有访问相同的锁。
但是,尽管存在上述奇怪的情况,将互斥锁更改为指针解决了我的问题。
我很想知道为什么我需要使用互斥锁的指针,以及为什么打印输出似乎表明多个线程正在访问锁,以及为什么每个锁都有不同的地址。
英文:
Here is a simple concurrent map that I wrote for learning purpose
package concurrent_hashmap
import (
"hash/fnv"
"sync"
)
type ConcurrentMap struct {
buckets []ThreadSafeMap
bucketCount uint32
}
type ThreadSafeMap struct {
mapLock sync.RWMutex
hashMap map[string]interface{}
}
func NewConcurrentMap(bucketSize uint32) *ConcurrentMap {
var threadSafeMapInstance ThreadSafeMap
var bucketOfThreadSafeMap []ThreadSafeMap
for i := 0; i <= int(bucketSize); i++ {
threadSafeMapInstance = ThreadSafeMap{sync.RWMutex{}, make(map[string]interface{})}
bucketOfThreadSafeMap = append(bucketOfThreadSafeMap, threadSafeMapInstance)
}
return &ConcurrentMap{bucketOfThreadSafeMap, bucketSize}
}
func (cMap *ConcurrentMap) Put(key string, val interface{}) {
bucketIndex := hash(key) % cMap.bucketCount
bucket := cMap.buckets[bucketIndex]
bucket.mapLock.Lock()
bucket.hashMap[key] = val
bucket.mapLock.Unlock()
}
// Helper
func hash(s string) uint32 {
h := fnv.New32a()
h.Write([]byte(s))
return h.Sum32()
}
I am trying to write a simple benchmark and I find that synchronize access will work correctly but concurrent access will get
fatal error: concurrent map writes
Here is my benchmark run with go test -bench=. -race
package concurrent_hashmap
import (
"testing"
"runtime"
"math/rand"
"strconv"
"sync"
)
// Concurrent does not work
func BenchmarkMyFunc(b *testing.B) {
var wg sync.WaitGroup
runtime.GOMAXPROCS(runtime.NumCPU())
my_map := NewConcurrentMap(uint32(4))
for n := 0; n < b.N; n++ {
go insert(my_map, wg)
}
wg.Wait()
}
func insert(my_map *ConcurrentMap, wg sync.WaitGroup) {
wg.Add(1)
var rand_int int
for element_num := 0; element_num < 1000; element_num++ {
rand_int = rand.Intn(100)
my_map.Put(strconv.Itoa(rand_int), rand_int)
}
defer wg.Done()
}
// This works
func BenchmarkMyFuncSynchronize(b *testing.B) {
my_map := NewConcurrentMap(uint32(4))
for n := 0; n < b.N; n++ {
my_map.Put(strconv.Itoa(123), 123)
}
}
The WARNING: DATA RACE
is saying that bucket.hashMap[key] = val
is causing the problem, but I am confused on why that is possible, since I lock that logic whenever write is happening.
I think I am missing something basic, can someone point out my mistake?
Thanks
Edit1:
Not sure if this helps but here is what my mutex looks like if I don't lock anything
{{0 0} 0 0 0 0}
Here is what it looks like if I lock the write
{{1 0} 0 0 -1073741824 0}
Not sure why my readerCount is a low negative number
Edit:2
I think I find where the issue is at, but not sure why I have to code that way
The issue is
type ThreadSafeMap struct {
mapLock sync.RWMutex // This is causing problem
hashMap map[string]interface{}
}
it should be
type ThreadSafeMap struct {
mapLock *sync.RWMutex
hashMap map[string]interface{}
}
Another weird thing is that in Put
if I put print statement inside lock
bucket.mapLock.Lock()
fmt.Println("start")
fmt.Println(bucket)
fmt.Println(bucketIndex)
fmt.Println(bucket.mapLock)
fmt.Println(&bucket.mapLock)
bucket.hashMap[key] = val
defer bucket.mapLock.Unlock()
The following prints is possible
start
start
{0x4212861c0 map[123:123]}
{0x4212241c0 map[123:123]}
Its weird because each start
printout should be follow with 4 lines of bucket info since you cannot have start
back to back because that would indicate that multiple thread is access the line inside lock
Also for some reason each bucket.mapLock
have different address even if I make the bucketIndex static, that indicate that I am not even accessing the same lock.
But despite the above weirdness changing mutex to pointer solves my problem
I would love to find out why I need pointers for mutex and why the prints seem to indicate multiple thread is accessing the lock and why each lock has different address.
答案1
得分: 1
问题出在以下语句:
bucket := cMap.buckets[bucketIndex]
bucket
现在包含该索引处 ThreadSafeMap
的副本。由于 sync.RWMutex
存储为值,因此在赋值时会进行复制。但是,map 会持有对底层数据结构的引用,因此传递的是指针的副本或同一个 map。该代码在写入单个 map 时锁定了锁的副本,这导致了问题。
这就是为什么当你将 sync.RWMutex
更改为 *sync.RWMutex
时,你不会遇到任何问题。最好将引用存储在 map 中,如示例所示。
以下是翻译好的代码:
package concurrent_hashmap
import (
"hash/fnv"
"sync"
)
type ConcurrentMap struct {
buckets []*ThreadSafeMap
bucketCount uint32
}
type ThreadSafeMap struct {
mapLock sync.RWMutex
hashMap map[string]interface{}
}
func NewConcurrentMap(bucketSize uint32) *ConcurrentMap {
var threadSafeMapInstance *ThreadSafeMap
var bucketOfThreadSafeMap []*ThreadSafeMap
for i := 0; i <= int(bucketSize); i++ {
threadSafeMapInstance = &ThreadSafeMap{sync.RWMutex{}, make(map[string]interface{})}
bucketOfThreadSafeMap = append(bucketOfThreadSafeMap, threadSafeMapInstance)
}
return &ConcurrentMap{bucketOfThreadSafeMap, bucketSize}
}
func (cMap *ConcurrentMap) Put(key string, val interface{}) {
bucketIndex := hash(key) % cMap.bucketCount
bucket := cMap.buckets[bucketIndex]
bucket.mapLock.Lock()
bucket.hashMap[key] = val
bucket.mapLock.Unlock()
}
// Helper
func hash(s string) uint32 {
h := fnv.New32a()
h.Write([]byte(s))
return h.Sum32()
}
可以通过修改 Put
函数来验证这种情况,如下所示:
func (cMap *ConcurrentMap) Put(key string, val interface{}) {
//fmt.Println("index", key)
bucketIndex := 1
bucket := cMap.buckets[bucketIndex]
fmt.Printf("%p %p\n", &(bucket.mapLock), bucket.hashMap)
}
英文:
The problem is with the statement
bucket := cMap.buckets[bucketIndex]
bucket
now contains copy of the ThreadSafeMap
at that index. As sync.RWMutex
is stored as value, a copy of it is made while assigning. But map maps hold references to an underlying data structure, so the copy of the pointer or the same map is passed. The code locks a copy of the lock while writing to a single map, which cause the problem.
Thats why you don't face any problem when you change sync.RWMutex
to *sync.RWMutex
. It's better to store reference to structure in map as shown.
package concurrent_hashmap
import (
"hash/fnv"
"sync"
)
type ConcurrentMap struct {
buckets []*ThreadSafeMap
bucketCount uint32
}
type ThreadSafeMap struct {
mapLock sync.RWMutex
hashMap map[string]interface{}
}
func NewConcurrentMap(bucketSize uint32) *ConcurrentMap {
var threadSafeMapInstance *ThreadSafeMap
var bucketOfThreadSafeMap []*ThreadSafeMap
for i := 0; i <= int(bucketSize); i++ {
threadSafeMapInstance = &ThreadSafeMap{sync.RWMutex{}, make(map[string]interface{})}
bucketOfThreadSafeMap = append(bucketOfThreadSafeMap, threadSafeMapInstance)
}
return &ConcurrentMap{bucketOfThreadSafeMap, bucketSize}
}
func (cMap *ConcurrentMap) Put(key string, val interface{}) {
bucketIndex := hash(key) % cMap.bucketCount
bucket := cMap.buckets[bucketIndex]
bucket.mapLock.Lock()
bucket.hashMap[key] = val
bucket.mapLock.Unlock()
}
// Helper
func hash(s string) uint32 {
h := fnv.New32a()
h.Write([]byte(s))
return h.Sum32()
}
It's possible to validate the scenario by modifying the function Put
as follows
func (cMap *ConcurrentMap) Put(key string, val interface{}) {
//fmt.Println("index", key)
bucketIndex := 1
bucket := cMap.buckets[bucketIndex]
fmt.Printf("%p %p\n", &(bucket.mapLock), bucket.hashMap)
}
通过集体智慧和协作来改善编程学习和解决问题的方式。致力于成为全球开发者共同参与的知识库,让每个人都能够通过互相帮助和分享经验来进步。
评论