Multiple goroutines access/modify a list/map

Question

I am trying to implement a multithreaded crawler in Go as a sample task for learning the language.

It is supposed to scan pages, follow links, and save them to a database.

To avoid duplicates, I am trying to use a map in which I store every URL I have already saved.

The synchronous version works fine, but I run into trouble when I try to use goroutines.

I am trying to use a mutex as the synchronization object for the map, and a channel as a way to coordinate the goroutines. But obviously I don't have a clear understanding of them.

The problem is that I get many duplicate entries, so my map store/check does not work properly.

Here is my code:

package main

import (
	"fmt"
	"net/http"
	"golang.org/x/net/html"
	"strings"
	"database/sql"
	_ "github.com/ziutek/mymysql/godrv"
	"io/ioutil"
	"runtime/debug"
	"sync"
)

const maxDepth = 2;

var workers = make(chan bool)

type Pages struct {
	mu sync.Mutex
	pagesMap map[string]bool
}

func main() {
	var pagesMutex Pages
	fmt.Println("Start")
	const database = "gotest"
	const user = "root"
	const password = "123"

	//open connection to DB
	con, err := sql.Open("mymysql", database + "/" + user + "/" + password)
	if err != nil { /* error handling */
		fmt.Printf("%s", err)
		debug.PrintStack()
	}

	fmt.Println("call 1st save site")
	pagesMutex.pagesMap = make(map[string]bool)
	go pagesMutex.saveSite(con, "http://golang.org/", 0)

	fmt.Println("saving true to channel")
	workers <- true

	fmt.Println("finishing in main")
	defer con.Close()
}


func (p *Pages) saveSite(con *sql.DB, url string, depth int) {
	fmt.Println("Save ", url, depth)
	fmt.Println("trying to lock")
	p.mu.Lock()
	fmt.Println("locked on mutex")
	pageDownloaded := p.pagesMap[url] == true
	if pageDownloaded {
		p.mu.Unlock()
		return
	} else {
		p.pagesMap[url] = true
	}
	p.mu.Unlock()

	response, err := http.Get(url)
	if err != nil {
		fmt.Printf("%s", err)
		debug.PrintStack()
	} else {
		defer response.Body.Close()

		contents, err := ioutil.ReadAll(response.Body)
		if err != nil {
			fmt.Printf("%s", err)
			debug.PrintStack()
		}

		_, err = con.Exec("insert into pages (url) values (?)", string(url))
		if err != nil {
			fmt.Printf("%s", err)
			debug.PrintStack()
		}
		z := html.NewTokenizer(strings.NewReader((string(contents))))

		for {
			tokenType := z.Next()
			if tokenType == html.ErrorToken {
				return
			}

			token := z.Token()
			switch tokenType {
			case html.StartTagToken: // <tag>

				tagName := token.Data
				if strings.Compare(string(tagName), "a") == 0 {
					for _, attr := range token.Attr {
						if strings.Compare(attr.Key, "href") == 0 {
							if depth < maxDepth  {
								urlNew := attr.Val
								if !strings.HasPrefix(urlNew, "http")  {
									if strings.HasPrefix(urlNew, "/")  {
										urlNew = urlNew[1:]
									}
									urlNew = url + urlNew
								}
								//urlNew = path.Clean(urlNew)
								go  p.saveSite(con, urlNew, depth + 1)

							}
						}
					}

				}
			case html.TextToken: // text between start and end tag
			case html.EndTagToken: // </tag>
			case html.SelfClosingTagToken: // <tag/>

			}

		}

	}
	val := <-workers
	fmt.Println("finished Save Site", val)
}

Could someone explain to me how to do this properly, please? Thanks!


Answer 1

Score: 2

Well, you have two choices. For a small and simple implementation, I would recommend separating the operations on the map into a dedicated structure:

// Index is a shared page index
type Index struct {
    access sync.Mutex
    pages  map[string]bool
}

// Mark records that a site has been visited
// (pointer receiver, so every caller locks the same mutex
// instead of a copy of it)
func (i *Index) Mark(name string) {
    i.access.Lock()
    i.pages[name] = true
    i.access.Unlock()
}

// Visited returns true if a site has been visited
func (i *Index) Visited(name string) bool {
    i.access.Lock()
    defer i.access.Unlock()

    return i.pages[name]
}

Then, add another structure like this:

// Crawler is a web spider :D
type Crawler struct {
    index *Index // a pointer, so all crawlers share one index (and one mutex)
    /* ... other important stuff, like the sites already visited ... */
}

// Crawl looks for content
func (c *Crawler) Crawl(site string) {
    // Implement your logic here.
    // For example:
    if !c.index.Visited(site) {
        c.index.Mark(site) // mark as visited
    }
}
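
One caveat about Crawl as written: checking Visited and then calling Mark as two separate steps still allows two goroutines to both see false for the same URL and both crawl it, which is exactly the duplicate problem from the question. A combined test-and-set method does the check and the write under a single lock (MarkIfNew is an illustrative name added here, not part of the original answer):

// MarkIfNew marks a site as visited and reports whether it was new.
// The check and the write happen under one Lock, so no two goroutines
// can both claim the same URL.
func (i *Index) MarkIfNew(name string) bool {
    i.access.Lock()
    defer i.access.Unlock()

    if i.pages[name] {
        return false // someone already claimed this URL
    }
    i.pages[name] = true
    return true
}

With that, the body of Crawl shrinks to a single test: if c.index.MarkIfNew(site) { /* fetch, parse, save */ }.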

That way you keep things nice and clear; it is probably a little more code, but definitely more readable. You instantiate the crawlers like this:

sameIndex := &Index{pages: make(map[string]bool)}
asManyAsYouWant := Crawler{index: sameIndex} // every crawler built this way shares sameIndex

If you want to go further with a higher-level solution, I would recommend a producer/consumer architecture.
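
As a rough sketch of what that could look like (my illustration, not code from the answer: it reuses the Index type and the MarkIfNew method from above, feeds a fixed URL list, and picks four workers arbitrarily; a real crawler also needs a shutdown strategy for links discovered while crawling):

package main

import (
    "fmt"
    "sync"
)

// Index and MarkIfNew as defined above.

func main() {
    index := &Index{pages: make(map[string]bool)}
    jobs := make(chan string, 100) // the producer sends URLs, consumers receive them

    var wg sync.WaitGroup
    for w := 0; w < 4; w++ { // a small fixed pool of consumer goroutines
        wg.Add(1)
        go func() {
            defer wg.Done()
            for url := range jobs { // ends when jobs is closed and drained
                if index.MarkIfNew(url) {
                    fmt.Println("fetching", url) // http.Get, parse, DB insert would go here
                }
            }
        }()
    }

    // The producer: enqueue work, then close the channel to signal "no more jobs".
    for _, u := range []string{"http://golang.org/", "http://golang.org/doc/", "http://golang.org/"} {
        jobs <- u
    }
    close(jobs)

    wg.Wait() // unlike the original main, this blocks until every consumer is done
}

The important property is that URLs only move between goroutines through the channel, and the shared map is only touched through the mutex-guarded Index.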

