Go语言中的网络爬虫

huangapple go评论85阅读模式
英文:

Webcrawler in Go

问题

我正在尝试使用Go语言构建一个网络爬虫,我想指定最大并发工作器的数量。只要队列中有待探索的链接,所有工作器都会一直工作。当队列中的元素少于工作器数量时,工作器应该关闭,但如果找到更多链接,则会重新启动。

我尝试的代码如下:

const max_workers = 6

// 模拟链接的整数
func crawl(wg *sync.WaitGroup, queue chan int) {
    for element := range queue {   
        wg.Done() // 为什么在这里使用defer会导致死锁?
        fmt.Println("添加2个新元素")
        if element%2 == 0 {
            wg.Add(2)
            queue <- (element*100 + 11)
            queue <- (element*100 + 33)
        }
    }
}

func main() {
    var wg sync.WaitGroup
    queue := make(chan int, 10)
    queue <- 0
    queue <- 1
    queue <- 2
    queue <- 3
    var min int
    if len(queue) < max_workers {
        min = len(queue)
    } else {
        min = max_workers
    }
    for i := 0; i < min; i++ {
        wg.Add(1)
        go crawl(&wg, queue)
    }
    wg.Wait()
    close(queue)
}

Playground链接

这个代码似乎可以工作,但有一个问题:当我启动时,我必须用多个元素填充队列。我希望它从一个(单一的)种子页面开始(在我的示例中是queue <- 0),然后动态地增加/缩小工作池。

我的问题是:

  • 我如何实现这种行为?

  • 为什么使用defer wg.Done()会导致死锁?在函数实际完成时调用wg.Done()是正常的吗?我认为,如果没有defer,goroutine将不会等待另一部分完成(在解析HTML的实际工作示例中可能需要更长时间)。

英文:

I'm trying to build a web crawler in Go where I would like to specify the max number of concurrent workers. They will all be working as long as there are link to explore in the queue. When the queue has less elements than workers, workers should shout down, but resume in case more links are found.

The code I have tried is

const max_workers = 6
// simulating links with int
func crawl(wg *sync.WaitGroup, queue chan int) {
    for element := range queue {   
        wg.Done() // why is defer here causing a deadlock?
        fmt.Println(&quot;adding 2 new elements &quot;)
        if element%2 == 0 {
            wg.Add(2)
            queue &lt;- (element*100 + 11)
            queue &lt;- (element*100 + 33)
        }
    
    }
}

func main() {
	var wg sync.WaitGroup
    queue := make(chan int, 10)
    queue &lt;- 0
    queue &lt;- 1
    queue &lt;- 2
    queue &lt;- 3
    var min int
    if (len(queue) &lt; max_workers) {
        min = len(queue)
    } else {
        min = max_workers
    }
    for i := 0; i &lt; min; i++ {
        wg.Add(1)
        go crawl(&amp;wg, queue)
    }
    wg.Wait()
    close(queue)
}

Link to playground

This seems to work, but there is a catch: I have to fill the queue with more than one element when I start. I would like it to start from a (single) seed page (in my example queue &lt;- 0) and then grow / shrink the working pool dynamically.

My questions are:

  • how can I obtain behavior?

  • why is defer wg.Done() causing deadlock? It's normal to wg.Done() the function when it is actually completed? I think that without the defer the goroutine is not waiting for the other part to finish (which could take longer in the real work example of parsing HTML).

答案1

得分: 5

如果你使用你最喜欢的网络搜索引擎搜索“Go网络爬虫”(或“golang网络爬虫”),你会找到许多示例,包括:Go Tour练习:网络爬虫。还有一些关于Go并发性的讲座涵盖了这种类型的内容。

在Go中,实现这个功能的“标准”方式不需要涉及等待组(wait groups)。回答你的一个问题,使用defer延迟执行的代码只会在函数返回时运行。由于你的函数运行时间较长,所以不要在这样的循环中使用defer

“标准”方式是在它们自己的goroutine中启动任意数量的工作线程。它们都从同一个通道中读取“任务”,如果没有任务可执行时会阻塞。当所有任务完成后,该通道会关闭,所有工作线程都会退出。

对于像爬虫这样的应用,工作线程会发现更多需要执行的“任务”并将其加入队列。你不希望它们将任务写回同一个通道,因为该通道可能有一定的缓冲区限制(或者没有缓冲区!),你最终会阻塞所有试图将更多任务加入队列的工作线程!

