如何在未缓冲的通道中判断是否没有接收到任何内容,而不关闭它?

huangapple go评论92阅读模式
英文:

How to find out nothing is being received in an unbuffered channel without closing it?

问题

有没有办法知道通道中的所有值是否都被消耗完了?我正在制作一个递归获取种子站点的网络爬虫。我没有关闭通道,因为它会从服务器中获取数据,并且应该在每次发送新站点时进行爬取。对于给定的种子站点,我找不到比超时更好的方法来判断子任务是否完成。如果有一种方法可以知道通道中没有值了(还有待消耗的值),那么我的程序就可以退出子任务并继续监听服务器。

英文:

Is there a way to know if all the values in channel has been consumed? I'm making a crawler which recursively fetches sites from seed site. I'm not closing the channel because it consumes from the server and should crawl every time new site is sent. For a given seed site, I can't find a better way to know completion of a subtask other than timing out. If there was a way to know that there is no value in channel(left to be consumed), my program could get out of the sub task and continue listening to the server.

答案1

得分: 4

在一个无缓冲通道中不存在"排队"的概念。如果通道是无缓冲的,根据定义它总是空的。如果是有缓冲的,那么它可能有一些元素,但是尝试读取其中的元素数量总是会导致竞态条件,所以不要设计成这样(在Go语言中也是不可能的)。

理想情况下,避免设计需要知道子任务何时完成的方案,但如果必须这样做,可以给它们发送一个通道来回应你。当它们回应时,你就知道它们已经完成了。

你描述的这种问题在Go语言的博客和演讲中有详细介绍:

英文:

There is no such things as "queued in an unbuffered channel." If the channel is unbuffered, it is by definition always empty. If it is buffered, then it may have some number of elements in it up to its size. But trying to read how many elements are in it is always going to cause race conditions, so don't design that way (it's also impossible in Go).

Ideally, avoid designs that need to know when children are complete, but when you must, send them a channel to respond to you on. When they respond, then you know they're complete.

The kind of problem you're describing is well covered in the Go blogs and talks:

答案2

得分: 0

你可以使用select语句中的default来判断一个goroutine是否在通道的另一端被阻塞。例如:

package main

import (
	"fmt"
	"time"
)

var c = make(chan int)

func produce(i int) {
	c <- i
}

func consume() {
	for {
		select {
		case i := <-c:
			fmt.Println(i)
		default:
			return
		}
	}
}

func main() {
	for i := 0; i < 10; i++ {
		go produce(i)
	}
	time.Sleep(time.Millisecond)
	consume()
}

请注意,这不是一个队列。如果你有一个生产goroutine,在发送一个值并再次循环之间产生多个值的时间内,default分支会执行,消费者会继续执行。

你可以使用超时:

case <-time.After(time.Second):

这样会给你的生产者一个秒的时间来产生另一个值,但最好使用一个终止值。将你要发送的内容包装在一个结构体中:

type message struct {
    err  error
    data theOriginalType
}

然后发送这个结构体。使用io.EOF或自定义错误var Done = errors.New("DONE")来表示完成。

由于你有一个递归问题,为什么不使用WaitGroup呢?每次启动一个新任务时,增加等待组的计数,每次任务完成时,减少计数。然后有一个外部任务等待完成。例如,这是一个计算斐波那契数列的非常低效的方法:

package main

import (
	"fmt"
	"sync"
)

var wg sync.WaitGroup

func fib(c chan int, n int) {
	defer wg.Done()
	if n < 2 {
		c <- n
	} else {
		wg.Add(2)
		go fib(c, n-1)
		go fib(c, n-2)
	}
}

func main() {
	wg.Add(1)
	c := make(chan int)
	go fib(c, 18)
	go func() {
		wg.Wait()
		close(c)
	}()
	sum := 0
	for i := range c {
		sum += i
	}
	fmt.Println(sum)
}
英文:

You can determine whether or not a goroutine is blocked on the other end of a channel by using default in a select statement. For example:

package main

import (
	&quot;fmt&quot;
	&quot;time&quot;
)

var c = make(chan int)

func produce(i int) {
	c &lt;- i
}

func consume() {
	for {
		select {
		case i := &lt;-c:
			fmt.Println(i)
		default:
			return
		}
	}
}

func main() {
	for i := 0; i &lt; 10; i++ {
		go produce(i)
	}
	time.Sleep(time.Millisecond)
	consume()
}

Keep in mind that this isn't a queue though. If you were to have 1 producing goroutine that looped and produced multiple values between the time it took to send one value and get back around the loop again the default case would happen and your consumer would move on.

You could use a timeout:

case &lt;-time.After(time.Second):

Which would give your producer a second to produce another value, but you're probably better off using a terminal value. Wrap whatever you're sending in a struct:

type message struct {
    err error
    data theOriginalType
}

And send that thing instead. Then use io.EOF or a custom error var Done = errors.New(&quot;DONE&quot;) to signal completion.

Since you have a recursive problem why not use a WaitGroup? Each time you start a new task increment the wait group, and each time a task completes, decrement it. Then have an outer task waiting on completion. For example here's a really inefficient way of calculating a fibonacci number:

package main

import (
	&quot;fmt&quot;
	&quot;sync&quot;
)

var wg sync.WaitGroup

func fib(c chan int, n int) {
	defer wg.Done()
	if n &lt; 2 {
		c &lt;- n
	} else {
		wg.Add(2)
		go fib(c, n - 1)
		go fib(c, n - 2)
	}
}

func main() {
	wg.Add(1)
	c := make(chan int)
	go fib(c, 18)
	go func() {
		wg.Wait()
		close(c)
	}()
	sum := 0
	for i := range c {
		sum += i
	}
	fmt.Println(sum)
}

huangapple
  • 本文由 发表于 2015年9月9日 07:29:44
  • 转载请务必保留本文链接:https://go.coder-hub.com/32468762.html
匿名

发表评论

匿名网友

:?: :razz: :sad: :evil: :!: :smile: :oops: :grin: :eek: :shock: :???: :cool: :lol: :mad: :twisted: :roll: :wink: :idea: :arrow: :neutral: :cry: :mrgreen:

确定