使用ticker定期从频繁更改的路径加载所有文件到内存中。

huangapple go评论102阅读模式
英文:

Use ticker to periodically load all the files in memory from a path which keeps changing frequently?

问题

我有一个应用程序,需要从两个不同的路径读取文件。在读取所有这些文件之后,我需要将它们加载到内存中的products映射中。

路径:

  • Full:这是在服务器启动期间需要加载到内存中的所有文件的路径。该路径大约有50个文件,每个文件的大小约为60MB。
  • Delta:这是一个路径,其中包含我们需要定期每1分钟加载到内存中的所有增量文件。这些文件只包含与完整路径文件的差异。该路径大约有60个文件,每个文件的大小约为20MB。

下面的代码watchDeltaPath在服务器启动时被调用以监视增量更改。它将从GetDeltaPath方法中获取增量路径,并从该路径中加载所有文件到内存中。这个增量路径每隔几分钟就会发生变化,我不能错过任何一个增量路径和该路径中的所有文件

loadAllFiles方法中加载所有文件到内存可能需要一些时间(大约5分钟),所以我正在尝试找到一种方法,不会错过任何新的增量路径(因为它每隔几分钟就会发生变化),并且能够周期性地从增量路径中再次加载所有这些文件到内存中,而且没有任何问题和高效。

我找到了下面的代码,它每1分钟运行一次,并每次查找新的增量路径,然后从该路径中加载所有文件到内存中。它工作得很好,但我认为这不是正确的做法。如果loadAllFiles方法花费超过10分钟来加载所有文件到内存中,而我的定时器每1分钟运行一次来查找新的增量路径,然后在该新路径中查找所有文件并加载到内存中,会发生什么?它会不会不断创建大量的后台线程,可能大幅增加CPU使用率?

type applicationRepository struct {
  client         customer.Client
  logger         log.Logger
  done           chan struct{}
  products       *cmap.ConcurrentMap
}

// this will be called only once
func (r *applicationRepository) watchDeltaPath() error {
	ticker := time.NewTicker(1 * time.Minute)
	go func() {
		select {
		case <-r.done:
			ticker.Stop()
			return
		case <-ticker.C:
			func() (result error) {
				trans := r.logger.StartTransaction(nil, "delta-changes", "")
				defer trans.End()
				defer func() {
					if result != nil {
						trans.Errorf("Recovered from error: %v")
					} else if err := recover(); err != nil {
						trans.Errorf("Recovered from panic: %v", err)
					}
				}()
				// get latest delta path everytime as it keeps changing every few minutes
				path, err := r.client.GetDeltaPath("delta")
				if err != nil {
					return err
				}
				// load all the files in memory in "products" map from that path
				err = r.loadAllFiles(path)
				if err != nil {
					return err
				}
				return nil
			}()
		}
	}()
	return nil
}

func (r *applicationRepository) Stop() {
	r.done <- struct{}{}
}

在生产环境中,如何以高效的方式实现这一点?

这是我对代码的尝试,展示了它的执行方式- https://go.dev/play/p/FS4-B0FWwTe

英文:

I have an application which needs to read files from two different path. After reading all these files, I need to load them up in memory in products map.

Path:

  • Full: This is the path which will have all files that we need to load up during server startup in memory. This path will have around 50 files and each file size is ~60MB.
  • Delta: This is the path which will have all the delta files that we need to load up in memory periodically every 1 minute. These files will only contain difference from the full path files. This path will have around 60 files and each file size is ~20MB.

Below code watchDeltaPath is called during server startup to watch for delta changes. It will get the delta path from GetDeltaPath method and from that path I need to load all the files in memory. This delta path keeps changing every few minutes and I cannot miss any one delta path and all the files in that path.

Loading all files in memory from loadAllFiles method can take some time (approx 5mins) so I am trying to find a way where I should not miss any new delta path (as it keeps changing every few minutes) and should be able to load all those files in memory from the delta path again and again periodically without any issue and efficiently.

I got the below code which runs every 1 minute and look for new delta path every time and then load all the files from that path in the memory. It works fine but I don't think this is the right approach to do it. What happens if loadAllFiles method takes more than 10 minutes to load all the files in memory and my ticker is running every 1 minute to look for new delta path and then find all the files in that new path and then load up in memory? Will it keep creating lot of background threads and maybe increase cpu-usage by a lot?

type applicationRepository struct {
  client         customer.Client
  logger         log.Logger
  done           chan struct{}
  products       *cmap.ConcurrentMap
}