一个简单的解决方案是使用一个单独的通道(例如,每个工作线程有in <-chan Job, out chan<- Job),以及一个单独的队列/过滤器goroutine来读取这些请求,将它们附加到一个切片中,该切片可以任意增长或者进行一些全局限制,并从切片的头部向另一个通道发送数据(即一个简单的for-select循环,从一个通道读取数据并写入另一个通道)。这段代码通常还负责跟踪已经完成的任务(例如,已访问的URL的映射),并丢弃重复的请求。

队列goroutine的代码可能如下所示(这里的参数名过于冗长):

type Job string

func queue(toWorkers chan<- Job, fromWorkers <-chan Job) {
    var list []Job
    done := make(map[Job]bool)
    for {
        var send chan<- Job
        var item Job
        if len(list) > 0 {
            send = toWorkers
            item = list[0]
        }
        select {
        case send <- item:
            // 我们发送了一个任务,将其移除
            list = list[1:]
        case thing := <-fromWorkers:
            // 收到一个新任务
            if !done[thing] {
                list = append(list, thing)
                done[thing] = true
            }
        }
    }
}

这个简单示例中有一些细节被省略了,比如终止条件。如果“Jobs”是一个更大的结构体,你可能需要使用chan *Job[]*Job,并且还需要将映射类型更改为从任务中提取的某个键(例如Job.URL),并且在list = list[1:]之前需要执行list[0] = nil以消除对*Job指针的引用,让垃圾回收器更早地回收它。

编辑:关于优雅终止的一些说明。

有几种方法可以优雅地终止上述代码。可以使用等待组(wait group),但是需要仔细放置Add/Done调用的位置,可能还需要另一个goroutine来执行Wait操作(然后关闭一个通道以开始关闭过程)。工作线程不应该关闭它们的输出通道,因为有多个工作线程,不能多次关闭一个通道;队列goroutine无法判断何时关闭与工作线程之间的通道,除非知道工作线程何时完成。

在我之前使用类似上述代码的代码时,我在“队列”goroutine中使用了一个本地的“outstanding”计数器(避免了互斥锁或等待组的任何同步开销)。当将任务发送给工作线程时,增加了未完成的任务计数。当工作线程表示已完成任务时,再次减少计数。我的代码还为此使用了另一个通道(我的“队列”除了将进一步的节点加入队列外,还收集结果)。使用单独的通道可能更清晰,但也可以在现有通道上使用特殊值(例如,空的Job指针)。无论如何,有了这样一个计数器,当本地列表为空且没有未完成的任务时,只需关闭与工作线程的通道并返回,即可终止。

例如:

if len(list) > 0 {
    send = toWorkers
    item = list[0]
} else if outstandingJobs == 0 {
    close(toWorkers)
    return
}
英文:

If you use your favourite web search for "Go web crawler" (or "golang web crawler")
you'll find many examples including:
Go Tour Exercise: Web Crawler.
There are also some talks on concurrency in Go that cover this kind of thing.

The "standard" way to do this in Go does not need to involve wait groups at all.
To answer one of your questions, things queued with defer only get run when the function returns. You have a long running function so do not use defer in such a loop.

The "standard" way is to start up however many workers you want in their own goroutines.
They all read "jobs" from the same channel, blocking if/when there is nothing to do.
When fully done that channel is closed and they all exit.

In the case of something like a crawler the workers will discover more "jobs" to do and want to enqueue them.
You don't want them writing back to the same channel since it will have some limited amount of buffering (or none!) and you'll eventually block all the workers trying to enqueue more jobs!

A simple solution to this is to use a separate channel
(e.g. each worker has in &lt;-chan Job, out chan&lt;- Job)
and a single queue/filter goroutine that reads these requests,
appends them onto a slice that it either lets grow arbitrarily large or does some global limiting on,
and also feeds the other channel from the head of the slice
(i.e. a simple for-select loop reading from one channel and writing to the other).
This code is also usually responsible for keeping track of what has been already done
(e.g. a map of URLs visited) and drops incoming requests for duplicates.

The queue goroutine might look something like this (argument names excessively verbose here):

type Job string

func queue(toWorkers chan&lt;- Job, fromWorkers &lt;-chan Job) {
	var list []Job
	done := make(map[Job]bool)
	for {
		var send chan&lt;- Job
		var item Job
		if len(list) &gt; 0 {
			send = toWorkers
			item = list[0]
		}
		select {
		case send &lt;- item:
			// We sent an item, remove it
			list = list[1:]
		case thing := &lt;-fromWorkers:
			// Got a new thing
			if !done[thing] {
				list = append(list, thing)
                done[thing] = true
			}
		}
	}
}

