Go死锁,所有goroutine都处于休眠状态。

huangapple go评论92阅读模式
英文:

Go deadlock all goroutines asleep

问题

这是对我之前帖子的跟进:

http://stackoverflow.com/questions/34736825/goroutine-exit-status-2-what-does-it-mean-why-is-it-happening?noredirect=1#comment57238789_34736825

在阅读了多个关于此问题的主题和文章后,我仍然无法弄清楚通道应该在何处关闭。

该程序将打开一个文件列表,为每个输入文件创建一个输出文件(文件名相同),访问每个输入文件中的所有URL,并从中获取所有href链接 - 这些链接保存到相应的输出文件中。然而,我遇到了以下错误:

http://play.golang.org/p/8X-1rM3aXC

linkGetter和getHref函数主要用于处理。head和tail作为单独的goroutine运行,worker执行处理。

请问通道关闭的方式有什么问题?

英文:

This is a follow up to my earlier post:

    http://stackoverflow.com/questions/34736825/goroutine-exit-status-2-what-does-it-mean-why-is-it-happening?noredirect=1#comment57238789_34736825

I'm still having trouble figuring out where the channels should be closed, after reading multiple topics and articles both on and off SO.

This program will open a list of files, create an output file for each input file (with the same name),visit all the urls in each input file and get all href links from these - which are saved to the corresponding output file.
However, I'm getting the following error:

    http://play.golang.org/p/8X-1rM3aXC

The linkgetter, and getHref functions are mainly for processing. Head and tail are run as separate goroutines, and worker does the processing.

    package main

    import (
    "bufio"
    "bytes"
    "fmt"
    "golang.org/x/net/html"
    "io"
    "io/ioutil"
    "log"
    "net/http"
    "os"
    "path/filepath"
    "regexp"
    "sync"
    )

    type Work struct {
    Link     string
    Filename string
    }

    type Output struct {
    Href     string
    Filename string
    }

    func getHref(t html.Token) (href string, ok bool) {
    // Iterate over all of the Token's attributes until we find an    "href"
    for _, a := range t.Attr {
            if a.Key == "href" {
                    href = a.Val
                    ok = true
            }
    }
    return
    }

    func linkGetter(out chan<- Output, r io.Reader, filename string) {
    z := html.NewTokenizer(r)
    for {
            tt := z.Next()
            switch {
            case tt == html.ErrorToken:
                    return
            case tt == html.StartTagToken:
                    t := z.Token()
                    isAnchor := t.Data == "a"
                    if !isAnchor {
                            continue
                    }

                    // Extract the href value, if there is one
                    url, ok := getHref(t)
                    if !ok {
                            continue
                    }

                    out <- Output{url, filename}
            }
    }
    }

    func worker(out chan<- Output, in <-chan Work, wg *sync.WaitGroup)    {
    defer wg.Done()
    for work := range in {
            resp, err := http.Get(work.Link)
            if err != nil {
                    continue
            }
            body, err := ioutil.ReadAll(resp.Body)
            if err != nil {
                    continue
            }
            if err = resp.Body.Close(); err != nil {
                    fmt.Println(err)
            }
            linkGetter(out, bytes.NewReader(body), work.Filename)
    }
    }

    func head(c chan<- Work) {
    r, _ := regexp.Compile("(.*)(?:.json)")
    files, _ := filepath.Glob("*.json")

    for _, elem := range files {
            res := r.FindStringSubmatch(elem)
            for k, v := range res {

                    if k == 0 {
                            outpath, _ :=  filepath.Abs(fmt.Sprintf("go_tester/%s", v))

                            abspath, _ := filepath.Abs(fmt.Sprintf("url_links/%s", v))
                            f, _ := os.Open(abspath)
                            scanner := bufio.NewScanner(f)

                            for scanner.Scan() {
                                    c <- Work{outpath, scanner.Text()}
                            }

                    }
            }

    }
    
    
    }

    func tail(c <-chan Output) {
    currentfile := ""
    var f *os.File
    var err error
    for out := range c {
            if out.Filename != currentfile {
                    if err = f.Close(); err != nil { // might cause an error on first run
                            fmt.Println(err)
                    }
                    f, err = os.OpenFile(out.Filename, os.O_APPEND|os.O_WRONLY, 0600)
                    if err != nil {
                            log.Fatal(err)
                    }
                    currentfile = out.Filename
            }
            if _, err = f.WriteString(out.Href + "\n"); err != nil {
                    fmt.Println(err)
            }
    }
         
    }

    const (
    nworkers = 80
    )

    func main() {
    //fmt.Println("hi")
    in := make(chan Work)
    out := make(chan Output)

    go head(in)
    go tail(out)

    var wg sync.WaitGroup
    for i := 0; i < 85; i++ {
            wg.Add(1)
            go worker(out, in, &wg)
    }
    close(in)   
    close(out)    
    wg.Wait()


    }

