在启动时从文件中加载数据,然后处理新文件,并从映射中清除旧状态。

huangapple go评论86阅读模式
英文:

Load data from reading files during startup and then process new files and clear old state from the map

问题

我正在进行一个项目,在启动时需要读取特定的文件并将其存储在内存中的映射中,然后定期查找是否有新文件,如果有的话,用这个新数据替换之前在启动时存储在内存中的映射数据。基本上,每当有一个新的文件是一个full state时,我希望将内存中的映射对象刷新为这个新的对象,而不是追加到它上面。

在服务器启动期间调用下面的loadAtStartupAndProcessNewChanges方法,它读取文件并将数据存储在内存中。同时,它启动一个名为detectNewFiles的go例程,定期检查是否有新文件,并将其存储在deltaChan通道中,稍后由另一个名为processNewFiles的go例程访问该通道,再次读取该新文件并将数据存储在同一个映射中。如果有任何错误,则将其存储在err通道中。loadFiles是一个函数,用于将文件读取到内存中并存储在映射中。

type customerConfig struct {
  deltaChan   chan string
  err         chan error
  wg          sync.WaitGroup
  data        *cmap.ConcurrentMap
}

// 这个方法在服务器启动时被调用。
func (r *customerConfig) loadAtStartupAndProcessNewChanges() error {
  path, err := r.GetPath("...", "....")
  if err != nil {
    return err
  }

  r.wg.Add(1)
  go r.detectNewFiles(path)
  err = r.loadFiles(4, path)
  if err != nil {
    return err
  }
  r.wg.Add(1)
  go r.processNewFiles()
  return nil
}

该方法基本上确定是否有任何需要处理的新文件,如果有,则将其放入deltaChan通道中,稍后由processNewFiles go例程消费并读取内存中的文件。如果有任何错误,则将其添加到错误通道中。

func (r *customerConfig) detectNewFiles(rootPath string) {

}

这将读取所有s3文件并将其存储在内存中,并返回错误。在这个方法中,我清除了映射的先前状态,以便它可以从新文件中获得新的状态。该方法在服务器启动时调用,也在我们需要从processNewFiles go例程处理新文件时调用。

func (r *customerConfig) loadFiles(workers int, path string) error {
  var err error
  ...
  var files []string
  files = .....

  // 重置映射,以便它可以从新文件中获得新的状态。
  r.data.Clear()
  g, ctx := errgroup.WithContext(context.Background())
  sem := make(chan struct{}, workers)
  for _, file := range files {
    select {
    case <-ctx.Done():
      break
    case sem <- struct{}{}:
    }
    file := file
    g.Go(func() error {
      defer func() { <-sem }()
      return r.read(spn, file, bucket)
    })
  }

  if err := g.Wait(); err != nil {
    return err
  }
  return nil
}

该方法读取文件并将其添加到data并发映射中。

func (r *customerConfig) read(file string, bucket string) error {
  // 读取文件并将其存储在"data"并发映射中
  // 如果有任何错误,则返回错误
  var err error
  fr, err := pars3.NewS3FileReader(context.Background(), bucket, file, r.s3Client.GetSession().Config)
  if err != nil {
    return errs.Wrap(err)
  }
  defer xio.CloseIgnoringErrors(fr)

  pr, err := reader.NewParquetReader(fr, nil, 8)
  if err != nil {
    return errs.Wrap(err)
  }

  if pr.GetNumRows() == 0 {
    spn.Infof("Skipping %s due to 0 rows", file)
    return nil
  }

  for {
    rows, err := pr.ReadByNumber(r.cfg.RowsToRead)
    if err != nil {
      return errs.Wrap(err)
    }
    if len(rows) <= 0 {
      break
    }

    byteSlice, err := json.Marshal(rows)
    if err != nil {
      return errs.Wrap(err)
    }
    var invMods []CompModel
    err = json.Unmarshal(byteSlice, &invMods)
    if err != nil {
      return errs.Wrap(err)
    }

    for i := range invMods {
      key := strconv.FormatInt(invMods[i].ProductID, 10) + ":" + strconv.Itoa(int(invMods[i].Iaz))
      hasInventory := false
      if invMods[i].Available > 0 {
        hasInventory = true
      }
      r.data.Set(key, hasInventory)
    }
  }
  return nil
}