A few things are glossed over in this simple example.
Such as termination. And if "Jobs" is some larger structure where you'd want to use chan *Job and []*Job instead.
In that case you'd also need to change the map type to some some key you extract from the job
(e.g. Job.URL perhaps)
and you'd want to do list[0] = nil before list = list[1:] to get rid of the reference to *Job pointer and let the garbage collector at it earlier.

Edit: Some notes on terminating cleanly.

There are several ways to terminate code like the above cleanly. A wait group could be used, but the placement of the Add/Done calls needs to be done carefully and you'd probably need another goroutine to do the Wait (and then close one of the channels to start the shutdown). The workers shouldn't close their output channel since there are multiple workers and you can't close a channel more than once; the queue goroutine can't tell when to close it's channel to the workers without knowing when the workers are done.

In the past when I've used code very similar to the above I used a local "outstanding" counter within the "queue" goroutine (which avoids any need for a mutex or any synchronization overhead that a wait group has). The count of outstanding jobs is increased when a job is sent to a worker. It's decreased again when the worker says it's finished with it. My code happened to have another channel for this (my "queue" was also collecting results in addition to further nodes to enqueue). It's probably cleaner on it's own channel, but instead a special value on the existing channel (e.g. a nil Job pointer) could be used. At any rate, with such a counter, the existing length check on the local list just needs to see there is nothing outstanding when the list is empty and it's time to terminate; just shutdown the channel to the workers and return.

E.g.:

    if len(list) &gt; 0 {
        send = toWorkers
        item = list[0]
    } else if outstandingJobs == 0 {
        close(toWorkers)
        return
    }

答案2

得分: 1

我使用Go语言的互斥锁(Mutex)函数编写了一个解决方案。

当它在并发运行时,限制只有一个实例可以同时访问url映射可能很重要。我相信我按照下面的方式实现了它。请随意尝试一下。我将非常感谢您的反馈,因为我也会从您的评论中学到东西。

package main

import (
	"fmt"
	"sync"
)

type Fetcher interface {
	// Fetch返回URL的内容和在该页面上找到的URL列表。
	Fetch(url string) (body string, urls []string, err error)
}

// SafeUrlBook帮助限制只有一个实例可以同时访问中央url映射。这样就不会发生冗余的爬取。
type SafeUrlBook struct {
	book map[string]bool
	mux  sync.Mutex
}

func (sub *SafeUrlBook) doesThisExist(url string) bool {
	sub.mux.Lock()
	_, key_exists := sub.book[url]
	defer sub.mux.Unlock()

	if key_exists {
		return true
	} else {
		sub.book[url] = true
		return false
	}
}

// Crawl使用fetcher递归爬取以url为起点的页面,最大深度为depth。
// 现在我使用safeBook(SafeUrlBook)来跟踪爬虫访问过的URL。
func Crawl(url string, depth int, fetcher Fetcher, safeBook SafeUrlBook) {
	if depth <= 0 {
		return
	}

	exist := safeBook.doesThisExist(url)
	if exist {
		fmt.Println("Skip", url)
		return
	}

	body, urls, err := fetcher.Fetch(url)
	if err != nil {
		fmt.Println(err)
		return
	}
	fmt.Printf("found: %s %q\n", url, body)
	for _, u := range urls {
		Crawl(u, depth-1, fetcher, safeBook)
	}
	return
}

func main() {
	safeBook := SafeUrlBook{book: make(map[string]bool)}
	Crawl("https://golang.org/", 4, fetcher, safeBook)
}

// fakeFetcher是一个返回预定义结果的Fetcher。
type fakeFetcher map[string]*fakeResult

type fakeResult struct {
	body string
	urls []string
}

func (f fakeFetcher) Fetch(url string) (string, []string, error) {
	if res, ok := f[url]; ok {
		return res.body, res.urls, nil
	}
	return "", nil, fmt.Errorf("not found: %s", url)
}