What is wrong with the way the channels are closed?

答案1

得分: 3

你在这里并没有真正关注你的流水线设计。你必须问自己:“第X部分何时完成?完成后应该发生什么?完成后会发生什么?”对于流水线的每个部分都要这样做。

你启动headtailworker来遍历通道。这些函数能够成功返回的唯一方法是这些通道被关闭。

如果需要,可以将其画出来。

  1. head(in)将输入提供给in
  2. worker(out, in, &wg)遍历in,将其提供给out,并在in关闭后使用wg告诉你它已经完成
  3. tail(out)遍历out

那么你需要做什么来实现以下目标:

  1. 确保所有输入都被处理?
  2. 确保所有goroutine都返回?

可以这样做:

  1. head处理完所有文件时,需要关闭in
  2. 这将导致worker在处理完in中的所有项目后返回,从而导致wg.Wait()返回。
  3. 现在可以安全地关闭out,因为没有任何东西在提供给它,这将最终导致tail返回。

但是,对于这种特定的设计,你可能还需要与tail关联另一个sync.WaitGroup,因为当wg.Wait()返回时,整个程序将退出,可能无法完成tail正在进行的所有工作。参见这里。具体来说:

程序的执行从初始化主包开始,然后调用main函数。当该函数调用返回时,程序退出。它不会等待其他(非主)goroutine完成。

你可能还希望使用缓冲通道在这里引用,以帮助减少goroutine之间的上下文切换次数。根据你当前的设计,你在上下文切换方面浪费了很多时间。

英文:

You're not really paying attention to your pipeline design here. You have to ask yourself "When is section X done? What should happen when it is done? What happens after it is done?" for every section of the pipeline.

You start up head, tail, and worker to range over channels. The only way these functions are going to return successfully is if these channels are closed.

Draw it out of you need to.

  1. head(in) feeds in to in
  2. worker(out, in, &wg) ranges over in, feeds into out, and tells you it is done with wg once in is closed
  3. tail(out) ranges over out

So what do you need to do to:

  1. Make sure all input is processed?
  2. Make sure all goroutines return?

Like so:

  1. You need to close in from head once it is done processing all of the files.
  2. This will cause worker to actually return once all items it can get from in are processed, causing wg.Wait() to return
  3. it is now safe to close out since nothing is feeding in to it and this will cause tail to eventually return.

But you'll probably need another sync.WaitGroup associated with tail for this particular design because the whole program will exit right when wg.Wait() returns, thus possibly not finishing all of the work that tail is doing. See here. Specifically:

> Program execution begins by initializing the main package and then
> invoking the function main. When that function invocation returns, the
> program exits. It does not wait for other (non-main) goroutines to
> complete.

You'll probably also want to use buffered channels referenced here to aid in not having switch execution between goroutines so much. With your current design you're wasting a lot of time with context switching.

huangapple
  • 本文由 发表于 2016年1月13日 08:47:10
  • 转载请务必保留本文链接:https://go.coder-hub.com/34756537.html
匿名

发表评论

匿名网友

:?: :razz: :sad: :evil: :!: :smile: :oops: :grin: :eek: :shock: :???: :cool: :lol: :mad: :twisted: :roll: :wink: :idea: :arrow: :neutral: :cry: :mrgreen:

确定