// this will be called only once
func (r *applicationRepository) watchDeltaPath() error {
	ticker := time.NewTicker(1 * time.Minute)
	go func() {
		select {
		case &lt;-r.done:
			ticker.Stop()
			return
		case &lt;-ticker.C:
			func() (result error) {
				trans := r.logger.StartTransaction(nil, &quot;delta-changes&quot;, &quot;&quot;)
				defer trans.End()
				defer func() {
					if result != nil {
						trans.Errorf(&quot;Recovered from error: %v&quot;)
					} else if err := recover(); err != nil {
						trans.Errorf(&quot;Recovered from panic: %v&quot;, err)
					}
				}()
				// get latest delta path everytime as it keeps changing every few minutes
				path, err := r.client.GetDeltaPath(&quot;delta&quot;)
				if err != nil {
					return err
				}
				// load all the files in memory in &quot;products&quot; map from that path
				err = r.loadAllFiles(path)
				if err != nil {
					return err
				}
				return nil
			}()
		}
	}()
	return nil
}

func (r *applicationRepository) Stop() {
	r.done &lt;- struct{}{}
}

What is the best way to do this efficiently in prod?

Here is my play with code on how it is being executed - https://go.dev/play/p/FS4-B0FWwTe

答案1

得分: 1

根据评论中提到的“在生产环境中高效地完成这个任务的最佳方法”取决于许多因素,可能无法在像 Stack Overflow 这样的网站上得到答案。尽管如此,我可以提供一种方法,可以更容易地思考如何最好地解决这个问题。

下面的代码(playground; 相当粗糙且未经测试)演示了一种使用三个 Go 协程的方法:

  1. 检测新的增量路径并将其推送到缓冲通道
  2. 处理初始加载
  3. 等待初始加载完成,然后应用增量(请注意,这会处理在初始加载进行中发现的增量)

如上所述,问题中的细节不足以确定这是否是一个好方法。可能初始加载和增量可以同时运行而不会饱和 IO,但这需要进行测试(并且是一个相对较小的更改)。

// 模拟执行初始加载和处理增量的过程
package main

import (
	"fmt"
	"strconv"
	"sync"
	"time"
)

const deltaBuffer = 100
const initialLoadTime = time.Duration(time.Duration(1.5 * float32(time.Second)))
const deltaCheckFrequency = time.Duration(500 * time.Millisecond)

func main() {
	ar := NewApplicationRepository()
	time.Sleep(5 * time.Second)
	ar.Stop()
	fmt.Println(time.Now(), "complete")
}

type applicationRepository struct {
	deltaChan       chan string   // 可以是其他类型...
	initialLoadDone chan struct{} // 初始加载完成时关闭

	done chan struct{}
	wg   sync.WaitGroup
}

func NewApplicationRepository() *applicationRepository {
	ar := applicationRepository{
		deltaChan:       make(chan string, deltaBuffer),
		initialLoadDone: make(chan struct{}),
		done:            make(chan struct{}),
	}

	ar.wg.Add(3)
	go ar.detectNewDeltas()
	go ar.initialLoad()
	go ar.deltaLoad()

	return &ar
}

// detectNewDeltas - 监听新的增量路径
func (a *applicationRepository) detectNewDeltas() {
	defer a.wg.Done()
	var previousDelta string
	for {
		select {
		case <-time.After(deltaCheckFrequency):
			dp := a.getDeltaPath()
			if dp != previousDelta {
				select {
				case a.deltaChan <- dp:
				default:
					panic("channel full - no idea what to do here!")
				}
				previousDelta = dp
			}
		case <-a.done:
			return
		}
	}
}

// getDeltaPath 在实际应用中,这将检索增量路径
func (a *applicationRepository) getDeltaPath() string {
	return strconv.Itoa(time.Now().Second()) // 现在只返回当前秒数...
}

// initialLoad - 加载初始数据
func (a *applicationRepository) initialLoad() {
	defer a.wg.Done()
	defer close(a.initialLoadDone)
	time.Sleep(initialLoadTime) // 模拟初始加载所需的时间
}

// deltaLoad- 加载 detectNewDeltas 发现的增量
func (a *applicationRepository) deltaLoad() {
	defer a.wg.Done()
	fmt.Println(time.Now(), "deltaLoad started")

	// 等待初始加载完成后再执行任何操作
	<-a.initialLoadDone
	fmt.Println(time.Now(), "Initial Load Done")

	// 等待传入的增量并加载它们
	for {
		select {
		case newDelta := <-a.deltaChan:
			fmt.Println(time.Now(), newDelta)
		case <-a.done:
			return
		}
	}
}