// fetcher是一个填充了预定义结果的fakeFetcher。
var fetcher = fakeFetcher{
	"https://golang.org/": &fakeResult{
		"The Go Programming Language",
		[]string{
			"https://golang.org/pkg/",
			"https://golang.org/cmd/",
		},
	},
	"https://golang.org/pkg/": &fakeResult{
		"Packages",
		[]string{
			"https://golang.org/",
			"https://golang.org/cmd/",
			"https://golang.org/pkg/fmt/",
			"https://golang.org/pkg/os/",
		},
	},
	"https://golang.org/pkg/fmt/": &fakeResult{
		"Package fmt",
		[]string{
			"https://golang.org/",
			"https://golang.org/pkg/",
		},
	},
	"https://golang.org/pkg/os/": &fakeResult{
		"Package os",
		[]string{
			"https://golang.org/",
			"https://golang.org/pkg/",
		},
	},
}

以上是您提供的代码的翻译。

英文:

I wrote a solution utilizing the mutual exclusion (Mutex) function of Go.

When it runs on the concurrency, it may be important to restrict only one instance access the url map at a time. I believe I implemented it as written below. Please feel free to try this out. I would appreciate your feedback as I will be learn from your comments as well.

package main
import (
&quot;fmt&quot;
&quot;sync&quot;
)
type Fetcher interface {
// Fetch returns the body of URL and
// a slice of URLs found on that page.
Fetch(url string) (body string, urls []string, err error)
}
// ! SafeUrlBook helps restrict only one instance access the central url map at a time. So that no redundant crawling should occur.
type SafeUrlBook struct {
book map[string]bool
mux  sync.Mutex
}
func (sub *SafeUrlBook) doesThisExist(url string) bool {
sub.mux.Lock()
_ , key_exists := sub.book
defer sub.mux.Unlock() if key_exists { return true } else { sub.book
= true return false } } // End SafeUrlBook // Crawl uses fetcher to recursively crawl // pages starting with url, to a maximum of depth. // Note that now I use safeBook (SafeUrlBook) to keep track of which url has been visited by a crawler. func Crawl(url string, depth int, fetcher Fetcher, safeBook SafeUrlBook) { if depth &lt;= 0 { return } exist := safeBook.doesThisExist(url) if exist { fmt.Println(&quot;Skip&quot;, url) ; return } body, urls, err := fetcher.Fetch(url) if err != nil { fmt.Println(err) return } fmt.Printf(&quot;found: %s %q\n&quot;, url, body) for _, u := range urls { Crawl(u, depth-1, fetcher, safeBook) } return } func main() { safeBook := SafeUrlBook{book: make(map[string]bool)} Crawl(&quot;https://golang.org/&quot;, 4, fetcher, safeBook) } // fakeFetcher is Fetcher that returns canned results. type fakeFetcher map[string]*fakeResult type fakeResult struct { body string urls []string } func (f fakeFetcher) Fetch(url string) (string, []string, error) { if res, ok := f
; ok { return res.body, res.urls, nil } return &quot;&quot;, nil, fmt.Errorf(&quot;not found: %s&quot;, url) } // fetcher is a populated fakeFetcher. var fetcher = fakeFetcher{ &quot;https://golang.org/&quot;: &amp;fakeResult{ &quot;The Go Programming Language&quot;, []string{ &quot;https://golang.org/pkg/&quot;, &quot;https://golang.org/cmd/&quot;, }, }, &quot;https://golang.org/pkg/&quot;: &amp;fakeResult{ &quot;Packages&quot;, []string{ &quot;https://golang.org/&quot;, &quot;https://golang.org/cmd/&quot;, &quot;https://golang.org/pkg/fmt/&quot;, &quot;https://golang.org/pkg/os/&quot;, }, }, &quot;https://golang.org/pkg/fmt/&quot;: &amp;fakeResult{ &quot;Package fmt&quot;, []string{ &quot;https://golang.org/&quot;, &quot;https://golang.org/pkg/&quot;, }, }, &quot;https://golang.org/pkg/os/&quot;: &amp;fakeResult{ &quot;Package os&quot;, []string{ &quot;https://golang.org/&quot;, &quot;https://golang.org/pkg/&quot;, }, }, }

huangapple
  • 本文由 发表于 2015年4月7日 20:37:47
  • 转载请务必保留本文链接:https://go.coder-hub.com/29491795.html
匿名

发表评论

匿名网友

:?: :razz: :sad: :evil: :!: :smile: :oops: :grin: :eek: :shock: :???: :cool: :lol: :mad: :twisted: :roll: :wink: :idea: :arrow: :neutral: :cry: :mrgreen:

确定