在写入和读取映射表时存在竞态条件。

huangapple go评论87阅读模式
英文:

Race condition while writing and reading from the map

问题

根据你提供的代码,你正在使用golang中的concurrent-map库来填充productCatalog并解决并发访问的问题。然而,你遇到了一个问题,即在迭代productCatalog时出现了并发映射迭代和映射写入的错误。

你提到尝试将oldIDs[productID]更改为concurrent map,但这会导致内存占用增加并可能导致OOM。你想知道是否有其他方法可以解决这个问题,既不增加内存占用,又能解决并发访问的问题。

根据你的描述,我建议你尝试使用sync.Map来替代concurrent-map库。sync.Map是Go标准库中提供的并发安全的映射类型,可以解决并发访问的问题。

以下是你可以尝试的修改代码:

首先,你需要将productCatalog的类型更改为sync.Map:

var productCatalog sync.Map

然后,在Upsert方法中,你可以使用sync.Map的LoadOrStore方法来进行插入或更新操作:

r.productCatalog.Upsert(catalogValue, flatProduct.ProductId, func(exists bool, valueInMap interface{}, newValue interface{}) interface{} {
    productID := newValue.(int64)
    if !exists {
        return map[int64]struct{}{productID: {}}
    }
    oldIDs := valueInMap.(map[int64]struct{})
    
    // value is irrelevant, no need to check if key exists 
    // I think problem is here
    oldIDs[productID] = struct{}{}
    return oldIDs
})

最后,在获取productCatalog的代码中,你可以使用sync.Map的Load方法来获取值:

// get productCatalog map which was populated above
catalogProductMap := clientRepo.GetProductCatalogMap()
productIds, ok := catalogProductMap.Load("211")
if ok {
    data := productIds.(map[int64]struct{})
    // iterate over data
    for pid := range data {
        // do something with pid
    }
}

通过使用sync.Map,你可以解决并发访问的问题,而无需增加额外的内存开销。希望这可以帮助到你!

英文:

Following up on old post here.

I am iterating over flatProduct.Catalogs slice and populating my productCatalog concurrent map in golang. I am using upsert method so that I can add only unique productID's into my productCatalog map.

Below code is called by multiple go routines in parallel that is why I am using concurrent map here to populate data into it. This code runs in background to populate data in the concurrent map every 30 seconds.

var productRows []ClientProduct
err = json.Unmarshal(byteSlice, &productRows)
if err != nil {
    return err
}
for i := range productRows {
    flatProduct, err := r.Convert(spn, productRows[i])
    if err != nil {
        return err
    }
    if flatProduct.StatusCode == definitions.DONE {
        continue
    }
    r.products.Set(strconv.Itoa(flatProduct.ProductId, 10), flatProduct)
    for _, catalogId := range flatProduct.Catalogs {
        catalogValue := strconv.FormatInt(int64(catalogId), 10)
        r.productCatalog.Upsert(catalogValue, flatProduct.ProductId, func(exists bool, valueInMap interface{}, newValue interface{}) interface{} {
            productID := newValue.(int64)
            if valueInMap == nil {
                return map[int64]struct{}{productID: {}}
            }
            oldIDs := valueInMap.(map[int64]struct{})
            
            // value is irrelevant, no need to check if key exists 
            // I think problem is here
            oldIDs[productID] = struct{}{}
            return oldIDs
        })
    }
}

And below are my getters in the same class where above code is there. These getters are used by main application threads to get data from the map or get the whole map.

func (r *clientRepository) GetProductMap() *cmap.ConcurrentMap {
	return r.products
}

func (r *clientRepository) GetProductCatalogMap() *cmap.ConcurrentMap {
	return r.productCatalog
}

func (r *clientRepository) GetProductData(pid string) *definitions.FlatProduct {
	pd, ok := r.products.Get(pid)
	if ok {
		return pd.(*definitions.FlatProduct)
	}
	return nil
}

This is how I am reading data from this productCatalog cmap but my system is crashing on the below range statement -

// get productCatalog map which was populated above
catalogProductMap := clientRepo.GetProductCatalogMap()
productIds, ok := catalogProductMap.Get("211")
data, _ := productIds.(map[int64]struct{})

// I get panic here after sometime
for _, pid := range data {
  ...
}

Error I am getting as - fatal error: concurrent map iteration and map write.

I think issue is r.productCatalog is a concurrentmap, but oldIDs[productID] is a normal map which is causing issues while I am iterating in the for loop above.

How can I fix this race issue I am seeing? One way I can think of is making oldIDs[productID] as concurrent map but if I do that approach then my memory increase by a lot and eventually goes OOM. Below is what I have tried which works and it solves the race condition but it increases the memory by a lot which is not what I want -

r.productCatalog.Upsert(catalogValue, flatProduct.ProductId, func(exists bool, valueInMap interface{}, newValue interface{}) interface{} {
    productID := newValue.(int64)
    if valueInMap == nil {
        // return map[int64]struct{}{productID: {}}
        return cmap.New()
    }
    // oldIDs := valueInMap.(map[int64]struct{})
    oldIDs := valueInMap.(cmap.ConcurrentMap)

    // value is irrelevant, no need to check if key exists
    // oldIDs[productID] = struct{}{}
    oldIDs.Set(strconv.FormatInt(productID, 10), struct{}{})
    return oldIDs
})

Any other approach I can do which doesn't increase memory and also fixes the race condition I am seeing?

Note
I am still using v1 version of cmap without generics and it deals with strings as keys.

答案1

得分: 0

与其使用简单的map[int64]struct{}类型,你可以定义一个结构体,该结构体包含一个映射和一个互斥锁来控制对映射的访问:

type myMap struct{
   m sync.Mutex
   data map[int64]struct{}
}

func (m *myMap) Add(productID int64) {
   m.m.Lock()
   defer m.m.Unlock()

   m.data[productID] = struct{}{}
}

func (m *myMap) List() []int64 {
   m.m.Lock()
   defer m.m.Unlock()

   var res []int64
   for id := range m.data {
       res = append(res, id)
   }

   // 如果需要,对切片进行排序
   return res
}

使用上述示例实现时,你需要小心在cmap.ConcurrentMap结构中存储*myMap指针(而不是普通的myMap结构)。

英文:

Rather than a plain map[int64]struct{} type, you could define a struct which holds the map and a mutex to control the access to the map:

type myMap struct{
   m sync.Mutex
   data map[int64]struct{}
}

func (m *myMap) Add(productID int64) {
   m.m.Lock()
   defer m.m.Unlock()

   m.data[productID] = struct{}{}
}

func (m *myMap) List() []int64 {
   m.m.Lock()
   defer m.m.Unlock()

   var res []int64
   for id := range m.data {
       res = append(res, id)
   }

   // sort slice if you need
   return res
}

With the sample implementation above, you would have to be careful to store *myMap pointers (as opposed to plain myMap structs) in your cmap.ConcurrentMap structure.

huangapple
  • 本文由 发表于 2023年1月6日 06:41:08
  • 转载请务必保留本文链接:https://go.coder-hub.com/75025126.html
匿名

发表评论

匿名网友

:?: :razz: :sad: :evil: :!: :smile: :oops: :grin: :eek: :shock: :???: :cool: :lol: :mad: :twisted: :roll: :wink: :idea: :arrow: :neutral: :cry: :mrgreen:

确定