Go deadlock all goroutines asleep

Question
This is a follow up to my earlier post:
http://stackoverflow.com/questions/34736825/goroutine-exit-status-2-what-does-it-mean-why-is-it-happening?noredirect=1#comment57238789_34736825
I'm still having trouble figuring out where the channels should be closed, after reading multiple topics and articles both on and off SO.
This program will open a list of files, create an output file for each input file (with the same name), visit all the urls in each input file and get all href links from these - which are saved to the corresponding output file.
However, I'm getting the following error:
http://play.golang.org/p/8X-1rM3aXC
The linkGetter and getHref functions are mainly for processing. head and tail are run as separate goroutines, and worker does the processing.
```go
package main

import (
	"bufio"
	"bytes"
	"fmt"
	"golang.org/x/net/html"
	"io"
	"io/ioutil"
	"log"
	"net/http"
	"os"
	"path/filepath"
	"regexp"
	"sync"
)

type Work struct {
	Link     string
	Filename string
}

type Output struct {
	Href     string
	Filename string
}

func getHref(t html.Token) (href string, ok bool) {
	// Iterate over all of the Token's attributes until we find an "href"
	for _, a := range t.Attr {
		if a.Key == "href" {
			href = a.Val
			ok = true
		}
	}
	return
}

func linkGetter(out chan<- Output, r io.Reader, filename string) {
	z := html.NewTokenizer(r)
	for {
		tt := z.Next()
		switch {
		case tt == html.ErrorToken:
			return
		case tt == html.StartTagToken:
			t := z.Token()
			isAnchor := t.Data == "a"
			if !isAnchor {
				continue
			}
			// Extract the href value, if there is one
			url, ok := getHref(t)
			if !ok {
				continue
			}
			out <- Output{url, filename}
		}
	}
}

func worker(out chan<- Output, in <-chan Work, wg *sync.WaitGroup) {
	defer wg.Done()
	for work := range in {
		resp, err := http.Get(work.Link)
		if err != nil {
			continue
		}
		body, err := ioutil.ReadAll(resp.Body)
		if err != nil {
			continue
		}
		if err = resp.Body.Close(); err != nil {
			fmt.Println(err)
		}
		linkGetter(out, bytes.NewReader(body), work.Filename)
	}
}

func head(c chan<- Work) {
	r, _ := regexp.Compile("(.*)(?:.json)")
	files, _ := filepath.Glob("*.json")
	for _, elem := range files {
		res := r.FindStringSubmatch(elem)
		for k, v := range res {
			if k == 0 {
				outpath, _ := filepath.Abs(fmt.Sprintf("go_tester/%s", v))
				abspath, _ := filepath.Abs(fmt.Sprintf("url_links/%s", v))
				f, _ := os.Open(abspath)
				scanner := bufio.NewScanner(f)
				for scanner.Scan() {
					c <- Work{outpath, scanner.Text()}
				}
			}
		}
	}
}

func tail(c <-chan Output) {
	currentfile := ""
	var f *os.File
	var err error
	for out := range c {
		if out.Filename != currentfile {
			if err = f.Close(); err != nil { // might cause an error on first run
				fmt.Println(err)
			}
			f, err = os.OpenFile(out.Filename, os.O_APPEND|os.O_WRONLY, 0600)
			if err != nil {
				log.Fatal(err)
			}
			currentfile = out.Filename
		}
		if _, err = f.WriteString(out.Href + "\n"); err != nil {
			fmt.Println(err)
		}
	}
}

const (
	nworkers = 80
)

func main() {
	//fmt.Println("hi")
	in := make(chan Work)
	out := make(chan Output)
	go head(in)
	go tail(out)
	var wg sync.WaitGroup
	for i := 0; i < 85; i++ {
		wg.Add(1)
		go worker(out, in, &wg)
	}
	close(in)
	close(out)
	wg.Wait()
}
```
What is wrong with the way the channels are closed?
Answer 1

Score: 3
You're not really paying attention to your pipeline design here. You have to ask yourself "When is section X done? What should happen when it is done? What happens after it is done?" for every section of the pipeline.
You start up `head`, `tail`, and `worker` to range over channels. The only way these functions are going to return successfully is if these channels are closed.
Draw it out if you need to.
- `head(in)` feeds in to `in`
- `worker(out, in, &wg)` ranges over `in`, feeds in to `out`, and tells you it is done with `wg` once `in` is closed
- `tail(out)` ranges over `out`
So what do you need to do to:

- Make sure all input is processed?
- Make sure all goroutines return?

Like so:

- You need to close `in` from `head` once it is done processing all of the files.
- This will cause `worker` to actually return once all items it can get from `in` are processed, causing `wg.Wait()` to return.
- It is now safe to close `out` since nothing is feeding in to it, and this will cause `tail` to eventually return.
But you'll probably need another `sync.WaitGroup` associated with `tail` for this particular design, because the whole program will exit right when `wg.Wait()` returns, thus possibly not finishing all of the work that `tail` is doing. See here. Specifically:
> Program execution begins by initializing the main package and then
> invoking the function main. When that function invocation returns, the
> program exits. It does not wait for other (non-main) goroutines to
> complete.
You'll probably also want to use buffered channels, referenced here, to avoid having to switch execution between goroutines so much. With your current design you're wasting a lot of time on context switching.