// Stop - 通知加载器停止并等待完成
func (a *applicationRepository) Stop() {
	close(a.done)
	a.wg.Wait()
}

以上是代码的翻译部分。

英文:

As per the comments the "best way to do this efficiently in prod" depends on a lot of factors and is probably not answerable on a site like Stack overflow. Having said that I can suggest an approach that might make it easier to think about how the problem could be best solved.

The below code (playground; pretty rough and untested) demonstrates an approach with three go routines:

  1. Detects new delta paths and pushes them to a buffered channel
  2. Handles the initial load
  3. Waits for initial load to finish then applies deltas (note that this does process deltas found while the initial load is underway)

As mentioned above there is insufficient detail in the question to ascertain whether this a good approach. It may be that the initial load and deltas can run simultaneously without saturating the IO but that would require testing (and would be a relatively small change).

// Simulation of process to perform initial load and handle deltas
package main

import (
	&quot;fmt&quot;
	&quot;strconv&quot;
	&quot;sync&quot;
	&quot;time&quot;
)

const deltaBuffer = 100
const initialLoadTime = time.Duration(time.Duration(1.5 * float32(time.Second)))
const deltaCheckFrequency = time.Duration(500 * time.Millisecond)

func main() {
	ar := NewApplicationRepository()
	time.Sleep(5 * time.Second)
	ar.Stop()
	fmt.Println(time.Now(), &quot;complete&quot;)
}

type applicationRepository struct {
	deltaChan       chan string   // Could be some other type...
	initialLoadDone chan struct{} // Closed when initial load finished

	done chan struct{}
	wg   sync.WaitGroup
}

func NewApplicationRepository() *applicationRepository {
	ar := applicationRepository{
		deltaChan:       make(chan string, deltaBuffer),
		initialLoadDone: make(chan struct{}),
		done:            make(chan struct{}),
	}

	ar.wg.Add(3)
	go ar.detectNewDeltas()
	go ar.initialLoad()
	go ar.deltaLoad()

	return &amp;ar
}

// detectNewDeltas - watch for new delta paths
func (a *applicationRepository) detectNewDeltas() {
	defer a.wg.Done()
	var previousDelta string
	for {
		select {
		case &lt;-time.After(deltaCheckFrequency):
			dp := a.getDeltaPath()
			if dp != previousDelta {
				select {
				case a.deltaChan &lt;- dp:
				default:
					panic(&quot;channel full - no idea what to do here!&quot;)
				}
				previousDelta = dp
			}
		case &lt;-a.done:
			return
		}
	}
}

// getDeltaPath in real application this will retrieve the delta path
func (a *applicationRepository) getDeltaPath() string {
	return strconv.Itoa(time.Now().Second()) // For now just return the current second..
}

// initialLoad - load the initial data
func (a *applicationRepository) initialLoad() {
	defer a.wg.Done()
	defer close(a.initialLoadDone)
	time.Sleep(initialLoadTime) // Simulate time taken for initial load
}

// deltaLoad- load deltas found by detectNewDeltas
func (a *applicationRepository) deltaLoad() {
	defer a.wg.Done()
	fmt.Println(time.Now(), &quot;deltaLoad started&quot;)

	// Wait for initial load to complete before doing anything
	&lt;-a.initialLoadDone
	fmt.Println(time.Now(), &quot;Initial Load Done&quot;)

	// Wait for incoming deltas and load them
	for {
		select {
		case newDelta := &lt;-a.deltaChan:
			fmt.Println(time.Now(), newDelta)
		case &lt;-a.done:
			return
		}
	}
}

// Stop - signal loader to stop and wait until this is done
func (a *applicationRepository) Stop() {
	close(a.done)
	a.wg.Wait()
}

答案2

得分: 1

我认为你想要的是Golang并发模式:Fan in,Fan out。你可以在Google中搜索它。

这是我创建的一个示例代码。你可以复制粘贴它,并创建名为fulldelta的文件夹,其中包含虚拟文件。

package main

import (
	"fmt"
	"log"
	"os"
	"path/filepath"
	"sync"
	"time"
)

type MyFile struct {
	full         map[string][]byte
	delta        map[string][]byte
	files        []string
	stopAutoLoad chan struct{}
}

func FilePathWalkDir(root string) ([]string, error) {
	var files []string
	err := filepath.Walk(root, func(path string, info os.FileInfo, err error) error {
		if !info.IsDir() {
			files = append(files, path)
		}
		return nil
	})
	return files, err
}