该方法将选择delta channel上的内容,如果有新文件,则调用loadFiles方法开始读取该新文件。如果有任何错误,则将其添加到错误通道中。

// processNewFiles - load new files found by detectNewFiles
func (r *customerConfig) processNewFiles() {
  // 在delta channel上找到新文件
  // 并调用"loadFiles"方法读取它
  // 如果有错误,则将其添加到错误通道中。
}

如果在error channel上有任何错误,则会从下面的方法中记录这些错误 -

func (r *customerConfig) handleError() {
  // 从错误通道中读取错误
  // 然后记录它
}

问题陈述

上述逻辑对我来说没有任何问题,但我的代码中有一个小错误,我无法弄清楚如何解决它。正如你所看到的,我有一个并发映射,我在read方法中填充它,在loadFiles方法中清除整个映射。因为每当在delta channel上有一个新文件时,我不想保留映射中的先前状态,所以我将其从映射中删除,然后将新文件的新状态添加到其中。

现在,如果read方法中有任何错误,那么bug就会发生,因为我已经清除了data映射中的所有数据,这将导致一个空映射,这不是我想要的。基本上,如果有任何错误,我希望保留data映射中的先前状态。如何在上述当前设计中解决这个问题?

**注意:**我正在使用golang的concurrent map

英文:

I am working on a project where during startup I need to read certain files and store it in memory in a map and then periodically look for new files if there are any and then replace whatever I had in memory in the map earlier during startup with this new data. Basically every time if there is a new file which is a full state then I want to refresh my in memory map objects to this new one instead of appending to it.

Below method loadAtStartupAndProcessNewChanges is called during server startup which reads the file and store data in memory. Also it starts a go-routine detectNewFiles which periodically checks if there are any new files and store it on a deltaChan channel which is later accessed by another go-routine processNewFiles to read that new file again and store data in the same map. If there is any error then we store it on err channel. loadFiles is the function which will read files in memory and store it in map.

type customerConfig struct {
  deltaChan   chan string
  err         chan error
  wg          sync.WaitGroup
  data        *cmap.ConcurrentMap
}

// this is called during server startup.
func (r *customerConfig) loadAtStartupAndProcessNewChanges() error {
  path, err := r.GetPath(&quot;...&quot;, &quot;....&quot;)
  if err != nil {
    return err
  }

  r.wg.Add(1)
  go r.detectNewFiles(path)
  err = r.loadFiles(4, path)
  if err != nil {
    return err
  }
  r.wg.Add(1)
  go r.processNewFiles()
  return nil
}

This method basically figures out if there are any new files that needs to be consumed and if there is any then it will put it on the deltaChan channel which will be later on consumed by processNewFiles go-routine and read the file in memory. If there is any error then it will add error to the error channel.

func (r *customerConfig) detectNewFiles(rootPath string) {

}

This will read all s3 files and store it in memory and return error. In this method I clear previous state of my map so that it can have fresh state from new files. This method is called during server startup and also called whenever we need to process new files from processNewFiles go-routine.

func (r *customerConfig) loadFiles(workers int, path string) error {
  var err error
  ...
  var files []string
  files = .....

  // reset the map so that it can have fresh state from new files.
  r.data.Clear()
  g, ctx := errgroup.WithContext(context.Background())
  sem := make(chan struct{}, workers)
  for _, file := range files {
    select {
    case &lt;-ctx.Done():
      break
    case sem &lt;- struct{}{}:
    }
    file := file
    g.Go(func() error {
      defer func() { &lt;-sem }()
      return r.read(spn, file, bucket)
    })
  }

  if err := g.Wait(); err != nil {
    return err
  }
  return nil
}

This method read the files and add in the data concurrent map.

