如何检查通道是否不会再接收到任何数据,以及何时和何地进行检查?

huangapple go评论85阅读模式
英文:

When and where to check if channel won't get any more data?

问题

我正在尝试解决练习:Web爬虫

在这个练习中,你将使用Go的并发特性来并行化一个Web爬虫。

修改Crawl函数,以并行方式获取URL,避免重复获取相同的URL。

我应该在什么时候检查所有的URL是否已经被爬取过?(或者如何知道是否还有更多的数据在队列中?)

输出结果出现了死锁,因为stor.Queue通道从未关闭。

英文:

I'm trying to solve Exercise: Web Crawler

> In this exercise you'll use Go's concurrency features to parallelize a
> web crawler.
>
> Modify the Crawl function to fetch URLs in parallel without fetching
> the same URL twice.

When should I check if all urls already been crawled? (or how could I know if there will be no more data queued?)

package main

import (
	"fmt"
)

type Result struct {
	Url string
	Depth int
}

type Stor struct {
	Queue  chan Result
	Visited map[string]int
}    

func NewStor() *Stor {
	return &Stor{
		Queue:  make(chan Result,1000),
		Visited: map[string]int{},
	}
}

type Fetcher interface {
	// Fetch returns the body of URL and
	// a slice of URLs found on that page.
	Fetch(url string) (body string, urls []string, err error)
}