func main() {
	mf := NewMyFile()
	mf.StartAutoLoadDelta(10 * time.Second)

	// time.Sleep(15 * time.Second)
	// mf.StopAutoLoadDelta()

	time.Sleep(50 * time.Minute)
	fmt.Println(len(mf.full))
	fmt.Println(len(mf.delta))
}

func NewMyFile() *MyFile {
	mf := &MyFile{
		full:         make(map[string][]byte),
		delta:        make(map[string][]byte),
		stopAutoLoad: make(chan struct{}),
	}

	mf.LoadFile("full", 0)
	mf.LoadFile("delta", 0)
	return mf
}

func (mf *MyFile) StartAutoLoadDelta(d time.Duration) {
	ticker := time.NewTicker(d)

	go func() {
		defer func() {
			ticker.Stop()
		}()

		i := 1
		for {
			select {
			case <-ticker.C:
				// mf.deleteCurrentDelta()
				mf.LoadFile("delta", i)
				fmt.Println("In Memory:")
				for k, v := range mf.delta {
					fmt.Printf("key : %s\t\tlen: %d\n", k, len(v))
				}
				i++
			case <-mf.stopAutoLoad:
				return
			}
		}
	}()
}

func (mf *MyFile) StopAutoLoadDelta() {
	fmt.Println("Stopping autoload Delta")
	mf.stopAutoLoad <- struct{}{}
}

func (mf *MyFile) deleteCurrentDelta() {
	for k, _ := range mf.delta {
		fmt.Println("data deleted: ", k)
		delete(mf.delta, k)
	}
}

type Fileinfo struct {
	name string
	data []byte
	err  error
}

func (mf *MyFile) LoadFile(prefix string, i int) {
	log.Printf("%s load : %d", prefix, i)
	files, err := FilePathWalkDir(prefix)
	if err != nil {
		panic("failed to open delta directory")
	}

	newFiles := make([]string, 0)

	for _, v := range files {
		if _, ok := mf.delta[v]; !ok {
			newFiles = append(newFiles, v)
		}
	}

	chanJobs := GenerateJobs(prefix, newFiles)
	chanResultJobs := ReadFiles(chanJobs, 8)
	counterTotal := 0
	counterSuccess := 0
	for results := range chanResultJobs {
		if results.err != nil {
			log.Printf("error creating file %s. stack trace: %s", results.name, results.err)
		} else {
			switch prefix {
			case "delta":
				mf.delta[results.name] = results.data
			case "full":
				mf.full[results.name] = results.data
			default:
				panic("not implemented")
			}
			counterSuccess++
		}
		counterTotal++
	}

	log.Printf("status jobs running: %d/%d", counterSuccess, counterTotal)
}

func GenerateJobs(prefix string, files []string) <-chan Fileinfo {
	chanOut := make(chan Fileinfo)

	go func() {
		for _, v := range files {
			chanOut <- Fileinfo{
				name: v,
			}
		}
		close(chanOut)
	}()

	return chanOut
}

func ReadFiles(chanIn <-chan Fileinfo, worker int) <-chan Fileinfo {
	chanOut := make(chan Fileinfo)

	var wg sync.WaitGroup

	wg.Add(worker)

	go func() {
		for i := 0; i < worker; i++ {
			go func(workerIndex int) {
				defer wg.Done()
				for job := range chanIn {
					log.Printf("worker %d is reading file %s", workerIndex, job.name)
					data, err := os.ReadFile(job.name)
					chanOut <- Fileinfo{
						name: job.name,
						data: data,
						err:  err,
					}
				}
			}(i)
		}
	}()

	go func() {
		wg.Wait()
		close(chanOut)
	}()
	return chanOut
}

希望对你有帮助!

英文:

I think you want Golang Concurrency Patterns : Fan in, Fan out. You can search it in Google.

This I create an example code. You can copy-paste it and create folder full and delta with dummy file inside it.

package main

import (
	&quot;fmt&quot;
	&quot;log&quot;
	&quot;os&quot;
	&quot;path/filepath&quot;
	&quot;sync&quot;
	&quot;time&quot;
)

type MyFile struct {
	full         map[string][]byte
	delta        map[string][]byte
	files        []string
	stopAutoLoad chan struct{}
}

func FilePathWalkDir(root string) ([]string, error) {
	var files []string
	err := filepath.Walk(root, func(path string, info os.FileInfo, err error) error {
		if !info.IsDir() {
			files = append(files, path)
		}
		return nil
	})
	return files, err
}

