Golang Memory Leak Concerning Goroutines
Question
I have a Go program that runs continuously and relies entirely on goroutines plus one manager goroutine. The main goroutine simply launches worker goroutines and otherwise sleeps.
There is a memory leak. The program uses more and more memory until it drains all 16GB RAM + 32GB swap, and then each goroutine panics. It is actually the OS running out of memory that causes the panic: usually the panic is `fork/exec ./anotherapp: cannot allocate memory` when I try to execute anotherapp.
When this happens, all of the worker goroutines panic, are recovered, and are restarted. Each goroutine panics, is recovered and restarted, yet the memory usage does not decrease: it remains at 48GB even though virtually nothing is allocated any more. This means all goroutines keep panicking, because there is never enough memory, until the entire executable is killed and restarted.
The entire thing is about 50,000 lines, but the actual problematic area is as follows:
```go
package main

import (
	"fmt"
	"runtime"
	"runtime/debug"
	"sync/atomic"
	"time"
	// The third-party "unleak" package is also imported; its import path
	// is not given in the question.
)

// `type` is a reserved keyword in Go and cannot be used as a field name,
// so the boolean field is named Type here.
type queue struct {
	identifier string
	Type       bool
}

func main() {
	// Set number of goroutines that can be run
	var xthreads int32 = 10
	var usedthreads int32
	runtime.GOMAXPROCS(14)
	ready := make(chan *queue, 5)
	// Start the manager goroutine, which prepares identifiers in the background ready for processing, always with 5 waiting to go
	go manager(ready)
	// Start creating goroutines to process as they are ready
	for obj := range ready { // loops through "ready" channel and waits when there is nothing
		// This section uses atomic instead of a blocking channel in an earlier attempt to stop the memory leak, but it didn't work
		for atomic.LoadInt32(&usedthreads) >= xthreads {
			time.Sleep(time.Second)
		}
		debug.FreeOSMemory()             // Try to clean up the memory, also did not stop the leak
		atomic.AddInt32(&usedthreads, 1) // Mark goroutine as started
		// Unleak obj, probably unnecessary, but just to be safe
		item := new(queue)                              // named item rather than copy, which would shadow the built-in copy function
		item.identifier = unleak.String(obj.identifier) // unleak is a 3rd party package that makes a copy of the string
		item.Type = obj.Type
		go runit(item, &usedthreads) // Start the processing goroutine
	}
	fmt.Println(`END`) // This should never happen as the channels are never closed
}

func manager(ready chan *queue) {
	// This goroutine communicates with another server and fills the "ready" channel
}

// This is the goroutine; do, reportFatal and reportDone are defined elsewhere in the program
func runit(obj *queue, threadcount *int32) {
	defer func() {
		if r := recover(); r != nil {
			// Panicked
			erstring := fmt.Sprint(r)
			reportFatal(obj.identifier, erstring)
		} else {
			// Completed successfully
			reportDone(obj.identifier)
		}
		atomic.AddInt32(threadcount, -1) // Mark goroutine as finished
	}()
	do(obj) // This function does the actual processing
}
```
As far as I can see, when the `do` function (last line) ends, either by finishing or by panicking, the `runit` function ends, which ends the goroutine entirely, which means all of the memory from that goroutine should now be free. This is not what happens. What happens is that the app uses more and more memory until it becomes unable to function, all the `runit` goroutines panic, and yet the memory does not decrease.
Profiling does not reveal anything suspicious. The leak appears to be outside of the profiler's scope.
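Since the profiler shows nothing, one way to narrow the problem down is to log the Go runtime's own memory accounting alongside the OS figures (for example, the process's resident memory as reported by `top`). The sketch below uses `runtime.ReadMemStats` and `runtime.NumGoroutine`. A steadily rising goroutine count points at goroutines that never exit. If the process's resident memory keeps climbing while `Sys` stays flat, the growth is not memory the Go runtime obtained (memory allocated from C through cgo, for instance), and a Go heap profile would not show it. The `memSnapshot` type, `logMemory`, and the intervals are illustrative choices, not taken from the question.

```go
package main

import (
	"fmt"
	"runtime"
	"time"
)

// memSnapshot summarises the Go runtime's own view of its memory use.
type memSnapshot struct {
	Goroutines   int
	HeapAlloc    uint64 // bytes in allocated heap objects
	HeapSys      uint64 // heap bytes obtained from the OS
	HeapReleased uint64 // heap bytes returned to the OS
	Sys          uint64 // total bytes obtained from the OS by the runtime
}

func takeSnapshot() memSnapshot {
	var m runtime.MemStats
	runtime.ReadMemStats(&m)
	return memSnapshot{
		Goroutines:   runtime.NumGoroutine(),
		HeapAlloc:    m.HeapAlloc,
		HeapSys:      m.HeapSys,
		HeapReleased: m.HeapReleased,
		Sys:          m.Sys,
	}
}

func (s memSnapshot) String() string {
	const mb = 1 << 20
	return fmt.Sprintf("goroutines=%d heapAlloc=%dMB heapSys=%dMB heapReleased=%dMB sys=%dMB",
		s.Goroutines, s.HeapAlloc/mb, s.HeapSys/mb, s.HeapReleased/mb, s.Sys/mb)
}

// logMemory prints a snapshot every interval until stop is closed.
func logMemory(interval time.Duration, stop <-chan struct{}) {
	t := time.NewTicker(interval)
	defer t.Stop()
	for {
		select {
		case <-t.C:
			fmt.Println(takeSnapshot())
		case <-stop:
			return
		}
	}
}

func main() {
	stop := make(chan struct{})
	go logMemory(100*time.Millisecond, stop) // a long-running service might use time.Minute
	time.Sleep(350 * time.Millisecond)       // stand-in for the program's real work
	close(stop)
}
```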
Answer 1
Score: 2
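Please consider inverting the pattern: start a fixed number of worker goroutines that all receive from one channel, instead of starting a new goroutine per item. See here, or the code below: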
```go
package main

import (
	"log"
	"math/rand"
	"sync"
	"time"
)

// I do work
func worker(id int, work <-chan int) {
	for i := range work {
		// Work simulation: sleep for a random 0..i-1 seconds
		d := rand.Intn(i)
		log.Printf("Worker %d, sleeping for %d seconds\n", id, d)
		time.Sleep(time.Duration(d) * time.Second)
	}
}

// Return some fake work
func getWork() int {
	return rand.Intn(2) + 1
}

func main() {
	wg := new(sync.WaitGroup)
	work := make(chan int)
	// run 10 workers
	for i := 0; i < 10; i++ {
		wg.Add(1)
		go func(i int) {
			defer wg.Done()
			worker(i, work)
		}(i)
	}
	// main "thread"
	for i := 0; i < 100; i++ {
		work <- getWork()
	}
	// signal there is no more work to be done
	close(work)
	// Wait for the workers to exit
	wg.Wait()
}
```
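Applied to the question's code, the inverted pattern might look roughly like the sketch below: a fixed pool of long-lived workers ranges over the `ready` channel, and a panic in one item is recovered inside the loop instead of ending the goroutine. This is a minimal sketch under assumptions: `runPool`, `handle`, and the `process` callback are hypothetical names standing in for the question's `main` loop, `runit`, and `do`, and the channel is closed here only so the example terminates (the question's manager never closes it). Whether this removes the leak depends on what `do` holds on to; the sketch only shows the structure.

```go
package main

import (
	"fmt"
	"sync"
)

// queue mirrors the question's struct, with the field renamed because
// `type` is a Go keyword.
type queue struct {
	identifier string
	Type       bool
}

// handle processes one item and turns a panic into an error, so a failing
// item does not end the long-lived worker goroutine.
func handle(obj *queue, process func(*queue)) (err error) {
	defer func() {
		if r := recover(); r != nil {
			err = fmt.Errorf("%s: %v", obj.identifier, r)
		}
	}()
	process(obj)
	return nil
}

// runPool starts n workers that receive from ready until it is closed.
// It returns how many items succeeded and how many panicked.
func runPool(n int, ready <-chan *queue, process func(*queue)) (done, failed int) {
	var mu sync.Mutex
	var wg sync.WaitGroup
	for i := 0; i < n; i++ {
		wg.Add(1)
		go func() {
			defer wg.Done()
			for obj := range ready {
				err := handle(obj, process)
				mu.Lock()
				if err != nil {
					failed++ // the question's code would call reportFatal here
				} else {
					done++ // ... and reportDone here
				}
				mu.Unlock()
			}
		}()
	}
	wg.Wait()
	return done, failed
}

func main() {
	ready := make(chan *queue, 5)
	go func() { // hypothetical stand-in for manager(ready)
		for i := 0; i < 20; i++ {
			ready <- &queue{identifier: fmt.Sprintf("id-%d", i)}
		}
		close(ready)
	}()
	done, failed := runPool(10, ready, func(q *queue) {
		if q.identifier == "id-7" {
			panic("simulated failure")
		}
	})
	fmt.Println(done, failed) // 19 1
}
```

With this structure the number of goroutines stays fixed at the pool size, so the atomic counter and the sleep loop in the question's `main` are no longer needed.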