golang: how to close the channel after all goroutines are finished?

huangapple go评论72阅读模式
英文:

golang: how to close the channel after all goroutines are finished?

问题

我想用Go语言编写一个简单的网络爬虫,实现以下功能:

  • 从一个URL中获取所有符合某种模式的href
  • 提取一些特定字段
  • 将结果写入CSV文件

以下是我的代码:

package main

import (
	"encoding/csv"
	"flag"
	"fmt"
	"github.com/PuerkitoBio/goquery"
	"log"
	"net/http"
	"net/url"
	"os"
	"strings"
	"sync"
)

type Enterprise struct {
	name     string
	tax_code string
	group    string
	capital  string
}

var u, f string
var name, tax_code, group, capital string

func init() {
	flag.StringVar(&u, "u", "", "Which URL to download from")
	flag.StringVar(&f, "f", "", "Path to the csv file to write the output to")
}

func check(e error) {
	if e != nil {
		panic(e)
	}
}

func findHrefs(u string) map[string]string {
	resp, err := http.Get(u)
	check(err)

	doc, err := goquery.NewDocumentFromResponse(resp)
	check(err)

	e_hrefs := make(map[string]string)
	doc.Find("td div a").Each(func(_ int, s *goquery.Selection) {
		e_href, _ := s.Attr("href")
		if strings.HasPrefix(e_href, "/Thong-tin-doanh-nghiep") && s.Text() != "" {
			e_hrefs[e_href] = s.Text()
		}
	})
	return e_hrefs
}

func fetch(url string, name string, file *os.File, wg *sync.WaitGroup, c chan Enterprise) {
	defer wg.Done()

	log.Println("Fetching URL", url)
	resp, err := http.Get(url)
	check(err)

	doc, err := goquery.NewDocumentFromResponse(resp)
	check(err)
	e := new(Enterprise)
	doc.Find("td").Each(func(_ int, s *goquery.Selection) {
		if s.Text() == "Mã số thuế:" {
			e.tax_code = s.Next().Text()
		}
		if s.Text() == "Tên ngành cấp 2:" {
			e.group = s.Next().Text()
		}
		if s.Text() == "Sở hữu vốn:" {
			e.capital = s.Next().Text()
		}
	})
	w := csv.NewWriter(file)
	w.Write([]string{name, "'" + e.tax_code, e.group, e.capital})
	w.Flush()
	c <- *e
}

func getDoc(u, f string) {
	parsedUrl, err := url.Parse(u)
	check(err)

	file, err := os.Create(f)
	check(err)
	defer file.Close()

	var wg sync.WaitGroup
	c := make(chan Enterprise)

	e_hrefs := findHrefs(u)
	for e_href, name := range e_hrefs {
		wg.Add(1)
		go fetch(parsedUrl.Scheme+"://"+parsedUrl.Host+e_href, name, file, &wg, c)
	}
	wg.Wait()
}

func main() {
	flag.Parse()
	if u == "" || f == "" {
		fmt.Println("-u=<URL to download from> -f=<Path to the CSV file>")
		os.Exit(1)
	}
	getDoc(u, f)
}

问题是在所有goroutine完成后,通道没有关闭,我必须按Ctrl+C才能恢复命令提示符:

2016/03/02 09:34:05 Fetching URL ...
2016/03/02 09:34:05 Fetching URL ...
2016/03/02 09:34:05 Fetching URL ...
^Csignal: interrupt

通过阅读这篇文章,我将getDoc函数中的最后一行更改为以下内容:

go func() {
	wg.Wait()
	close(c)
}()

现在当运行时,我可以恢复命令提示符,但是通道在所有goroutine完成之前就关闭了,没有将任何内容写入CSV文件。

我错在哪里?

英文:

I would like to write a simple web scraper in Go by:

  • get all href with a pattern from an URL
  • extract some specific fields
  • and write to a CSV file

Here's my code:

package main
import (
&quot;encoding/csv&quot;
&quot;flag&quot;
&quot;fmt&quot;
&quot;github.com/PuerkitoBio/goquery&quot;
&quot;log&quot;
&quot;net/http&quot;
&quot;net/url&quot;
&quot;os&quot;
&quot;strings&quot;
&quot;sync&quot;
)
type Enterprise struct {
name     string
tax_code string
group    string
capital  string
}
var u, f string
var name, tax_code, group, capital string
func init() {
flag.StringVar(&amp;u, &quot;u&quot;, &quot;&quot;, &quot;Which URL to download from&quot;)
flag.StringVar(&amp;f, &quot;f&quot;, &quot;&quot;, &quot;Path to the csv file to write the output to&quot;)
}
func check(e error) {
if e != nil {
panic(e)
}
}
func findHrefs(u string) map[string]string {
resp, err := http.Get(u)
check(err)
doc, err := goquery.NewDocumentFromResponse(resp)
check(err)
e_hrefs := make(map[string]string)
doc.Find(&quot;td div a&quot;).Each(func(_ int, s *goquery.Selection) {
e_href, _ := s.Attr(&quot;href&quot;)
if strings.HasPrefix(e_href, &quot;/Thong-tin-doanh-nghiep&quot;) &amp;&amp; s.Text() != &quot;&quot; {
e_hrefs[e_href] = s.Text()
}
})
return e_hrefs
}
func fetch(url string, name string, file *os.File, wg *sync.WaitGroup, c chan Enterprise) {
defer wg.Done()
log.Println(&quot;Fetching URL&quot;, url)
resp, err := http.Get(url)
check(err)
doc, err := goquery.NewDocumentFromResponse(resp)
check(err)
e := new(Enterprise)
doc.Find(&quot;td&quot;).Each(func(_ int, s *goquery.Selection) {
if s.Text() == &quot;M&#227; số thuế:&quot; {
e.tax_code = s.Next().Text()
}
if s.Text() == &quot;T&#234;n ng&#224;nh cấp 2:&quot; {
e.group = s.Next().Text()
}
if s.Text() == &quot;Sở hữu vốn:&quot; {
e.capital = s.Next().Text()
}
})
w := csv.NewWriter(file)
w.Write([]string{name, &quot;&#39;&quot; + e.tax_code, e.group, e.capital})
w.Flush()
c &lt;- *e
}
func getDoc(u, f string) {
parsedUrl, err := url.Parse(u)
check(err)
file, err := os.Create(f)
check(err)
defer file.Close()
var wg sync.WaitGroup
c := make(chan Enterprise)
e_hrefs := findHrefs(u)
for e_href, name := range e_hrefs {
wg.Add(1)
go fetch(parsedUrl.Scheme+&quot;://&quot;+parsedUrl.Host+e_href, name, file, &amp;wg, c)
}
wg.Wait()
}
func main() {
flag.Parse()
if u == &quot;&quot; || f == &quot;&quot; {
fmt.Println(&quot;-u=&lt;URL to download from&gt; -f=&lt;Path to the CSV file&gt;&quot;)
os.Exit(1)
}
getDoc(u, f)
}

The problem is channel was not closed after all goroutines are finished and I have to press <kbd>control</kbd>+<kbd>C</kbd> to get my shell prompt back:

2016/03/02 09:34:05 Fetching URL ...
2016/03/02 09:34:05 Fetching URL ...
2016/03/02 09:34:05 Fetching URL ...
^Csignal: interrupt

By reading this, I change the last line in getDoc func to something like:

go func() {
wg.Wait()
close(c)
}()

Now I can get my shell prompt back when running but the channel was closed before all goroutines are finished and nothing write to CSV file.

Where did I go wrong?

答案1

得分: 4

对我来说,看起来你没有从你的通道中读取数据,因为它是一个同步通道(你没有声明长度),如果接收到值,它将会阻塞。所以你需要通过value <- c从你的c中读取数据,否则你的获取函数将会在c <- *e处一直挂起。

这导致你的sync.WaitGroup永远不会调用wg.Done(),从而不会减少计数器,也不会导致wg.Wait()停止阻塞,这导致你的close(c)永远不会被调用。

英文:

To me it doesn't look like you're reading from your channel, and because it is a synchronous channel (you never declared a length on it) it will block if it receives a value. So you need to be reading from your c by value &lt;- c or your fetch function will just hang at c &lt;- *e

This is causing your sync.WaitGroup to never wg.Done() which never decrements the counter, which never causes the wg.Wait() to stop blocking, which causes your close(c) to never get called

答案2

得分: 0

我的原始代码大致如下:

e_hrefs := findHrefs(u)
w := csv.NewWriter(file)
for e_href, name := range e_hrefs {
    wg.Add(1)
    go fetch(parsedUrl.Scheme+"://"+parsedUrl.Host+e_href, name, &wg, c)
    e := <-c
    w.Write([]string{name, "'" + e.tax_code, e.group, e.capital})
    w.Flush()
}
wg.Wait()

你可以看到,这不是并发的。

我刚刚通过使用 range 语句来迭代通道来修复了这个问题:

e_hrefs := findHrefs(u)
for e_href, name := range e_hrefs {
    wg.Add(1)
    go fetch(parsedUrl.Scheme+"://"+parsedUrl.Host+e_href, name, &wg, c)
}
go func() {
    wg.Wait()
    close(c)
}()

w := csv.NewWriter(file)
for e := range c {
    w.Write([]string{e.name, "'" + e.tax_code, e.group, e.capital})
    w.Flush()
}
英文:

My original code is something like this:

e_hrefs := findHrefs(u)
w := csv.NewWriter(file)
for e_href, name := range e_hrefs {
wg.Add(1)
go fetch(parsedUrl.Scheme+&quot;://&quot;+parsedUrl.Host+e_href, name, &amp;wg, c)
e := &lt;-c
w.Write([]string{name, &quot;&#39;&quot; + e.tax_code, e.group, e.capital})
w.Flush()
}
wg.Wait()

and you can see, it's not concurrency.

I've just fixed by using the range clause to iterate over channel:

e_hrefs := findHrefs(u)
for e_href, name := range e_hrefs {
wg.Add(1)
go fetch(parsedUrl.Scheme+&quot;://&quot;+parsedUrl.Host+e_href, name, &amp;wg, c)
}
go func() {
wg.Wait()
close(c)
}()
w := csv.NewWriter(file)
for e := range c {
w.Write([]string{e.name, &quot;&#39;&quot; + e.tax_code, e.group, e.capital})
w.Flush()
}

huangapple
  • 本文由 发表于 2016年3月2日 11:12:05
  • 转载请务必保留本文链接:https://go.coder-hub.com/35737796.html
匿名

发表评论

匿名网友

:?: :razz: :sad: :evil: :!: :smile: :oops: :grin: :eek: :shock: :???: :cool: :lol: :mad: :twisted: :roll: :wink: :idea: :arrow: :neutral: :cry: :mrgreen:

确定