func (r *customerConfig) read(file string, bucket string) error {
  // read file and store it in &quot;data&quot; concurrent map 
  // and if there is any error then return the error
  var err error
  fr, err := pars3.NewS3FileReader(context.Background(), bucket, file, r.s3Client.GetSession().Config)
  if err != nil {
    return errs.Wrap(err)
  }
  defer xio.CloseIgnoringErrors(fr)

  pr, err := reader.NewParquetReader(fr, nil, 8)
  if err != nil {
    return errs.Wrap(err)
  }

  if pr.GetNumRows() == 0 {
    spn.Infof(&quot;Skipping %s due to 0 rows&quot;, file)
    return nil
  }

  for {
    rows, err := pr.ReadByNumber(r.cfg.RowsToRead)
    if err != nil {
      return errs.Wrap(err)
    }
    if len(rows) &lt;= 0 {
      break
    }

    byteSlice, err := json.Marshal(rows)
    if err != nil {
      return errs.Wrap(err)
    }
    var invMods []CompModel
    err = json.Unmarshal(byteSlice, &amp;invMods)
    if err != nil {
      return errs.Wrap(err)
    }

    for i := range invMods {
      key := strconv.FormatInt(invMods[i].ProductID, 10) + &quot;:&quot; + strconv.Itoa(int(invMods[i].Iaz))
      hasInventory := false
      if invMods[i].Available &gt; 0 {
        hasInventory = true
      }
      r.data.Set(key, hasInventory)
    }
  }
  return nil
}

This method will pick what is there on the delta channel and if there are any new files then it will start reading that new file by calling loadFiles method. If there is any error then it will add error to the error channel.

// processNewFiles - load new files found by detectNewFiles
func (r *customerConfig) processNewFiles() {
  // find new files on delta channel
  // and call &quot;loadFiles&quot; method to read it
  // if there is any error, then it will add it to the error channel.
}

If there is any error on the error channel then it will log those errors from below method -

func (r *customerConfig) handleError() {
  // read error from error channel if there is any
  // then log it
}

Problem Statement

Above logic works for me without any issues but there is one small bug in my code which I am not able to figure out on how to solve it. As you can see I have a concurrent map which I am populating in my read method and also clearing that whole map in loadFiles method. Because whenever there is a new file on delta channel I don't want to keep previous state in the map so that's why I am removing everything from the map and then adding new state from new files to it.

Now if there is any error in read method then the bug happens bcoz I have already cleared all the data in my data map which will have empty map which is not what I want. Basically if there is any error then I would like to preserve previous state in the data map. How can I resolve this issue in my above current design.

Note: I am using golang concurrent map

答案1

得分: 3

我认为你的设计过于复杂了。它可以更简单地解决问题,同时实现你所期望的所有好处:

  • 并发访问安全
  • 检测到的更改会重新加载
  • 访问配置会返回最新成功加载的配置
  • 最新的配置始终可以立即访问,即使由于检测到的更改而加载新配置需要很长时间
  • 如果加载新配置失败,将保留先前的“快照”,并保持当前配置
  • 作为额外的好处,它更简单,甚至不使用第三方库

让我们看看如何实现这个:


首先,创建一个CustomerConfig结构体,用于保存你想要缓存的所有内容(这就是“快照”):

type CustomerConfig struct {
    Data     map[string]bool

    // 如果需要,可以添加其他属性:
    LoadedAt time.Time
}

提供一个函数来加载你想要缓存的配置。注意:这个函数是无状态的,它不访问/操作包级变量:

func loadConfig() (*CustomerConfig, error) {
    cfg := &CustomerConfig{
        Data:     map[string]bool{},
        LoadedAt: time.Now(),
    }

    // 加载文件并填充cfg.Data的逻辑
    // 如果发生错误,返回错误

    // 如果加载成功,返回配置
    return cfg, nil
}

现在,让我们创建我们的“缓存管理器”。缓存管理器存储实际/当前的配置(即快照),并提供对它的访问。为了安全并发访问(和更新),我们使用sync.RWMutex。它还具有停止管理器的方法(停止并发刷新):

type ConfigCache struct {
    configMu sync.RWMutex
    config   *CustomerConfig
    closeCh  chan struct{}
}

创建缓存时加载初始配置。同时启动一个goroutine来定期检查更改:

func NewConfigCache() (*ConfigCache, error) {
    cfg, err := loadConfig()
    if err != nil {
        return nil, fmt.Errorf("加载初始配置失败:%w", err)
    }

    cc := &ConfigCache{
        config:  cfg,
        closeCh: make(chan struct{}),
    }

    // 启动goroutine来定期检查更改并加载新配置
    go cc.refresher()

    return cc, nil
}

