英文:
Limit goroutines in a loop
问题
我需要帮助或至少一些提示。我正在尝试逐行从大文件(100MB-11GB)中读取数据,然后将一些数据存储到Map中。
var m map[string]string
// 昂贵的函数
func stress(s string, mutex sync.Mutex) {
// 一些非常耗时的操作....这就是为什么我想使用goroutines
mutex.Lock()
m[s] = s // 存储结果
mutex.Unlock()
}
func main() {
file, err := os.Open("somefile.txt")
if err != nil {
fmt.Println(err)
return
}
defer func() {
if err = file.Close(); err != nil {
fmt.Println(err)
return
}
}()
scanner := bufio.NewScanner(file)
for scanner.Scan() {
go stress(scanner.Text(), mutex)
}
}
没有使用goroutines时,它可以正常工作,但速度很慢。正如你所看到的,文件很大,所以在循环中会有很多goroutines。这个事实带来了两个问题:
- 有时互斥锁不起作用,程序崩溃了(goroutines应该有多少个互斥锁?)
- 每次都会丢失一些数据(但程序不会崩溃)
我想我应该使用WaitGroup,但我无法理解应该如何使用它。我还猜想应该对goroutines设置一些限制,可能是一些计数器。最好在5-20个goroutines中运行它。
英文:
I need someome to help or at least any tip. I'm trying to read from large files (100mb - 11gb) line by line and then store some data into Map.
var m map[string]string
// expansive func
func stress(s string, mutex sync.Mutex) {
// some very cost operation .... that's why I want to use goroutines
mutex.Lock()
m展开收缩 = s // store result
mutex.Unlock()
}
func main() {
file, err := os.Open("somefile.txt")
if err != nil {
fmt.Println(err)
return
}
defer func() {
if err = file.Close(); err != nil {
fmt.Println(err)
return
}
}()
scanner := bufio.NewScanner(file)
for scanner.Scan() {
go stress(scanner.Text(), mutex)
}
}
Without gouroutines it works fine but slow. As you can see, file is large so within loop there will be a lot of gouroutines. And that fact provides two problems:
- Sometimes mutex doesn't work properly. And programm crashes. (How many goroutines mutex suppose?)
- Everytime some data just lost (But programm doesn't crash)
I suppose I should use WaitGroup, but I cannot understand how it should be. Also I guess there should be some limit for goroutines, maybe some counter. It would be great to run it in 5-20 goroutines.
UPD. Yes, As @user229044 mentioned, I have to pass mutex by pointer. But the problem with limiting goroutines within loop still active.
UPD2. This is how I workaround this problem. I don't exactly understand which way program handle these goroutines and how memory and process time go. Also almost all commentors point on Map structure, but the main problem was to handle runtime of goroutines. How many goroutines spawn if it would be 10billions iterations of Scan() loop, and how goroutines store in RAM?
func stress(s string, mutex *sync.Mutex) {
// a lot of coslty ops
// ...
// ...
mutex.Lock()
m[where] = result // store result
mutex.Unlock()
wg.Done()
}
// main
for scanner.Scan() {
wg.Add(1)
go func(data string) {
stress(data, &mutex)
}(scanner.Text())
}
wg.Wait()
答案1
得分: 1
你的具体问题是你通过值复制了互斥锁。你应该传递一个互斥锁的指针,这样你的互斥锁的单个实例将被所有函数调用共享。你还创建了数量不受限制的Go协程,这将最终耗尽系统的内存。
然而,你可以创建任意数量的Go协程,但这只会浪费资源而没有任何收益,并且管理所有这些无用的Go协程可能会导致性能的净损失。当每个并行进程都必须等待对数据结构的串行访问时,增加并行性对你没有帮助,这在你的map
中是这种情况。在这里,sync.WaitGroup
和互斥锁是错误的方法。
相反,为了添加和控制并发,你需要一个带缓冲的通道和单个负责map
插入的Go协程。这样,你就有一个进程从文件中读取,一个进程将数据插入到map
中,将磁盘IO与map
插入分离开来。
类似这样的代码:
scanner := bufio.NewScanner(file)
ch := make(chan string, 10)
go func() {
for s := range ch {
m[s] = s
}
}()
for scanner.Scan() {
ch <- scanner.Text()
}
close(ch)
英文:
Your specific problem is that you're copying the mutex by value. You should be passing a pointer to the mutex, so that a single instance of your mutex is shared by all function invocations. You're also spawning an unbounded number of go routines, which will eventually exhaust your system's memory.
However, you can spawn as many Go routines as you want and you're only wasting resources for no gain, and juggling all of those useless Go routines will probably cause a net loss of performance. Increased parallelism can't help you when every parallel process has to wait for serialized access to a data structure, as is the case with your map
. sync.WaitGroup
and mutexes are the wrong approach here.
Instead, to add and control concurrency, you want a buffered channel and single Go routine responsible for map
inserts. This way you have one process reading from the file, and one process inserting into the map
, decoupling the disk IO from the map insertion.
Something like this:
scanner := bufio.NewScanner(file)
ch := make(chan string, 10)
go func() {
for s := range ch {
m展开收缩 = s
}
}()
for scanner.Scan() {
ch <- scanner.Text()
}
close(ch)
通过集体智慧和协作来改善编程学习和解决问题的方式。致力于成为全球开发者共同参与的知识库,让每个人都能够通过互相帮助和分享经验来进步。
评论