如何在Go中使用专用通道来信号化爬取作业的结束

huangapple go评论83阅读模式
英文:

How to use a dedicated channel to signal the end of a crawl job in go

问题

这是对我的上一个问题的跟进。

我正在尝试构建一个网络爬虫的原型,并且我想使用chan来阻塞执行,直到所有的任务都完成,就像这样:

func main() {
    go func() {
        do_stuff()
        stop <- true
    }
    fmt.Println(<-stop)
}

有一个queue函数将任务分发给工作线程。当所有任务完成时,该函数还会关闭通道并发送一个信号。

type Job int

// 模拟处理 HTML 页面并返回更多链接的工作线程
func worker(in chan Job, out chan Job, num int) {
    for element := range in {
        if element%2 == 0 {
            out <- 100*element + 5
            out <- 100*element + 3
            out <- 100*element + 1
        }
    }
}

func queue(toWorkers chan<- Job, fromWorkers <-chan Job, init Job, stop chan bool) {
    var list []Job
    var currentJobs int
    currentJobs = 0
    list = append(list, init)
    done := make(map[Job]bool)
    for {
        var send chan<- Job
        var item Job
        if len(list) > 0 {
            send = toWorkers
            item = list[0]
        } else if currentJobs == 0 {
            close(toWorkers)
            // 这里出了问题!
            stop <- true
            return
        }

        select {
        case send <- item:
            currentJobs += 1
            // 我们发送了一个任务,将其移除
            list = list[1:]
        case thing := <-fromWorkers:
            currentJobs -= 1
            // 收到一个新的任务
            if !done[thing] {
                list = append(list, thing)
                done[thing] = true
            }
        }
    }
}

func main() {
    in := make(chan Job, 1)
    out := make(chan Job, 1)
    stop := make(chan bool)
    // 将任务分发给工作线程
    go queue(in, out, 0, stop)
    for i := 0; i < max_workers; i++ {
        go worker(in, out, i)
    }
    duration := time.Second
    time.Sleep(duration)
    // 这会导致死锁
    fmt.Println(<-stop)
}

Playground链接

如果我理解正确,问题出在stop通道上:当工作线程仍然有任务时,Go 会认为没有人会向该通道发送消息,并宣布死锁。queue函数将关闭toWorkers通道并发送一个信号到stop,但前提是没有未完成的任务。
我漏掉了什么?

英文:

This is a follow up from my previous question.

I am trying to build a prototype for a webcrawler and I want to use a chan to block the execution until all the jobs are done, just as in

func main() {
	go func() {
	    do_stuff()
	    stop &lt;- true
   }
	fmt.Println(&lt;-stop)
}

There is a queue function that dispatch the jobs to the workers. When all jobs are finished, the function will also the channel and send a signal.

type Job int

//simulating a worker that processes a html page and returns some more links
func worker(in chan Job, out chan Job, num int) {
	for element := range in {
		if element%2 == 0 {
			out &lt;- 100*element + 5
			out &lt;- 100*element + 3
			out &lt;- 100*element + 1
		}
	}
}

func queue(toWorkers chan&lt;- Job, fromWorkers &lt;-chan Job, init Job, stop chan bool) {
	var list []Job
	var currentJobs int
	currentJobs = 0
	list = append(list, init)
	done := make(map[Job]bool)
	for {
		var send chan&lt;- Job
		var item Job
		if len(list) &gt; 0 {
			send = toWorkers
			item = list[0]
		} else if currentJobs == 0 {
			close(toWorkers)
			// this messes up everything!
			stop &lt;- true
			return
		}

		select {
		case send &lt;- item:
			currentJobs += 1
			// We sent an item, remove it
			list = list[1:]
		case thing := &lt;-fromWorkers:
			currentJobs -= 1
			// Got a new thing
			if !done[thing] {
				list = append(list, thing)
				done[thing] = true
			}
		}
	}

}

func main() {
	in := make(chan Job, 1)
	out := make(chan Job, 1)
	stop := make(chan bool)
	// dispatches jobs to workers
	go queue(in, out, 0, stop)
	for i := 0; i &lt; max_workers; i++ {
		go worker(in, out, i)
	}
	duration := time.Second
	time.Sleep(duration)
	// this cause deadlock
	fmt.Println(&lt;-stop)
}

Link to playground

If I understand correctly, the problem is with the stop channel: when the workers still have jobs, go thinks that no one will send to that channel and declares deadlock. The function queue will both close the toWorkers channel and send a signal to stop, but not while there are outstanding jobs.
What am I missing?

答案1

得分: 4

使用sync.WaitGroup来等待所有的goroutine结束。

http://golang.org/pkg/sync/#WaitGroup

http://blog.golang.org/pipelines

我在这里提供了一个小例子:http://play.golang.org/p/P30LdV0Gfe

package main

import (
	"fmt"
	"sync"
)

func main() {
	var wg sync.WaitGroup
	routinesNo := 10
	wg.Add(routinesNo)
	for i := 0; i < routinesNo; i++ {
		go func(n int) {
			fmt.Printf("%d ", n)
			wg.Done()
		}(i)
	}
	wg.Wait()
	fmt.Println("\nThe end!")
}
英文:

Use sync.WaitGroup to wait for all the go routines to end.

http://golang.org/pkg/sync/#WaitGroup

http://blog.golang.org/pipelines

I made a small example here: http://play.golang.org/p/P30LdV0Gfe

package main

import (
	&quot;fmt&quot;
	&quot;sync&quot;
)

func main() {
	var wg sync.WaitGroup
	routinesNo := 10
	wg.Add(routinesNo)
	for i := 0; i &lt; routinesNo; i++ {
		go func(n int) {
			fmt.Printf(&quot;%d &quot;, n)
			wg.Done()
		}(i)
	}
	wg.Wait()
	fmt.Println(&quot;\nThe end!&quot;)
}

huangapple
  • 本文由 发表于 2015年4月9日 20:26:27
  • 转载请务必保留本文链接:https://go.coder-hub.com/29538470.html
匿名

发表评论

匿名网友

:?: :razz: :sad: :evil: :!: :smile: :oops: :grin: :eek: :shock: :???: :cool: :lol: :mad: :twisted: :roll: :wink: :idea: :arrow: :neutral: :cry: :mrgreen:

确定