refresher()函数定期检查更改,如果检测到更改,则调用loadConfig()加载新数据进行缓存,并将其存储为当前/实际配置(在锁定configMu时)。它还监视closeCh以停止刷新:

func (cc *ConfigCache) refresher() {
    ticker := time.NewTicker(1 * time.Minute) // 每分钟
    defer ticker.Stop()

    for {
        select {
        case <-ticker.C:
            // 检查是否有更改
            changes := false // 检测更改的逻辑
            if !changes {
                continue // 没有更改,继续
            }

            // 有更改!加载新配置:
            cfg, err := loadConfig()
            if err != nil {
                log.Printf("加载配置失败:%v", err)
                continue // 保留先前的配置
            }

            // 应用/存储新配置
            cc.configMu.Lock()
            cc.config = cfg
            cc.configMu.Unlock()

        case <-cc.closeCh:
            return
        }
    }
}

关闭缓存管理器(即刷新的goroutine)很简单:

func (cc *ConfigCache) Stop() {
    close(cc.closeCh)
}

最后缺失的部分是如何访问当前配置。这是一个简单的GetConfig()方法(它也使用configMu,但是以只读模式):

func (cc *ConfigCache) GetConfig() *CustomerConfig {
    cc.configMu.RLock()
    defer cc.configMu.RUnlock()
    return cc.config
}

这是如何使用它的示例:

cc, err := NewConfigCache()
if err != nil {
    // 决定如何处理:重试、终止等
}

// 在你的应用程序中的任何时间、任何地方需要实际(最新)配置的地方:

cfg := cc.GetConfig()
// 使用cfg

在关闭应用程序之前或者你想停止刷新时),可以调用`cc.Stop()`

  [1]: https://pkg.go.dev/sync#RWMutex


<details>
<summary>英文:</summary>

I think your design is over complicated. It can be solved much simpler, which gives all the benefits you desire:

- safe for concurrent access
- detected changes are reloaded
- accessing the config gives you the most recent, successfully loaded config
- the most recent config is always, immediately accessible, even if loading a new config due to detected changes takes long
- if loading new config fails, the previous &quot;snapshot&quot; is kept and remains the current
- as a bonus, it&#39;s much simpler and doesn&#39;t even use 3rd party libs

Let&#39;s see how to achieve this:

---

Have a `CustomerConfig` struct holding everything you want to cache (this is the &quot;snapshot&quot;):

    type CustomerConfig struct {
    	Data map[string]bool
    
    	// Add other props if you need:
    	LoadedAt time.Time
    }

Provide a function that loads the config you wish to cache. Note: this function is stateless, it does not access / operate on package level variables:

    func loadConfig() (*CustomerConfig, error) {
    	cfg := &amp;CustomerConfig{
    		Data:     map[string]bool{},
    		LoadedAt: time.Now(),
    	}
    
    	// Logic to load files, and populate cfg.Data
    	// If an error occurs, return it
    
    	// If loading succeeds, return the config
    	return cfg, nil
    }

Now let&#39;s create our &quot;cache manager&quot;. The cache manager stores the actual / current config (the snapshot), and provides access to it. For safe concurrent access (and update), we use a [`sync.RWMutex`][1]. Also has means to stop the manager (to stop the concurrent refreshing):

    type ConfigCache struct {
    	configMu sync.RWMutex
    	config   *CustomerConfig
    	closeCh  chan struct{}
    }

Creating a cache loads the initial config. Also launches a goroutine that will be responsible to periodically check for changes.

    func NewConfigCache() (*ConfigCache, error) {
    	cfg, err := loadConfig()
    	if err != nil {
    		return nil, fmt.Errorf(&quot;loading initial config failed: %w&quot;, err)
    	}
    
    	cc := &amp;ConfigCache{
    		config:  cfg,
    		closeCh: make(chan struct{}),
    	}
    
    	// launch goroutine to periodically check for changes, and load new configs
    	go cc.refresher()
    
    	return cc, nil
    }