func main() {
	mf := NewMyFile()
	mf.StartAutoLoadDelta(10 * time.Second)

	// time.Sleep(15 * time.Second)
	// mf.StopAutoLoadDelta()

	time.Sleep(50 * time.Minute)
	fmt.Println(len(mf.full))
	fmt.Println(len(mf.delta))
}

func NewMyFile() *MyFile {
	mf := &amp;MyFile{
		full:         make(map[string][]byte),
		delta:        make(map[string][]byte),
		stopAutoLoad: make(chan struct{}),
	}

	mf.LoadFile(&quot;full&quot;, 0)
	mf.LoadFile(&quot;delta&quot;, 0)
	return mf
}

func (mf *MyFile) StartAutoLoadDelta(d time.Duration) {
	ticker := time.NewTicker(d)

	go func() {
		defer func() {
			ticker.Stop()
		}()

		i := 1
		for {
			select {
			case &lt;-ticker.C:
				// mf.deleteCurrentDelta()
				mf.LoadFile(&quot;delta&quot;, i)
				fmt.Println(&quot;In Memory:&quot;)
				for k, v := range mf.delta {
					fmt.Printf(&quot;key : %s\t\tlen: %d\n&quot;, k, len(v))
				}
				i++
			case &lt;-mf.stopAutoLoad:
				return
			}
		}
	}()
}

func (mf *MyFile) StopAutoLoadDelta() {
	fmt.Println(&quot;Stopping autoload Delta&quot;)
	mf.stopAutoLoad &lt;- struct{}{}
}

func (mf *MyFile) deleteCurrentDelta() {
	for k, _ := range mf.delta {
		fmt.Println(&quot;data deleted: &quot;, k)
		delete(mf.delta, k)
	}
}

type Fileinfo struct {
	name string
	data []byte
	err  error
}

func (mf *MyFile) LoadFile(prefix string, i int) {
	log.Printf(&quot;%s load : %d&quot;, prefix, i)
	files, err := FilePathWalkDir(prefix)
	if err != nil {
		panic(&quot;failed to open delta directory&quot;)
	}

	newFiles := make([]string, 0)

	for _, v := range files {
		if _, ok := mf.delta[v]; !ok {
			newFiles = append(newFiles, v)
		}
	}

	chanJobs := GenerateJobs(prefix, newFiles)
	chanResultJobs := ReadFiles(chanJobs, 8)
	counterTotal := 0
	counterSuccess := 0
	for results := range chanResultJobs {
		if results.err != nil {
			log.Printf(&quot;error creating file %s. stack trace: %s&quot;, results.name, results.err)
		} else {
			switch prefix {
			case &quot;delta&quot;:
				mf.delta[results.name] = results.data
			case &quot;full&quot;:
				mf.full[results.name] = results.data
			default:
				panic(&quot;not implemented&quot;)
			}
			counterSuccess++
		}
		counterTotal++
	}

	log.Printf(&quot;status jobs running: %d/%d&quot;, counterSuccess, counterTotal)
}

func GenerateJobs(prefix string, files []string) &lt;-chan Fileinfo {
	chanOut := make(chan Fileinfo)

	go func() {
		for _, v := range files {
			chanOut &lt;- Fileinfo{
				name: v,
			}
		}
		close(chanOut)
	}()

	return chanOut
}

func ReadFiles(chanIn &lt;-chan Fileinfo, worker int) &lt;-chan Fileinfo {
	chanOut := make(chan Fileinfo)

	var wg sync.WaitGroup

	wg.Add(worker)

	go func() {
		for i := 0; i &lt; worker; i++ {
			go func(workerIndex int) {
				defer wg.Done()
				for job := range chanIn {
					log.Printf(&quot;worker %d is reading file %s&quot;, workerIndex, job.name)
					data, err := os.ReadFile(job.name)
					chanOut &lt;- Fileinfo{
						name: job.name,
						data: data,
						err:  err,
					}
				}
			}(i)
		}
	}()

	go func() {
		wg.Wait()
		close(chanOut)
	}()
	return chanOut
}

huangapple
  • 本文由 发表于 2022年2月26日 15:48:18
  • 转载请务必保留本文链接:https://go.coder-hub.com/71274938.html
匿名

发表评论

匿名网友

:?: :razz: :sad: :evil: :!: :smile: :oops: :grin: :eek: :shock: :???: :cool: :lol: :mad: :twisted: :roll: :wink: :idea: :arrow: :neutral: :cry: :mrgreen:

确定