Recursive Goroutines, what is the neatest way to tell Go to stop reading from channel?

Question

I want to know the idiomatic way to solve this (it currently throws a deadlock error). The recursion branches an unknown number of times, so I cannot simply close the channel.

http://play.golang.org/p/avLf_sQJj_

I have made it work by passing a pointer to a number and incrementing it, and I have looked into using sync.WaitGroup. I don't feel (and I may be wrong) that I came up with an elegant solution. The Go examples I have seen tend to be simple, clever and concise.

This is the last exercise from A Tour of Go: https://tour.golang.org/#73

Do you know how 'a Go programmer' would manage this? Any help would be appreciated. I'm trying to learn well from the start.

# Answer 1

**Score**: 4

Here is my interpretation of the exercise. There are many like it, but this one is mine. I use `sync.WaitGroup` and a custom, mutex-protected map to store visited URLs, mostly because Go's standard `map` type is not thread safe. I also combine the data and error channels into a single structure, which has a method that reads from those channels, mostly for separation of concerns and (arguably) to keep things a little cleaner.

Here is the example (also on the playground: http://play.golang.org/p/DmS-Crw2AD):

package main

import (
	"fmt"
	"sync"
)

type Fetcher interface {
	// Fetch returns the body of URL and
	// a slice of URLs found on that page.
	Fetch(url string) (body string, urls []string, err error)
}

// Crawl uses fetcher to recursively crawl pages starting with url, to a maximum of depth.
func Crawl(wg *sync.WaitGroup, url string, depth int, fetcher Fetcher, cache *UrlCache, results *Results) {
	defer wg.Done()

	if depth <= 0 || !cache.AtomicSet(url) {
		return
	}

	body, urls, err := fetcher.Fetch(url)
	if err != nil {
		results.Error <- err
		return
	}

	results.Data <- [2]string{url, body}

	for _, url := range urls {
		wg.Add(1)
		go Crawl(wg, url, depth-1, fetcher, cache, results)
	}
}

func main() {
	var wg sync.WaitGroup
	cache := NewUrlCache()

	results := NewResults()
	defer results.Close()

	wg.Add(1)
	go Crawl(&wg, "http://golang.org/", 4, fetcher, cache, results)
	go results.Read()
	wg.Wait()
}

// Results defines channels which yield results for a single crawled URL.
type Results struct {
	Data  chan [2]string // url + body.
	Error chan error     // Possible fetcher error.
}

func NewResults() *Results {
	return &Results{
		Data:  make(chan [2]string, 1),
		Error: make(chan error, 1),
	}
}

func (r *Results) Close() error {
	close(r.Data)
	close(r.Error)
	return nil
}

// Read reads crawled results or errors for as long as the channels are open.
func (r *Results) Read() {
	for {
		select {
		case data := <-r.Data:
			fmt.Println(">", data)

		case err := <-r.Error:
			fmt.Println("e", err)
		}
	}
}

// UrlCache defines a cache of URLs we've already visited.
type UrlCache struct {
	sync.Mutex
	data map[string]struct{} // An empty struct occupies 0 bytes, whereas a bool takes 1 byte.
}

func NewUrlCache() *UrlCache { return &UrlCache{data: make(map[string]struct{})} }

// AtomicSet sets the given url in the cache and returns false if it already existed.
//
// All of this happens within the same locked context. Modifying a map without synchronisation
// is not safe when done from multiple goroutines. Doing an Exists() check and a Set() separately
// would create a race condition, so we must combine both in a single operation.
func (c *UrlCache) AtomicSet(url string) bool {
	c.Lock()
	_, ok := c.data[url]
	c.data[url] = struct{}{}
	c.Unlock()
	return !ok
}

// fakeFetcher is a Fetcher that returns canned results.
type fakeFetcher map[string]*fakeResult

type fakeResult struct {
	body string
	urls []string
}

func (f fakeFetcher) Fetch(url string) (string, []string, error) {
	if res, ok := f[url]; ok {
		return res.body, res.urls, nil
	}
	return "", nil, fmt.Errorf("not found: %s", url)
}

// fetcher is a populated fakeFetcher.
var fetcher = fakeFetcher{
	"http://golang.org/": &fakeResult{
		"The Go Programming Language",
		[]string{
			"http://golang.org/pkg/",
			"http://golang.org/cmd/",
		},
	},
	"http://golang.org/pkg/": &fakeResult{
		"Packages",
		[]string{
			"http://golang.org/",
			"http://golang.org/cmd/",
			"http://golang.org/pkg/fmt/",
			"http://golang.org/pkg/os/",
		},
	},
	"http://golang.org/pkg/fmt/": &fakeResult{
		"Package fmt",
		[]string{
			"http://golang.org/",
			"http://golang.org/pkg/",
		},
	},
	"http://golang.org/pkg/os/": &fakeResult{
		"Package os",
		[]string{
			"http://golang.org/",
			"http://golang.org/pkg/",
		},
	},
}

This has not been tested extensively, so perhaps there are optimisations and fixes that can be applied, but it should at least give you some ideas.
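
For illustration only (this is not part of the answer above): the same `sync.WaitGroup` idea is often paired with a small helper goroutine that closes the results channel once every producer has finished, so the reader can simply `range` over the channel and stop when it is closed. A minimal, self-contained sketch of that pattern, with made-up names:

```go
package main

import (
	"fmt"
	"sync"
)

func main() {
	var wg sync.WaitGroup
	results := make(chan string)

	// Producers: each one registers with the WaitGroup and sends one result.
	for i := 0; i < 3; i++ {
		wg.Add(1)
		go func(id int) {
			defer wg.Done()
			results <- fmt.Sprintf("worker %d finished", id)
		}(i)
	}

	// Closer: once all producers are done, close the channel so the
	// consumer below knows there is nothing left to read.
	go func() {
		wg.Wait()
		close(results)
	}()

	// Consumer: range exits cleanly when results is closed.
	for r := range results {
		fmt.Println(r)
	}
}
```

The same shape can be applied to a crawler: each crawl goroutine is a producer, and the main goroutine ranges over the results until the closer goroutine shuts the channel.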

<details>
<summary>英文:</summary>

Here is my interpretation of the exercise. There are many like it, but this is mine. I use `sync.WaitGroup` and a custom, mutex-protected map to store visited URLs. Mostly because Go&#39;s standard `map` type is not thread safe. I also combine the data and error channels into a single structure, which has a method doing the reading of said channels. Mostly for separation of concerns and (arguably) keeping things a little cleaner.

Example [on playground](http://play.golang.org/p/DmS-Crw2AD):

	package main

	import (
		&quot;fmt&quot;
		&quot;sync&quot;
	)

	type Fetcher interface {
		// Fetch returns the body of URL and
		// a slice of URLs found on that page.
		Fetch(url string) (body string, urls []string, err error)
	}

	// Crawl uses fetcher to recursively crawl
	// pages starting with url, to a maximum of depth.
	func Crawl(wg *sync.WaitGroup, url string, depth int, fetcher Fetcher, cache *UrlCache, results *Results) {
		defer wg.Done()

		if depth &lt;= 0 || !cache.AtomicSet(url) {
			return
		}

		body, urls, err := fetcher.Fetch(url)
		if err != nil {
			results.Error &lt;- err
			return
		}

		results.Data &lt;- [2]string{url, body}

		for _, url := range urls {
			wg.Add(1)
			go Crawl(wg, url, depth-1, fetcher, cache, results)
		}
	}

	func main() {
		var wg sync.WaitGroup
		cache := NewUrlCache()

		results := NewResults()
		defer results.Close()

		wg.Add(1)
		go Crawl(&amp;wg, &quot;http://golang.org/&quot;, 4, fetcher, cache, results)
		go results.Read()
		wg.Wait()
	}

	// Results defines channels which yield results for a single crawled URL.
	type Results struct {
		Data  chan [2]string // url + body.
		Error chan error     // Possible fetcher error.
	}

	func NewResults() *Results {
		return &amp;Results{
			Data:  make(chan [2]string, 1),
			Error: make(chan error, 1),
		}
	}

	func (r *Results) Close() error {
		close(r.Data)
		close(r.Error)
		return nil
	}

	// Read reads crawled results or errors, for as long as the channels are open.
	func (r *Results) Read() {
		for {
			select {
			case data := &lt;-r.Data:
				fmt.Println(&quot;&gt;&quot;, data)

			case err := &lt;-r.Error:
				fmt.Println(&quot;e&quot;, err)
			}
		}
	}

	// UrlCache defines a cache of URL&#39;s we&#39;ve already visited.
	type UrlCache struct {
		sync.Mutex
		data map[string]struct{} // Empty struct occupies 0 bytes, whereas bool takes 1 bytes.
	}

	func NewUrlCache() *UrlCache { return &amp;UrlCache{data: make(map[string]struct{})} }

	// AtomicSet sets the given url in the cache and returns false if it already existed.
	//
	// All within the same locked context. Modifying a map without synchronisation is not safe
	// when done from multiple goroutines. Doing a Exists() check and Set() separately will
	// create a race condition, so we must combine both in a single operation.
	func (c *UrlCache) AtomicSet(url string) bool {
		c.Lock()
		_, ok := c.data[url]
		c.data[url] = struct{}{}
		c.Unlock()
		return !ok
	}

	// fakeFetcher is Fetcher that returns canned results.
	type fakeFetcher map[string]*fakeResult

	type fakeResult struct {
		body string
		urls []string
	}

	func (f fakeFetcher) Fetch(url string) (string, []string, error) {
		if res, ok := f[url]; ok {
			return res.body, res.urls, nil
		}
		return &quot;&quot;, nil, fmt.Errorf(&quot;not found: %s&quot;, url)
	}

	// fetcher is a populated fakeFetcher.
	var fetcher = fakeFetcher{
		&quot;http://golang.org/&quot;: &amp;fakeResult{
			&quot;The Go Programming Language&quot;,
			[]string{
				&quot;http://golang.org/pkg/&quot;,
				&quot;http://golang.org/cmd/&quot;,
			},
		},
		&quot;http://golang.org/pkg/&quot;: &amp;fakeResult{
			&quot;Packages&quot;,
			[]string{
				&quot;http://golang.org/&quot;,
				&quot;http://golang.org/cmd/&quot;,
				&quot;http://golang.org/pkg/fmt/&quot;,
				&quot;http://golang.org/pkg/os/&quot;,
			},
		},
		&quot;http://golang.org/pkg/fmt/&quot;: &amp;fakeResult{
			&quot;Package fmt&quot;,
			[]string{
				&quot;http://golang.org/&quot;,
				&quot;http://golang.org/pkg/&quot;,
			},
		},
		&quot;http://golang.org/pkg/os/&quot;: &amp;fakeResult{
			&quot;Package os&quot;,
			[]string{
				&quot;http://golang.org/&quot;,
				&quot;http://golang.org/pkg/&quot;,
			},
		},
	}


This has not been tested extensively, so perhaps there are optimisations and fixes that can be applied, but it should at least give you some ideas.

</details>



# Answer 2
**Score**: 2

Instead of involving `sync.WaitGroup`, you could extend the result sent for each parsed URL to include the number of new URLs found. In your main loop you then keep reading results for as long as there is something left to collect.

In your case the number of URLs found will be the number of goroutines spawned, but it doesn't necessarily need to be. I would personally spawn a more or less fixed number of fetching routines, so you don't open too many HTTP requests (or at least you have control over that); see the sketch just below. Your main loop wouldn't change, as it doesn't care how the fetching is executed. The important fact here is that you need to send either a result or an error for each URL – I have modified the code so that it doesn't spawn new routines when the depth is already 1.

A side effect of this solution is that you can easily print the progress in your main loop.
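
As a side note on the "fixed number of fetching routines" remark above, here is a minimal, hypothetical sketch of a bounded worker pool (the names `fetchWorker` and `jobs` are invented for illustration and do not appear in the playground example below):

```go
package main

import (
	"fmt"
	"sync"
)

// fetchWorker consumes URLs from jobs until the channel is closed and
// reports one line per URL on out. A real version would call a fetcher here.
func fetchWorker(id int, jobs <-chan string, out chan<- string, wg *sync.WaitGroup) {
	defer wg.Done()
	for url := range jobs {
		out <- fmt.Sprintf("worker %d fetched %s", id, url)
	}
}

func main() {
	jobs := make(chan string)
	out := make(chan string)

	// A fixed pool of 4 workers bounds how many fetches run concurrently.
	var wg sync.WaitGroup
	for i := 0; i < 4; i++ {
		wg.Add(1)
		go fetchWorker(i, jobs, out, &wg)
	}

	// Feed the work and close jobs when there is nothing left to hand out.
	go func() {
		for _, u := range []string{"http://golang.org/", "http://golang.org/pkg/", "http://golang.org/cmd/"} {
			jobs <- u
		}
		close(jobs)
	}()

	// Close out once every worker has exited, then drain the results.
	go func() {
		wg.Wait()
		close(out)
	}()

	for r := range out {
		fmt.Println(r)
	}
}
```

The main loop of the actual answer stays the same either way; only the way fetches are scheduled changes.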

Here is the example (also on the playground: http://play.golang.org/p/BRlUc6bojf):

```go
package main

import (
	"fmt"
)

type Fetcher interface {
	// Fetch returns the body of URL and
	// a slice of URLs found on that page.
	Fetch(url string) (body string, urls []string, err error)
}

type Res struct {
	url   string
	body  string
	found int // Number of new urls found
}

// Crawl uses fetcher to recursively crawl
// pages starting with url, to a maximum of depth.
func Crawl(url string, depth int, fetcher Fetcher, ch chan Res, errs chan error, visited map[string]bool) {
	body, urls, err := fetcher.Fetch(url)
	visited[url] = true
	if err != nil {
		errs <- err
		return
	}

	newUrls := 0
	if depth > 1 {
		for _, u := range urls {
			if !visited[u] {
				newUrls++
				go Crawl(u, depth-1, fetcher, ch, errs, visited)
			}
		}
	}

	// Send the result along with number of urls to be fetched
	ch <- Res{url, body, newUrls}

	return
}

func main() {
	ch := make(chan Res)
	errs := make(chan error)
	visited := map[string]bool{}

	go Crawl("http://golang.org/", 4, fetcher, ch, errs, visited)

	tocollect := 1
	for n := 0; n < tocollect; n++ {
		select {
		case s := <-ch:
			fmt.Printf("found: %s %q\n", s.url, s.body)
			tocollect += s.found

		case e := <-errs:
			fmt.Println(e)
		}
	}
}

// fakeFetcher is a Fetcher that returns canned results.
type fakeFetcher map[string]*fakeResult

type fakeResult struct {
	body string
	urls []string
}

func (f fakeFetcher) Fetch(url string) (string, []string, error) {
	if res, ok := f[url]; ok {
		return res.body, res.urls, nil
	}
	return "", nil, fmt.Errorf("not found: %s", url)
}

// fetcher is a populated fakeFetcher.
var fetcher = fakeFetcher{
	"http://golang.org/": &fakeResult{
		"The Go Programming Language",
		[]string{
			"http://golang.org/pkg/",
			"http://golang.org/cmd/",
		},
	},
	"http://golang.org/pkg/": &fakeResult{
		"Packages",
		[]string{
			"http://golang.org/",
			"http://golang.org/cmd/",
			"http://golang.org/pkg/fmt/",
			"http://golang.org/pkg/os/",
		},
	},
	"http://golang.org/pkg/fmt/": &fakeResult{
		"Package fmt",
		[]string{
			"http://golang.org/",
			"http://golang.org/pkg/",
		},
	},
	"http://golang.org/pkg/os/": &fakeResult{
		"Package os",
		[]string{
			"http://golang.org/",
			"http://golang.org/pkg/",
		},
	},
}
```

And yes, follow @jimt's advice and make access to the map thread safe.

# Answer 3
**Score**: 0

Here is how I solved the Web Crawler exercise of the Go Tour.

For tracking recursion completion in parallel execution, I have used an atomic integer counter to keep track of how many URLs are being crawled in parallel recursions. In the main function, I wait in a loop until the atomic counter is decremented back to zero.

For avoiding crawling the same URL again, I have used a map with a Mutex to keep track of crawled URLs.

Below are the code snippets for the same.

You can find the entire working code here on Github.

// These snippets rely on the fmt, sync, sync/atomic and time packages, plus the
// Fetcher interface and canned fetcher from the Tour exercise (see Answer 1 above).

// Safe HashSet Version
type SafeHashSet struct {
	sync.Mutex
	urls map[string]bool // Primarily we wanted to use this as a hashset, so the value of the map is not significant to us
}

var (
	urlSet     SafeHashSet
	urlCounter int64
)

// Adds an URL to the Set, returns true if the url was newly added (i.e. not present already)
func (m *SafeHashSet) add(newUrl string) bool {
	m.Lock()
	defer m.Unlock()

	_, ok := m.urls[newUrl]
	if !ok {
		m.urls[newUrl] = true
		return true
	}
	return false
}

// Crawl uses fetcher to recursively crawl
// pages starting with url, to a maximum of depth.
func Crawl(url string, depth int, fetcher Fetcher) {
	// Decrement the atomic url counter when this crawl function exits
	defer atomic.AddInt64(&urlCounter, -1)

	if depth <= 0 {
		return
	}

	// Don't process a url if it is already processed
	isNewUrl := urlSet.add(url)
	if !isNewUrl {
		fmt.Printf("skip: \t%s\n", url)
		return
	}

	body, urls, err := fetcher.Fetch(url)
	if err != nil {
		fmt.Println(err)
		return
	}
	fmt.Printf("found: \t%s %q\n", url, body)

	for _, u := range urls {
		atomic.AddInt64(&urlCounter, 1)
		// Crawl in parallel
		go Crawl(u, depth-1, fetcher)
	}
	return
}

func main() {
	urlSet = SafeHashSet{urls: make(map[string]bool)}

	atomic.AddInt64(&urlCounter, 1)
	go Crawl("https://golang.org/", 4, fetcher)

	for atomic.LoadInt64(&urlCounter) > 0 {
		time.Sleep(100 * time.Microsecond)
	}
	fmt.Println("Exiting")
}
