英文:
Go crawler stalls on select from output channel after a few minutes
问题
我写了一个简单的爬虫,大致如下所示:
type SiteData struct {
// ...
}
func downloadURL(url string) (body []byte, status int) {
resp, err := http.Get(url)
if err != nil {
return
}
status = resp.StatusCode
defer resp.Body.Close()
body, err = ioutil.ReadAll(resp.Body)
body = bytes.Trim(body, "\x00")
return
}
func processSiteData(resp []byte) SiteData {
// ...
}
func worker(input chan string, output chan SiteData) {
// 等待通道中的链接进行处理
for url := range input {
// 获取HTTP响应和状态码
resp, status := downloadURL(url)
if resp != nil && status == 200 {
// 如果获取链接没有错误
// 处理数据并发送回去
output <- processSiteData(resp)
} else {
// 否则重新发送链接进行处理
input <- url
}
}
}
func crawl(urlList []string) {
numWorkers := 4
input := make(chan string)
output := make(chan SiteData)
// 创建工作协程
for i := 0; i < numWorkers; i++ {
go worker(input, output)
}
// 将链接加入队列
go func() {
for _, url := range urlList {
input <- url
}
}()
// 等待结果
for {
select {
case data := <-output:
saveToDB(data)
}
}
}
func main() {
urlList := loadLinksFromDB()
crawl(urlList)
}
它可以爬取一个网站,并且工作得很好 - 下载数据,处理数据并将其保存到数据库中。但是在几分钟后(大约5-10分钟),它会"卡住",需要重新启动。该网站没有将我列入黑名单,我已经与他们核实过,在程序阻塞后可以随时访问任何URL。此外,它在所有URL都处理完之前就会阻塞。显然,当列表用尽时它会阻塞,但是现在还远没有到那个时候。
我在这里做错了什么吗?我之所以使用for { select { ... } }
而不是for _, _ = range urlList { // read output }
是因为如果处理失败,任何URL都可以重新加入队列。此外,数据库似乎也不是问题。任何建议都将有所帮助 - 谢谢。
英文:
I've written a simple crawler that looks something like this:
type SiteData struct {
// ...
}
func downloadURL(url string) (body []byte, status int) {
resp, err := http.Get(url)
if err != nil {
return
}
status = resp.StatusCode
defer resp.Body.Close()
body, err = ioutil.ReadAll(resp.Body)
body = bytes.Trim(body, "\x00")
return
}
func processSiteData(resp []byte) SiteData {
// ...
}
func worker(input chan string, output chan SiteData) {
// wait on the channel for links to process
for url := range input {
// fetch the http response and status code
resp, status := downloadURL(url)
if resp != nil && status == 200 {
// if no errors in fetching link
// process the data and send
// it back
output <- processSiteData(resp)
} else {
// otherwise send the url for processing
// once more
input <- url
}
}
}
func crawl(urlList []string) {
numWorkers := 4
input := make(chan string)
output := make(chan SiteData)
// spawn workers
for i := 0; i < numWorkers; i++ {
go worker(input, output)
}
// enqueue urls
go func() {
for url := range urlList {
input <- url
}
}()
// wait for the results
for {
select {
case data := <-output:
saveToDB(data)
}
}
}
func main() {
urlList := loadLinksFromDB()
crawl(urlList)
}
It scrapes a single website and works great - downloading data, processing it and saving it to a database. Yet after a few minutes (5-10) or so it gets "stuck" and needs to be restarted. The site isn't blacklisting me, I've verified with them and can access any url at any time after the program blocks. Also, it blocks before all the urls are done processing. Obviously it'll block when the list is spent, but it is nowhere near that.
Am I doing something wrong here? The reason I'm using for { select { ... } }
instead of for _, _ = range urlList { // read output }
is that any url can be re-enqueued if failed to process. In addition, the database doesn't seem to be the issue here as well. Any input will help - thanks.
答案1
得分: 1
我相信当所有N个工作线程都在等待input <- url
时,程序会卡住,因此没有更多的工作线程从input
中取出数据。换句话说,如果4个URL几乎同时失败,程序会卡住。
解决方案是将失败的URL发送到不是工作线程的输入通道的地方(以避免死锁)。
一种可能的解决方案是使用一个单独的failed
通道,匿名的goroutine始终从该通道接收输入。代码如下(未经测试):
package main
func worker(input chan string, output chan SiteData, failed chan string) {
for url := range input {
// ...
if resp != nil && status == 200 {
output <- processSideData(resp)
} else {
failed <- url
}
}
}
func crawl(urlList []string) {
numWorkers := 4
input := make(chan string)
failed := make(chan string)
output := make(chan SiteData)
// 启动工作线程
for i := 0; i < numWorkers; i++ {
go worker(input, output, failed)
}
// 将URL分发给工作线程,并从工作线程接收失败的URL
go func() {
for {
select {
case input <- urlList[0]:
urlList = urlList[1:]
case url := <-failed:
urlList = append(urlList, url)
}
}
}()
// 等待结果
for {
data := <-output
saveToDB(data)
}
}
func main() {
urlList := loadLinksFromDB()
crawl(urlList)
}
(注意,在你的评论中提到的,在crawl()
函数中不使用for _, _ = range urlList { // read output }
是正确的,因为URL可以重新入队列;但是根据我所知,你不需要使用select
。)
英文:
I believe this hangs when you have all N workers waiting on input <- url
, and hence there are no more workers taking stuff out of input
. In other words, if 4 URLs fail roughly at the same time, it will hang.
The solution is to send failed URLs to some place that is not the input channel for the workers (to avoid deadlock).
One possibility is to have a separate failed
channel, with the anonymous goroutine always accepting input from it. Like this (not tested):
package main
func worker(intput chan string, output chan SiteData, failed chan string) {
for url := range input {
// ...
if resp != nil && status == 200 {
output <- processSideData(resp)
} else {
failed <- url
}
}
}
func crawl(urlList []string) {
numWorkers := 4
input := make(chan string)
failed := make(chan string)
output := make(chan SiteData)
// spawn workers
for i := 0; i < numWorkers; i++ {
go worker(input, output, failed)
}
// Dispatch URLs to the workers, also receive failures from them.
go func() {
for {
select {
case input <- urlList[0]:
urlList = urlList[1:]
case url := <-failed:
urlList = append(urlList, url)
}
}
}()
// wait for the results
for {
data := <-output
saveToDB(data)
}
}
func main() {
urlList := loadLinksFromDB()
crawl(urlList)
}
(Note how it is correct, as you say in your commentary, not to use for _, _ = range urlList { // read output }
in your crawl()
function, because URLs can be re-enqueued; but you don’t need select either as far as I can tell.)
通过集体智慧和协作来改善编程学习和解决问题的方式。致力于成为全球开发者共同参与的知识库,让每个人都能够通过互相帮助和分享经验来进步。
评论