Where is the goroutine leak?

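Question

I'm trying to run several tasks concurrently and return immediately if any of them fails, without waiting for all of the goroutines to return. The code is shown below. I've stripped out the noise to make it easier to digest, but I can post the full code if the leak is not obvious.
It's worth noting that I'm deploying this on Google App Engine. I can't reproduce the leak on my machine, but when I replace the concurrent code after the // Consume the results comment, the app works fine, though I don't understand why, because the code looks correct to me.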

package main

import "fmt"
import "sync"
import "errors"

func main() {
    indexes := []int{1, 2, 3, 4, 5, 6, 7}
    devCh := make(chan int, 7)
    stopCh := make(chan struct{})
    errCh := make(chan error, 7)
    var wg sync.WaitGroup
    go func() {
        for _, sub := range indexes {
            wg.Add(1)
            go func(sub int) {
                defer wg.Done()
                // some code which creates other
                // wait groups and spans other go routines
                // handle errors
                if sub == 99 { // unreachable 
                    errCh <- errors.New("new error")
                    
                }
            }(sub)
            select {
            // If there is any error we better stop the
            // loop
            case <-stopCh:
                return
            default:
            }
            devCh <- sub
        }
        wg.Wait()
        close(devCh)
    }()
    // Consume the results
    var results []int
    var wt sync.WaitGroup
    wt.Add(1)
    go func() {
        defer wt.Done()
        for s := range devCh {
            results = append(results, s)
        }
        return
    }()
    done := make(chan struct{})
    go func() {
        wt.Wait()
        close(done)
    }()

L:
    for {
        select {
        case err := <-errCh:
            fmt.Printf("error was %v", err)
            close(stopCh)
            return
        case <-done:
            break L
        default:
        }
    }
    fmt.Printf("all done, %v", results)
}

Edit: added some working code.

Edit: added code closer to the real code, which may explain the need for the for loop.

package main

import "fmt"
import "sync"
import "errors"

func main() {
    indexes := []int{1, 2, 3, 4, 5, 6, 7}
    indexesString := []string{"a", "b", "c", "d"}
    devChS := make(chan string, 1000)

    devCh := make(chan int, 7)
    stopCh := make(chan struct{})
    errCh := make(chan error, 7)
    var wg sync.WaitGroup
    go func() {
        for _, sub := range indexes {
            wg.Add(1)
            go func(sub int) {
                defer wg.Done()
                // some code which creates other
                // wait groups and spans other go routines
                // handle errors
                if sub == 99 { // unreachable
                    errCh <- errors.New("new error")

                }
                wg.Add(1)
                go func(sub int) {
                    defer wg.Done()
                    for _, s := range indexesString {
                        devChS <- fmt.Sprintf("%s %d", s, sub)

                    }

                    return
                }(sub)
            }(sub)
            select {
            // If there is any error we better stop the
            // loop
            case <-stopCh:
                return
            default:
            }
            devCh <- sub
        }
        wg.Wait()
        close(devCh)
        close(devChS)
    }()
    // Consume the results
    var results = struct {
        integers []int
        strings  []string
    }{}
    var wt sync.WaitGroup
    wt.Add(1)
    go func() {
        defer wt.Done()
        for s := range devCh {
            results.integers = append(results.integers, s)
        }
        return
    }()
    wt.Add(1)
    go func() {
        defer wt.Done()
        for s := range devChS {
            results.strings = append(results.strings, s)
        }
        return
    }()
    done := make(chan struct{})
    go func() {
        wt.Wait()
        close(done)
    }()

L:
    for {
        select {
        case err := <-errCh:
            fmt.Printf("error was %v", err)
            close(stopCh)
            return
        case <-done:
            break L
        default:
        }
    }
    fmt.Printf("all done, can return the results: %v", results)
}

Answer 1

Score: 1

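tl;dr: A loop that does nothing but repeat a non-blocking check until it succeeds can cause hard-to-diagnose trouble (at a minimum, it can overuse CPU); using a blocking check can fix it.

I'm not all that sure about the details of your case; I wrote a loop like yours that consistently fails with "process took too long" on the Playground, but completes when I run it locally.

As I commented, I'd aim for a simpler design, too.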


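At the time of writing, Go has only limited pre-emption of running goroutines: the running thread only yields control to the goroutine scheduler when a blocking operation (such as I/O, a channel operation, or waiting to take a lock) happens. (Go 1.14 and later can also preempt goroutines asynchronously, which makes this kind of starvation much less likely on current releases.)

So with GOMAXPROCS=1, if the (one) running thread starts looping, nothing else is guaranteed a chance to run.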
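To see which case a given environment falls into, the current setting can be queried at run time. This is a small sketch rather than part of the original answer; schedulerThreads is a hypothetical helper name, while runtime.GOMAXPROCS is the standard library call, and passing 0 returns the current value without changing it.

```go
package main

import (
	"fmt"
	"runtime"
)

// schedulerThreads (hypothetical name) reports how many OS threads may
// execute Go code simultaneously. An argument of 0 only queries the value.
func schedulerThreads() int {
	return runtime.GOMAXPROCS(0)
}

func main() {
	fmt.Println("GOMAXPROCS:", schedulerThreads())
}
```

Logging this value once at startup on the deployment target and on a development machine shows whether the two are running under the same scheduling conditions.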

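A for { select { ...default: } } can therefore start a loop that checks for items in a channel but never gives up control of the main thread so that another goroutine can write an item. Other code gets to run anyway when GOMAXPROCS is over 1, but not when it is 1, as it is on App Engine (or the Playground). The behavior depends not only on GOMAXPROCS but also on which goroutine happens to run first, which isn't necessarily defined.

To avoid that situation, remove the default: so that the select is a blocking operation that yields to the scheduler when it can't receive an item, allowing other code to run. This generalizes to other cases where you might loop doing a non-blocking check: any of them can keep resources busy constantly rechecking where a blocking call would not. Even when GOMAXPROCS>1 or the runtime's limited preemption saves you, polling (as repeated checking is called) can still consume more CPU than blocking.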

For example, this fails with "process took too long" on the Playground, though annoyingly it completes reliably on my machine:

package main

import "fmt"

func main() {
    c := make(chan struct{})
    go func() { c <- struct{}{} }()
    for {
        select {
        case <-c:
            fmt.Println("success")
            return
        default:
        }
    }
}
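For comparison, here is a minimal sketch of the blocking variant of the example above, with the default: case removed and the surrounding for loop dropped, since a blocking select needs no retry. The helper name waitForSignal is hypothetical and only there to make the behavior easy to call and check.

```go
package main

import "fmt"

// waitForSignal (hypothetical name) blocks until a value arrives on c.
// Without a default case the select parks this goroutine, which lets the
// scheduler run the sender even when only one thread executes Go code.
func waitForSignal(c <-chan struct{}) string {
	select {
	case <-c:
		return "success"
	}
}

func main() {
	c := make(chan struct{})
	go func() { c <- struct{}{} }()
	fmt.Println(waitForSignal(c)) // prints "success"
}
```

With a single case, the select is equivalent to a plain receive (<-c); the select form is kept here so that it lines up with the original example and leaves room for more cases such as an error or done channel.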

I can't tell if there are other problems, but the hang for a pattern similar to the sample is noteworthy.


huangapple
  • Posted on March 26, 2015, 09:14:03
  • When reposting, please keep the link to this article: https://go.coder-hub.com/29269416.html