The `refresher()` periodically checks for changes, and if changes are detected, calls `loadConfig()` to load new data to be cached, and stores it as the current / actual config (while locking `configMu`). It also monitors `closeCh` to stop if that is requested:

    func (cc *ConfigCache) refresher() {
    	ticker := time.NewTicker(1 * time.Minute) // Every minute
    	defer ticker.Stop()
    
    	for {
    		select {
    		case &lt;-ticker.C:
    			// Check if there are changes
    			changes := false // logic to detect changes
    			if !changes {
    				continue // No changes, continue
    			}
    
    			// Changes! load new config:
    			cfg, err := loadConfig()
    			if err != nil {
    				log.Printf(&quot;Failed to load config: %v&quot;, err)
    				continue // Keep the previous config
    			}
    
    			// Apply / store new config
    			cc.configMu.Lock()
    			cc.config = cfg
    			cc.configMu.Unlock()

    		case &lt;-cc.closeCh:
    			return
    		}
    	}
    }

Closing the cache manager (the refresher goroutine) is as easy as:

    func (cc *ConfigCache) Stop() {
    	close(cc.closeCh)
    }

The last missing piece is how you access the current config. That&#39;s a simple `GetConfig()` method (that also uses `configMu`, but in read-only mode):

    func (cc *ConfigCache) GetConfig() *CustomerConfig {
    	cc.configMu.RLock()
    	defer cc.configMu.RUnlock()
    	return cc.config
    }

This is how you can use this:

    cc, err := NewConfigCache()
    if err != nil {
        // Decide what to do: retry, terminate etc.
    }

    // Where ever, whenever you need the actual (most recent) config in your app:

    cfg := cc.GetConfig()
    // Use cfg

Before you shut down your app (or you want to stop the refreshing), you may call `cc.Stop()`.


  [1]: https://pkg.go.dev/sync#RWMutex

</details>



# 答案2
**得分**: 1

`collectedData`添加了`RWMutex`以便由工作协程并发写保护

