关闭缓冲通道时是否应该先将其排空?

huangapple go评论86阅读模式
英文:

Should one drain a buffered channel when closing it

问题

在Go语言中,给定一个(部分)填充的缓冲通道:

ch := make(chan *MassiveStruct, n)
for i := 0; i < n; i++ {
    ch <- NewMassiveStruct()
}

在关闭通道时(由写入者关闭),是否建议也将通道中的数据读取完毕(排空通道),以防读取者在未知的时间读取通道(例如,读取者数量有限且当前繁忙)?即:

close(ch)
for range ch {}

如果通道上有其他并发的读取者,这样的循环是否保证会结束?

背景信息:一个具有固定数量工作线程的队列服务,在服务关闭时应该停止处理队列中的任何任务(但不一定立即进行垃圾回收)。因此,我正在关闭通道以向工作线程指示服务正在终止。我可以立即排空剩余的“队列”,让垃圾回收器释放已分配的资源,我可以在工作线程中读取并忽略这些值,也可以保持通道不变,让读取者逐渐读取并在写入者中将通道设置为nil,以便垃圾回收器清理一切。我不确定哪种方式最为清晰。

英文:

Given a (partially) filled buffered channel in Go

ch := make(chan *MassiveStruct, n)
for i := 0; i &lt; n; i++ {
    ch &lt;- NewMassiveStruct()
}

is it advisable to also drain the channel when closing it (by the writer) in case it is unknown when readers are going to read from it (e.g. there is a limited number of those and they are currently busy)? That is

close(ch)
for range ch {}

Is such a loop guaranteed to end if there are other concurrent readers on the channel?

Context: a queue service with a fixed number of workers, which should drop processing anything queued when the service is going down (but not necessarily being GCed right after). So I am closing to indicate to the workers that the service is being terminated. I could drain the remaining "queue" immediately letting the GC free the resources allocated, I could read and ignore the values in the workers and I could leave the channel as is running down the readers and setting the channel to nil in the writer so that the GC cleans up everything. I am not sure which is the cleanest way.

答案1

得分: 19

这取决于你的程序,但一般来说,我倾向于说不需要在关闭通道之前清空通道。如果在关闭通道时通道中有项目,任何仍在读取通道的读取器将接收这些项目,直到通道为空。

以下是一个示例:

package main

import (
	"sync"
	"time"
)

func main() {

	var ch = make(chan int, 5)
	var wg sync.WaitGroup
	wg.Add(1)

	for range make([]struct{}, 2) {
		go func() {
			for i := range ch {
				wg.Wait()
				println(i)
			}
		}()
	}

	for i := 0; i < 5; i++ {
		ch <- i
	}
	close(ch)

	wg.Done()
	time.Sleep(1 * time.Second)
}

在这个示例中,尽管通道在任何读取器读取通道之前就被关闭了,程序仍然会输出所有的项目。

英文:

