Goroutine leak in an example from the book The Go Programming Language

Question

I am reading The Go Programming Language, and there is an example in the book that demonstrates a goroutine leak:

func mirroredQuery() string {
    responses := make(chan string, 3)
    go func() { responses <- request("asia.gopl.io") }()
    go func() { responses <- request("europe.gopl.io") }()
    go func() { responses <- request("americas.gopl.io") }()
    return <-responses // return the quickest response
}

func request(hostname string) (response string) { /* ... */ }

I tried to fix the leak and came up with the following code:

func request(url string) string {
    res, err := http.Get(url)
    if err == nil {
        body, err := io.ReadAll(res.Body)
        if err == nil {
            return string(body)
        } else {
            return err.Error()
        }
    } else {
        return err.Error()
    }
}

func getany() string {
    rsp := make(chan string, 3)
    done := make(chan struct{}, 3)
    doRequest := func(url string) {
        select {
        case rsp <- request(url):
            fmt.Printf("get %s\n", url)
            done <- struct{}{}
        case <-done:
            fmt.Printf("stop %s\n", url)
            return
        }
    }
    go doRequest("http://google.com")
    go doRequest("http://qq.com")
    go doRequest("http://baidu.com")
    return <-rsp
}

But it doesn't seem to solve the problem. Any suggestions?

Answer 1

Score: 6

There is no goroutine leak in the provided code. The mirroredQuery function uses a buffered channel to collect the results and returns the first answer, and the buffer currently has enough space to hold the answers from all of the goroutines, even though the remaining responses are never read. The situation changes if the buffer is smaller than N-1, where N is the number of spawned goroutines: some of the goroutines spawned by mirroredQuery then get stuck trying to send their response on the responses channel. Repeated calls to mirroredQuery keep increasing the number of stuck goroutines, which is what is meant by a goroutine leak.

Here is the code with logging added, followed by the output for both scenarios (the buffer size of responses is varied between the two runs).

func mirroredQuery() string {
	responses := make(chan string, 2)
	go func() {
		responses <- request("asia.gopl.io")
		log.Printf("Finished goroutine asia.gopl.io\n")
	}()
	go func() {
		responses <- request("europe.gopl.io")
		log.Printf("Finished goroutine europe.gopl.io\n")
	}()
	go func() {
		responses <- request("americas.gopl.io")
		log.Printf("Finished goroutine americas.gopl.io\n")
	}()
	return <-responses // return the quickest response
}
func request(hostname string) (response string) {
	duration := time.Duration(rand.Int63n(5000)) * time.Millisecond
	time.Sleep(duration)
	return hostname
}

func main() {
	rand.Seed(time.Now().UnixNano())
	result := mirroredQuery()
	log.Printf("Fastest result for %s\n", result)
	time.Sleep(6*time.Second)
}

Output for buffer size >= N-1:

2021/06/26 16:05:27 Finished europe.gopl.io
2021/06/26 16:05:27 Fastest result for europe.gopl.io
2021/06/26 16:05:28 Finished asia.gopl.io
2021/06/26 16:05:30 Finished americas.gopl.io

Process finished with the exit code 0

Output for buffer size < N-1:

2021/06/26 15:47:54 Finished europe.gopl.io
2021/06/26 15:47:54 Fastest result for europe.gopl.io

Process finished with the exit code 0
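
One rough way to see the accumulation of stuck goroutines is to call mirroredQuery in a loop and log runtime.NumGoroutine(). This is only a sketch: it assumes the responses channel is temporarily made smaller than N-1 (for example unbuffered), it replaces the main function above, and it additionally needs the runtime import; NumGoroutine counts every goroutine in the process, so treat it as a sanity check rather than a precise measurement.

func main() {
	rand.Seed(time.Now().UnixNano())
	for i := 1; i <= 5; i++ {
		mirroredQuery() // assumed variant with make(chan string), i.e. buffer < N-1
		// Let the non-blocked goroutines finish; the stranded senders stay blocked.
		time.Sleep(6 * time.Second)
		log.Printf("after call %d: %d goroutines", i, runtime.NumGoroutine())
	}
}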

The implementation above can be "improved" by terminating the remaining goroutines as soon as the first response arrives, which can reduce the amount of resources used. Whether that is worthwhile depends heavily on what request does. For computation-heavy work it makes sense; cancelling an HTTP request, on the other hand, may tear down the connection, so the next request has to open a new one. For heavily loaded servers it can be more efficient to wait for a response, even one that is never used, than to cancel the request.

Below is the improved implementation using a context.

func mirroredQuery() string {
	ctx, cancel := context.WithCancel(context.Background())
	defer cancel()
	responses := make(chan string)
	f := func(hostname string) {
		response, err := request(ctx, hostname)
		if err != nil {
			log.Printf("Finished %s with error %s\n", hostname, err)
			return
		}
		responses <- response
		log.Printf("Finished %s\n", hostname)
	}
	go f("asia.gopl.io")
	go f("europe.gopl.io")
	go f("americas.gopl.io")
	return <-responses // return the quickest response
}

func request(ctx context.Context, hostname string) (string, error) {
	duration := time.Duration(rand.Int63n(5000)) * time.Millisecond
	after := time.After(duration)
	select {
	case <-ctx.Done():
		return "", ctx.Err()
	case <-after:
		return "response for "+hostname, nil
	}
}

func main() {
	rand.Seed(time.Now().UnixNano())
	result := mirroredQuery()
	log.Printf("Fastest result for %s\n", result)
	time.Sleep(6 * time.Second)
}

Answer 2

Score: 2

You read the book wrong. The book uses the example to illustrate how to use a buffered channel to avoid a goroutine leak.

This is the paragraph that immediately follows the example in the book (page 233):

> Had we used an unbuffered channel, the two slower goroutines would have gotten stuck trying to send their responses on a channel from which no goroutine will ever receive. This situation, called a goroutine leak, would be a bug. Unlike garbage variables, leaked goroutines are not automatically collected, so it is important to make sure that goroutines terminate themselves when no longer needed.

Note:

  1. This function does not try to optimize memory footprint or resource use (including network resources). The client functions in Go's net/http package are context-aware, so a request can be canceled midway, which saves some resources (whether that matters enough to bother is a design decision).

To use a context, you can:

func mirroredQuery() string {
    responses := make(chan string, 3)
    ctx, cf := context.WithCancel(context.Background())
    defer cf()

    go func() { responses <- request("asia.gopl.io") }()
    go func() { responses <- request("europe.gopl.io") }()
    go func() { responses <- request("americas.gopl.io") }()
    return <-responses // 返回最快的响应
}

func request(ctx context.Context, url string) string {
    req, err := http.NewRequestWithContext(ctx, http.MethodGet, url, nil)
    if err != nil {
        panic(err)
    }
    res, err := http.DefaultClient.Do(req)
    if err == nil {
        body, err := io.ReadAll(res.Body)
        if err == nil {
            return string(body)
        } else {
            return err.Error()
        }
    } else {
        return err.Error()
    }
}
  2. Using a buffered channel allocates memory. When there are many goroutines, a buffered channel becomes wasteful.

To solve this, you can use a done channel (like what you attempted):

func getAny() string {
    responses := make(chan string)
    ctx, cf := context.WithCancel(context.Background())
    defer cf()
    done := make(chan struct{})
    defer close(done)

    doRequest := func(url string) {
        select {
        case responses <- request(ctx, url):
            fmt.Printf("get %s\n", url)
        case <-done:
            fmt.Printf("stop %s\n", url)
            return
        }
    }

    go doRequest("http://google.com")
    go doRequest("http://qq.com")
    go doRequest("http://baidu.com")
    return <-responses // return the quickest response
}

Receiving on a closed channel always "returns" a zero value immediately, so closing a channel serves as a broadcast. Using this kind of "done channel" is common practice.
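
As a standalone illustration of that broadcast behaviour (a minimal sketch, not part of the code above; it only needs fmt and sync), closing a single channel unblocks every goroutine waiting to receive from it:

func broadcastDemo() {
    done := make(chan struct{})
    var wg sync.WaitGroup
    for i := 0; i < 3; i++ {
        wg.Add(1)
        go func(id int) {
            defer wg.Done()
            <-done // receives a zero value immediately once done is closed
            fmt.Printf("worker %d stopped\n", id)
        }(i)
    }
    close(done) // a single close wakes all three receivers
    wg.Wait()
}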

You can also use context.Context:

func mirroredQuery() string {
    responses := make(chan string)
    ctx, cf := context.WithCancel(context.Background())
    defer cf()

    doRequest := func(url string) {
        select {
        case responses <- request(ctx, url):
            fmt.Printf("get %s\n", url)
        case <-ctx.Done():
            fmt.Printf("stop %s\n", url)
            return
        }
    }

    go doRequest("http://google.com")
    go doRequest("http://qq.com")
    go doRequest("http://baidu.com")
    return <-responses // return the quickest response
}

In this situation context.Context is the better choice, because you already use one for the http request.

  3. Using a sync.WaitGroup would wait for all the requests to finish but still return only the first one. I think that defeats the purpose of the function and provides virtually no benefit. Making every goroutine the function spawns return before the function itself returns does not make sense to me (unless the function is main).

Answer 3

Score: 1

To avoid leaking goroutines, you presumably want to ensure that once mirroredQuery returns, no goroutine it created is still running.

In that case, the most important thing is being able to cancel the other goroutines when one of them completes its request successfully. In Go this cancellation is done with context.Context, which net/http supports.

Once context cancellation is in place, you need a sync.WaitGroup in the calling function to wait for all the goroutines to be done.

Here is a doRequest that takes a context and wraps the "HTTP get" functionality of the book's request function:

func doRequest(ctx context.Context, url string) string {
    req, err := http.NewRequestWithContext(ctx, http.MethodGet, url, nil)
    if err != nil {
        log.Fatal(err)
    }
    res, err := http.DefaultClient.Do(req)

    // err is also non-nil if the request was canceled
    if err != nil {
        return ""
    }
    defer res.Body.Close()
    b, err := io.ReadAll(res.Body)
    if err != nil {
        return ""
    }
    return string(b)
}

If the context is canceled, http.DefaultClient.Do returns early with an appropriate error.
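
If you want to tell cancellation apart from a genuine failure (the doRequest above deliberately collapses both into an empty string), one option, sketched here with hypothetical doRequestErr and fetchOrReport helpers, is to return the error and test it with errors.Is:

// doRequestErr is a hypothetical variant of doRequest that reports the error
// instead of returning "".
func doRequestErr(ctx context.Context, url string) (string, error) {
    req, err := http.NewRequestWithContext(ctx, http.MethodGet, url, nil)
    if err != nil {
        return "", err
    }
    res, err := http.DefaultClient.Do(req)
    if err != nil {
        return "", err // Do wraps context.Canceled when the context was canceled
    }
    defer res.Body.Close()
    b, err := io.ReadAll(res.Body)
    if err != nil {
        return "", err
    }
    return string(b), nil
}

// fetchOrReport (also hypothetical, needs the errors import) separates a lost
// race from a real failure in the worker goroutine.
func fetchOrReport(ctx context.Context, url string, responses chan<- string) {
    res, err := doRequestErr(ctx, url)
    switch {
    case errors.Is(err, context.Canceled):
        // lost the race: another goroutine already delivered a response
    case err != nil:
        log.Printf("request to %s failed: %v", url, err)
    default:
        responses <- res
    }
}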

Now the function that juggles the goroutines becomes:

func mirroredQuery() string {
    ctx, cancel := context.WithCancel(context.Background())
    responses := make(chan string, 3)

    fetcher := func(url string, wg *sync.WaitGroup) {
        res := doRequest(ctx, url)
        if res != "" {
            responses <- res
        }
        wg.Done()
    }

    urls := []string{
        "asia.gopl.io",
        "europe.gopl.io",
        "http://google.com",
    }

    var wg sync.WaitGroup
    for _, url := range urls {
        wg.Add(1)
        go fetcher(url, &wg)
    }

    res := <-responses
    fmt.Println("got response", res[:300])
    cancel()

    wg.Wait()
    return res
}

A few things to note:

  • Each goroutine runs doRequest and writes the result to responses only if it is non-empty, meaning no error occurred (cancellation counts as an error here).
  • The WaitGroup is used to wait for all worker goroutines to exit.
  • The main goroutine launches all the workers and waits for the first (non-empty) result on responses; it then calls cancel to cancel the context, which signals the remaining workers to exit, and waits for them to finish.

As an exercise, extend this code to address a few issues:

  • Distinguish between real errors and cancellation; in the current code there could be a deadlock if every worker runs into an error.
  • Add a timeout to the <-responses read in the main goroutine using select (see the sketch after this list).
  • Return the first result to the caller as soon as possible, while a background goroutine handles canceling the context and waiting for the workers to exit. After all, the primary goal is to return the result quickly.
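
As a hint for the timeout exercise, here is a minimal sketch of how the <-responses read in mirroredQuery could be bounded with select and time.After. The 5-second limit is an arbitrary example, and what to do with an empty res after a timeout is left to the reader:

    var res string // stays empty if no mirror answers before the deadline
    select {
    case res = <-responses:
        // the fastest mirror answered first
    case <-time.After(5 * time.Second):
        // deadline hit before any mirror answered
    }
    cancel() // in both cases, stop whatever is still in flight
    // ... then wg.Wait() and return res as before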

Answer 4

Score: 0

Use context and sync.WaitGroup.