// Crawl uses fetcher to recursively crawl
// pages starting with url, to a maximum of depth.
func Crawl(res Result, fetcher Fetcher, stor *Stor) {
	defer func() {			
		/*
		if len(stor.Queue) == 0 {
			close(stor.Queue)
		}	
		*/	// this is wrong, it makes the channel closes too early
	}()
	if res.Depth <= 0 {
		return
	}
	// TODO: Don't fetch the same URL twice.
	url := res.Url
	stor.Visited
++ if stor.Visited
> 1 { fmt.Println("skip:",stor.Visited
,url) return } body, urls, err := fetcher.Fetch(url) if err != nil { fmt.Println(err) return } fmt.Printf("found: %s %q\n", url, body) for _, u := range urls { stor.Queue <- Result{u,res.Depth-1} } return } func main() { stor := NewStor() Crawl(Result{"http://golang.org/", 4}, fetcher, stor) for res := range stor.Queue { // TODO: Fetch URLs in parallel. go Crawl(res,fetcher,stor) } } // fakeFetcher is Fetcher that returns canned results. type fakeFetcher map[string]*fakeResult type fakeResult struct { body string urls []string } func (f fakeFetcher) Fetch(url string) (string, []string, error) { if res, ok := f
; ok { return res.body, res.urls, nil } return "", nil, fmt.Errorf("not found: %s", url) } // fetcher is a populated fakeFetcher. var fetcher = fakeFetcher{ "http://golang.org/": &fakeResult{ "The Go Programming Language", []string{ "http://golang.org/pkg/", "http://golang.org/cmd/", }, }, "http://golang.org/pkg/": &fakeResult{ "Packages", []string{ "http://golang.org/", "http://golang.org/cmd/", "http://golang.org/pkg/fmt/", "http://golang.org/pkg/os/", }, }, "http://golang.org/pkg/fmt/": &fakeResult{ "Package fmt", []string{ "http://golang.org/", "http://golang.org/pkg/", }, }, "http://golang.org/pkg/os/": &fakeResult{ "Package os", []string{ "http://golang.org/", "http://golang.org/pkg/", }, }, }

The output was a deadlock since the stor.Queue channel never closed.

答案1

得分: 4

等待所有goroutine完成的最简单方法是使用sync包中的sync.WaitGroup。

package main

import "sync"

var wg sync.WaitGroup

// 然后你可以这样做
func Crawl(res Result, fetcher Fetcher) {
    defer wg.Done()
    // ...
    // 为什么不在Crawl函数内部生成新的goroutine呢?
    for res := range urls {
        wg.Add(1)
        go Crawl(res, fetcher)
    }
    // ...
}

// 在main.main()函数中
func main() {
    wg.Add(1)
    Crawl(Result{"http://golang.org/", 4}, fetcher)
    // ...
    wg.Wait() // 会阻塞直到所有goroutine完成
}

完整的解决方案如下:

package main

import (
    "fmt"
    "sync"
)

var wg sync.WaitGroup
var visited map[string]int = map[string]int{}

type Result struct {
    Url   string
    Depth int
}

type Fetcher interface {
    Fetch(url string) (body string, urls []string, err error)
}

func Crawl(res Result, fetcher Fetcher) {
    defer wg.Done()
    if res.Depth <= 0 {
        return
    }
    // TODO: 不要重复抓取相同的URL
    url := res.Url
    visited[url]++
    if visited[url] > 1 {
        fmt.Println("skip:", visited[url], url)
        return
    }
    body, urls, err := fetcher.Fetch(url)
    if err != nil {
        fmt.Println(err)
        return
    }
    fmt.Printf("found: %s %q\n", url, body)
    for _, u := range urls {
        wg.Add(1)
        go Crawl(Result{u, res.Depth - 1}, fetcher)
    }
    return
}

func main() {
    wg.Add(1)
    Crawl(Result{"http://golang.org/", 4}, fetcher)
    wg.Wait()
}

type fakeFetcher map[string]*fakeResult

type fakeResult struct {
    body string
    urls []string
}

func (f fakeFetcher) Fetch(url string) (string, []string, error) {
    if res, ok := f[url]; ok {
        return res.body, res.urls, nil
    }
    return "", nil, fmt.Errorf("not found: %s", url)
}

var fetcher = fakeFetcher{
    "http://golang.org/": &fakeResult{
        "The Go Programming Language",
        []string{
            "http://golang.org/pkg/",
            "http://golang.org/cmd/",
        },
    },
    "http://golang.org/pkg/": &fakeResult{
        "Packages",
        []string{
            "http://golang.org/",
            "http://golang.org/cmd/",
            "http://golang.org/pkg/fmt/",
            "http://golang.org/pkg/os/",
        },
    },
    "http://golang.org/pkg/fmt/": &fakeResult{
        "Package fmt",
        []string{
            "http://golang.org/",
            "http://golang.org/pkg/",
        },
    },
    "http://golang.org/pkg/os/": &fakeResult{
        "Package os",
        []string{
            "http://golang.org/",
            "http://golang.org/pkg/",
        },
    },
}

希望对你有帮助!

英文:

Simplest way to wait until all goroutings are done is sync.WaitGroup in sync package

package main
import &quot;sync&quot;
var wg sync.WaitGroup
//then you do
func Crawl(res Result, fetcher Fetcher) { //what for you pass stor *Stor as arg? It just visible for all goroutings
defer wg.Done()
...
//why not to spawn new routing just inside Crowl?
for res := range urls {
wg.Add(1)
go Crawl(res,fetcher)
}
...
}
...
//And in main.main()
func main() {
wg.Add(1) 
Crawl(Result{&quot;http://golang.org/&quot;, 4}, fetcher)
...
wg.Wait() //Will block until all routings Done
}

Complete solution will be:

package main
import (
&quot;fmt&quot;
&quot;sync&quot;
)
var wg sync.WaitGroup
var visited map[string]int = map[string]int{}
type Result struct {
Url string
Depth int
}
type Fetcher interface {
// Fetch returns the body of URL and
// a slice of URLs found on that page.
Fetch(url string) (body string, urls []string, err error)
}
// Crawl uses fetcher to recursively crawl
// pages starting with url, to a maximum of depth.
func Crawl(res Result, fetcher Fetcher) {
defer wg.Done()
if res.Depth &lt;= 0 {
return
}
// TODO: Don&#39;t fetch the same URL twice.
url := res.Url
visited
++ if visited
&gt; 1 { fmt.Println(&quot;skip:&quot;,visited
,url) return } body, urls, err := fetcher.Fetch(url) if err != nil { fmt.Println(err) return } fmt.Printf(&quot;found: %s %q\n&quot;, url, body) for _, u := range urls { wg.Add(1) go Crawl( Result{u,res.Depth-1},fetcher) //stor.Queue &lt;- Result{u,res.Depth-1} } return } func main() { wg.Add(1) Crawl(Result{&quot;http://golang.org/&quot;, 4}, fetcher) wg.Wait() } // fakeFetcher is Fetcher that returns canned results. type fakeFetcher map[string]*fakeResult type fakeResult struct { body string urls []string } func (f fakeFetcher) Fetch(url string) (string, []string, error) { if res, ok := f
; ok { return res.body, res.urls, nil } return &quot;&quot;, nil, fmt.Errorf(&quot;not found: %s&quot;, url) } // fetcher is a populated fakeFetcher. var fetcher = fakeFetcher{ &quot;http://golang.org/&quot;: &amp;fakeResult{ &quot;The Go Programming Language&quot;, []string{ &quot;http://golang.org/pkg/&quot;, &quot;http://golang.org/cmd/&quot;, }, }, &quot;http://golang.org/pkg/&quot;: &amp;fakeResult{ &quot;Packages&quot;, []string{ &quot;http://golang.org/&quot;, &quot;http://golang.org/cmd/&quot;, &quot;http://golang.org/pkg/fmt/&quot;, &quot;http://golang.org/pkg/os/&quot;, }, }, &quot;http://golang.org/pkg/fmt/&quot;: &amp;fakeResult{ &quot;Package fmt&quot;, []string{ &quot;http://golang.org/&quot;, &quot;http://golang.org/pkg/&quot;, }, }, &quot;http://golang.org/pkg/os/&quot;: &amp;fakeResult{ &quot;Package os&quot;, []string{ &quot;http://golang.org/&quot;, &quot;http://golang.org/pkg/&quot;, }, }, }

答案2

得分: 1

检查通道的长度始终存在竞争条件,你不能将其用于任何形式的同步。

生产者始终是关闭通道的一方,因为在关闭的通道上尝试发送是致命错误。在这里不要使用defer,只需在发送完成后关闭通道。

英文:

Checking the len of a channel is always a race, you can't use that for any sort of synchronization.

The producer is always the side that closes a channel, because it's a fatal error to try and send on a closed channel. Don't use a defer here, just close the channel when you're done sending.

huangapple
  • 本文由 发表于 2015年1月8日 21:18:13
  • 转载请务必保留本文链接:https://go.coder-hub.com/27841092.html
匿名

发表评论

匿名网友

:?: :razz: :sad: :evil: :!: :smile: :oops: :grin: :eek: :shock: :???: :cool: :lol: :mad: :twisted: :roll: :wink: :idea: :arrow: :neutral: :cry: :mrgreen:

确定