It depends on your program, but generally speaking I would tend to say no (you don't need to clear the channel before closing it): if there are items in your channel when you close it, any reader still reading from the channel will receive the items until the channel is empty.

Here is an example:

package main

import (
	&quot;sync&quot;
	&quot;time&quot;
)

func main() {

    var ch = make(chan int, 5)
    var wg sync.WaitGroup
    wg.Add(1)

    for range make([]struct{}, 2) {
	    go func() {
		    for i := range ch {
			    wg.Wait()
			    println(i)
		    }
	    }()
    }

    for i := 0; i &lt; 5; i++ {
    	ch &lt;- i
	}
	close(ch)

	wg.Done()
	time.Sleep(1 * time.Second)
}

Here, the program will output all the items, despite the fact that the channel is closed strictly before any reader can even read from the channel.

答案2

得分: 11

有更好的方法可以实现你想要的目标。你当前的方法可能会导致丢弃一些记录,并且以随机方式处理其他记录(因为排空循环会与所有的消费者竞争)。这并不能真正解决问题。

你需要的是取消操作。以下是一个示例,来自于《Go并发模式:管道和取消》(http://blog.golang.org/pipelines):

func sq(done <-chan struct{}, in <-chan int) <-chan int {
    out := make(chan int)
    go func() {
        defer close(out)
        for n := range in {
            select {
            case out <- n * n:
            case <-done:
                return
            }
        }
    }()
    return out
}

你将一个done通道传递给所有的goroutine,并在想要停止它们处理时关闭该通道。如果你经常这样做,你可能会发现golang.org/x/net/context包很有用,它规范了这种模式,并添加了一些额外的功能(比如超时)。

英文:

There are better ways to achieve what you're trying to achieve. Your current approach can just lead to throwing away some records, and processing other records randomly (since the draining loop is racing all the consumers). That doesn't really address the goal.

What you want is cancellation. Here's an example from Go Concurrency Patterns: Pipelines and cancellation

func sq(done &lt;-chan struct{}, in &lt;-chan int) &lt;-chan int {
    out := make(chan int)
    go func() {
        defer close(out)
        for n := range in {
            select {
            case out &lt;- n * n:
            case &lt;-done:
                return
            }
        }
    }()
    return out
}

You pass a done channel to all the goroutines, and you close it when you want them all to stop processing. If you do this a lot, you may find the golang.org/x/net/context package useful, which formalizes this pattern, and adds some extra features (like timeout).

答案3

得分: 1

我觉得提供的答案实际上并没有提供太多的解释,除了暗示既不需要排空也不需要关闭。因此,对于所描述的情况,以下解决方案对我来说看起来很清晰,它终止了工作线程并删除了与它们或相关通道的所有引用,从而让垃圾回收器清理通道及其内容:

type worker struct {
	submitted chan Task
	stop      chan bool
	p         *Processor
}

// 在一个 goroutine 中执行
func (w *worker) run() {
	for {
		select {
		case task := <-w.submitted:
			if err := task.Execute(w.p); err != nil {
				logger.Error(err.Error())
			}
		case <-w.stop:
			logger.Warn("Worker stopped")
			return
		}
	}
}

func (p *Processor) Stop() {
	if atomic.CompareAndSwapInt32(&p.status, running, stopped) {
		for _, w := range p.workers {
			w.stop <- true
		}
		// 当 goroutine 停止时,立即进行垃圾回收所有工作线程
		p.workers = nil
		// 当工作线程终止时,垃圾回收所有已发布的数据
		p.submitted = nil
		// 不需要执行以下操作:
		// close(p.submitted)
		// for range p.submitted {}
	}
}
英文:

I feel that the supplied answers actually do not clarify much apart from the hints that neither drain nor closing is needed. As such the following solution for the described context looks clean to me that terminates the workers and removes all references to them or the channel in question, thus, letting the GC to clean up the channel and its content:

type worker struct {
	submitted chan Task
	stop      chan bool
	p         *Processor
}

// executed in a goroutine
func (w *worker) run() {
	for {
		select {
		case task := &lt;-w.submitted:
			if err := task.Execute(w.p); err != nil {
				logger.Error(err.Error())
			}
		case &lt;-w.stop:
			logger.Warn(&quot;Worker stopped&quot;)
			return
		}
	}
}

func (p *Processor) Stop() {
	if atomic.CompareAndSwapInt32(&amp;p.status, running, stopped) {
		for _, w := range p.workers {
			w.stop &lt;- true
		}
        // GC all workers as soon as goroutines stop
        p.workers = nil
        // GC all published data when workers terminate
        p.submitted = nil
        // no need to do the following above:
		// close(p.submitted)
		// for range p.submitted {}
	}
}

huangapple
  • 本文由 发表于 2016年3月6日 21:29:37
  • 转载请务必保留本文链接:https://go.coder-hub.com/35827442.html
匿名

发表评论

匿名网友

:?: :razz: :sad: :evil: :!: :smile: :oops: :grin: :eek: :shock: :???: :cool: :lol: :mad: :twisted: :roll: :wink: :idea: :arrow: :neutral: :cry: :mrgreen:

确定