```go
type customerConfig struct {
   ...
   m sync.RWMutex
}

read方法中,不再更新map,而是让read方法只返回数据和错误

func (r *customerConfig) read(file string, bucket string) ([]CompModel, error) {
  // 读取文件数据并在有错误时返回
  var err error
  fr, err := pars3.NewS3FileReader(context.Background(), bucket, file, r.s3Client.GetSession().Config)
  if err != nil {
    return (nil, errs.Wrap(err))
  }
  defer xio.CloseIgnoringErrors(fr)

  pr, err := reader.NewParquetReader(fr, nil, 8)
  if err != nil {
    return (nil, errs.Wrap(err))
  }

  if pr.GetNumRows() == 0 {
    spn.Infof("Skipping %s due to 0 rows", file)
    return (nil, errors.New("No Data"))
  }

  var invMods = []CompModel{}
  for {
    rows, err := pr.ReadByNumber(r.cfg.RowsToRead)
    if err != nil {
      return (nil, errs.Wrap(err))
    }
    if len(rows) <= 0 {
      break
    }

    byteSlice, err := json.Marshal(rows)
    if err != nil {
      return (nil, errs.Wrap(err))
    }
    var jsonData []CompModel
    err = json.Unmarshal(byteSlice, &jsonData)
    if err != nil {
      return (nil, errs.Wrap(err))
    }
    invMods = append(invMods, jsonData...)
  }
  return invMods, nil
}

然后在loadFiles中,你可以收集read方法返回的数据,如果没有错误,那么清除并更新map,否则保留旧数据不变

func (r *customerConfig) loadFiles(workers int, path string) error {
  var err error
  ...
  var files []string
  files = .....

  // 重置map,以便从新文件中获取新状态
  // r.data.Clear() <- 从这里删除clear
  g, ctx := errgroup.WithContext(context.Background())
  sem := make(chan struct{}, workers)
  collectedData := []CompModel{}

  for _, file := range files {
    select {
    case <-ctx.Done():
      break
    case sem <- struct{}{}:
    }
    file := file
    g.Go(func() error {
      defer func() { <-sem }()

      data, err:= r.read(spn, file, bucket)
      if err != nil {
        return err
      }

      r.m.Lock()
      append(collectedData, data...)
      r.m.Unlock()
      return nil
    })
  }

  if err := g.Wait(); err != nil {
    return err
  }

  r.data.Clear()
  for i := range collectedData {
    key := strconv.FormatInt(collectedData[i].ProductID, 10) + ":" + strconv.Itoa(int(collectedData[i].Iaz))
    hasInventory := false
    if collectedData[i].Available > 0 {
      hasInventory = true
    }
    r.data.Set(key, hasInventory)
  }

  return nil
}

**注意:**由于代码无法运行,只是更新了参考方法,并且我没有为更新切片添加互斥锁,你可能需要处理这种情况。


只需使用3个函数即可实现相同的功能-detect、read、load。detect函数将按照一定的时间间隔检查新文件,并将其推送到delta通道中(如果有的话)。load函数将从delta通道获取文件路径,并调用read方法获取数据和错误,然后检查是否没有错误,如果是,则清除map并更新新内容,否则记录错误。因此,你将有2个goroutine和1个由load例程调用的函数。

package main

import (
  "fmt"
  "time"
  "os"
  "os/signal"
  "math/rand"
)

func main() {
  fmt.Println(">>>", center("STARTED", 30), "<<<")

  c := &Config{
    InitialPath: "Old Path",
    DetectInterval: 3000,
  }
  c.start()
  fmt.Println(">>>", center("ENDED", 30), "<<<")
}

// https://stackoverflow.com/questions/41133006/how-to-fmt-printprint-this-on-the-center
func center(s string, w int) string {
    return fmt.Sprintf("%[1]*s", -w, fmt.Sprintf("%[1]*s", (w + len(s))/2, s))
}

type Config struct {
  deltaCh chan string
  ticker *time.Ticker
  stopSignal chan os.Signal
  InitialPath string
  DetectInterval time.Duration
}

func (c *Config) start() {
  c.stopSignal = make(chan os.Signal, 1)
  signal.Notify(c.stopSignal, os.Interrupt)

  c.ticker = time.NewTicker(c.DetectInterval * time.Millisecond)
  c.deltaCh = make(chan string, 1)
  go c.detect()
  go c.load()
  if c.InitialPath != "" {
    c.deltaCh <- c.InitialPath
  }
  <- c.stopSignal
  c.ticker.Stop()
}

// 检测新文件
func (c *Config) detect() {
  for {
    select {
      case <- c.stopSignal:
        return
      case <- c.ticker.C:
        fmt.Println(">>>", center("DETECT", 30), "<<<")
        c.deltaCh <- fmt.Sprintf("PATH %f", rand.Float64() * 1.5)
    }
  }
}
// 读取文件
func read(path string) (map[string]int, error) {
  data := make(map[string]int)
  data[path] = 0
  fmt.Println(">>>", center("READ", 30), "<<<")
  fmt.Println(path)
  return data, nil
}
// 加载文件
func (c *Config) load() {
  for {
    select {
      case <- c.stopSignal:
        return
      case path := <- c.deltaCh:
        fmt.Println(">>>", center("LOAD", 30), "<<<")
        data, err := read(path)
        if err != nil {
          fmt.Println("Log Error")
        } else {
          fmt.Println("Success", data)
        }
        fmt.Println()
    }
  }
}

**注意:**示例代码中未包含map,可以轻松更新以包含map。

英文:

Added RWMutex for collectedData concurrent write protecting by worker goroutine

type customerConfig struct {
   ...
   m sync.RWMutex
}

Instead of updating map in read method let read method just return the data and error

func (r *customerConfig) read(file string, bucket string) ([]CompModel, error) {
  // read file data and return with error if any
  var err error
  fr, err := pars3.NewS3FileReader(context.Background(), bucket, file, r.s3Client.GetSession().Config)
  if err != nil {
    return (nil, errs.Wrap(err))
  }
  defer xio.CloseIgnoringErrors(fr)

  pr, err := reader.NewParquetReader(fr, nil, 8)
  if err != nil {
    return (nil, errs.Wrap(err))
  }

  if pr.GetNumRows() == 0 {
    spn.Infof(&quot;Skipping %s due to 0 rows&quot;, file)
    return (nil, errors.New(&quot;No Data&quot;))
  }

  var invMods = []CompModel{}
  for {
    rows, err := pr.ReadByNumber(r.cfg.RowsToRead)
    if err != nil {
      return (nil, errs.Wrap(err))
    }
    if len(rows) &lt;= 0 {
      break
    }

    byteSlice, err := json.Marshal(rows)
    if err != nil {
      return (nil, errs.Wrap(err))
    }
    var jsonData []CompModel
    err = json.Unmarshal(byteSlice, &amp;jsonData)
    if err != nil {
      return (nil, errs.Wrap(err))
    }
    invMods = append(invMods, jsonData...)
  }
  return invMods, nil
}

And then loadFiles you can collect the data return by read
method and if no error only then clear and update the map else
leave the old data as it was before

func (r *customerConfig) loadFiles(workers int, path string) error {
  var err error
  ...
  var files []string
  files = .....

  // reset the map so that it can have fresh state from new files.
  // r.data.Clear() &lt;- remove the clear from here
  g, ctx := errgroup.WithContext(context.Background())
  sem := make(chan struct{}, workers)
  collectedData := []CompModel{}

  for _, file := range files {
    select {
    case &lt;-ctx.Done():
      break
    case sem &lt;- struct{}{}:
    }
    file := file
    g.Go(func() error {
      defer func() { &lt;-sem }()

      data, err:= r.read(spn, file, bucket)
      if err != nil {
        return err
      }

      r.m.Lock()
      append(collectedData, data...)
      r.m.Unlock()
      return nil
    })
  }

  if err := g.Wait(); err != nil {
    return err
  }

  r.data.Clear()
  for i := range collectedData {
    key := strconv.FormatInt(collectedData[i].ProductID, 10) + &quot;:&quot; + strconv.Itoa(int(collectedData[i].Iaz))
    hasInventory := false
    if collectedData[i].Available &gt; 0 {
      hasInventory = true
    }
    r.data.Set(key, hasInventory)
  }

  return nil
}

Note: Since the code is not runnable just updated methods for reference and I have not include mutex lock for updating the slice you may need to handle for the case.


The same can be achieved with just 3 functions - detect, read, load, detect will check for new files by interval and push to delta channel if found any, load will get file path to read from delta channel and call read method to get the data and error then checks if no error then clear the map and update with new content else log the error, so you would have 2 go routines and 1 function which would be called by load routine

package main

import (
  &quot;fmt&quot;

  &quot;time&quot;
  &quot;os&quot;
  &quot;os/signal&quot;
  &quot;math/rand&quot;
)

func main() {
  fmt.Println(&quot;&gt;&gt;&gt;&quot;, center(&quot;STARTED&quot;, 30), &quot;&lt;&lt;&lt;&quot;)

  c := &amp;Config{
    InitialPath: &quot;Old Path&quot;,
    DetectInterval: 3000,
  }
  c.start()
  fmt.Println(&quot;&gt;&gt;&gt;&quot;, center(&quot;ENDED&quot;, 30), &quot;&lt;&lt;&lt;&quot;)
}

// https://stackoverflow.com/questions/41133006/how-to-fmt-printprint-this-on-the-center
func center(s string, w int) string {
    return fmt.Sprintf(&quot;%[1]*s&quot;, -w, fmt.Sprintf(&quot;%[1]*s&quot;, (w + len(s))/2, s))
}

type Config struct {
  deltaCh chan string
  ticker *time.Ticker
  stopSignal chan os.Signal
  InitialPath string
  DetectInterval time.Duration
}

func (c *Config) start() {
  c.stopSignal = make(chan os.Signal, 1)
  signal.Notify(c.stopSignal, os.Interrupt)

  c.ticker = time.NewTicker(c.DetectInterval * time.Millisecond)
  c.deltaCh = make(chan string, 1)
  go c.detect()
  go c.load()
  if c.InitialPath != &quot;&quot; {
    c.deltaCh &lt;- c.InitialPath
  }
  &lt;- c.stopSignal
  c.ticker.Stop()
}

// Detect New Files
func (c *Config) detect() {
  for {
    select {
      case &lt;- c.stopSignal:
        return
      case &lt;- c.ticker.C:
        fmt.Println(&quot;&gt;&gt;&gt;&quot;, center(&quot;DETECT&quot;, 30), &quot;&lt;&lt;&lt;&quot;)
        c.deltaCh &lt;- fmt.Sprintf(&quot;PATH %f&quot;, rand.Float64() * 1.5)
    }
  }
}
// Read Files
func read(path string) (map[string]int, error) {
  data := make(map[string]int)
  data[path] = 0
  fmt.Println(&quot;&gt;&gt;&gt;&quot;, center(&quot;READ&quot;, 30), &quot;&lt;&lt;&lt;&quot;)
  fmt.Println(path)
  return data, nil
}
// Load Files
func (c *Config) load() {
  for {
    select {
      case &lt;- c.stopSignal:
        return
      case path := &lt;- c.deltaCh:
        fmt.Println(&quot;&gt;&gt;&gt;&quot;, center(&quot;LOAD&quot;, 30), &quot;&lt;&lt;&lt;&quot;)
        data, err := read(path)
        if err != nil {
          fmt.Println(&quot;Log Error&quot;)
        } else {
          fmt.Println(&quot;Success&quot;, data)
        }
        fmt.Println()
    }
  }
}

Note: Not included map in sample code it can be easily updated to include map

答案3

得分: 0

只需分配一个新的地图。像这样:

var mu sync.Mutex
before := map[string]string{} // 读取之前的某个地图

after := make(map[string]string)

// 读取文件并填充`after`地图

mu.Lock()
before = after
mu.Unlock()
英文:

Just allocate new one map. Like this:

var mu sync.Mutex
before := map[string]string{} // Some map before reading

after := make(map[string]string)

// Read files and fill `after` map

mu.Lock()
before = after
mu.Unlock()

答案4

得分: 0

read方法中,不要在loadFile方法中清除地图,而是像这样做:

func (r *customerConfig) read(file string, bucket string) error {
  m := cmap.New() // 创建一个新的地图
  // ...
  for {
    rows, err := pr.ReadByNumber(r.cfg.RowsToRead)
    if err != nil {
      return errs.Wrap(err)
    }
    if len(rows) <= 0 {
      break
    }

    byteSlice, err := json.Marshal(rows)
    if err != nil {
      return errs.Wrap(err)
    }
    var invMods []CompModel
    err = json.Unmarshal(byteSlice, &invMods)
    if err != nil {
      return errs.Wrap(err)
    }

    for i := range invMods {
      key := strconv.FormatInt(invMods[i].ProductID, 10) + ":" + strconv.Itoa(int(invMods[i].Iaz))
      hasInventory := false
      if invMods[i].Available > 0 {
        hasInventory = true
      }
      m.Set(key, hasInventory)
    }
  }
  r.data = m // 使用新的地图
  return nil
}
英文:

Instead of clearing the map in loadFile method, do something like this in read

func (r *customerConfig) read(file string, bucket string) error {
m := cmap.New() // create a new map
// ...
for {
rows, err := pr.ReadByNumber(r.cfg.RowsToRead)
if err != nil {
return errs.Wrap(err)
}
if len(rows) &lt;= 0 {
break
}
byteSlice, err := json.Marshal(rows)
if err != nil {
return errs.Wrap(err)
}
var invMods []CompModel
err = json.Unmarshal(byteSlice, &amp;invMods)
if err != nil {
return errs.Wrap(err)
}
for i := range invMods {
key := strconv.FormatInt(invMods[i].ProductID, 10) + &quot;:&quot; + strconv.Itoa(int(invMods[i].Iaz))
hasInventory := false
if invMods[i].Available &gt; 0 {
hasInventory = true
}
m.Set(key, hasInventory)
}
}
r.data = m // Use the new map
return nil
}
</details>

huangapple
  • 本文由 发表于 2022年4月19日 04:14:22
  • 转载请务必保留本文链接:https://go.coder-hub.com/71916607.html
匿名

发表评论

匿名网友

:?: :razz: :sad: :evil: :!: :smile: :oops: :grin: :eek: :shock: :???: :cool: :lol: :mad: :twisted: :roll: :wink: :idea: :arrow: :neutral: :cry: :mrgreen:

确定