为什么Go通道限制缓冲区大小

huangapple go评论73阅读模式
英文:

Why Go channels limit the buffer size

问题

我是新手学习Go语言,可能有些地方理解有误,但为什么Go语言中的通道在缓冲通道的最大缓冲大小上有限制呢?例如,如果我这样创建一个通道:

channel := make(chan int, 100)

我不能在通道中添加超过100个元素,否则会被阻塞,这是有原因的吗?此外,它们也不能动态调整大小,因为通道的API不支持这样做。

相比于无界信号量,这似乎在语言对于使用单一机制进行通用同步方面有些限制。例如,广义信号量的值可以无限增加。

英文:

I am new to Go and I might be missing the point but why are Go channels limited in the maximum buffer size buffered channels can have? For example if I make a channel like so

channel := make(chan int, 100)

I cannot add more than 100 elements to the channel without blocking, is there a reason for this? Further they cannot dynamically be resized, because the channel API does not support that.

This seems sort of limiting in the language's support for universal synchronization with a single mechanism since it lacks convenience compared to an unbounded semaphore. For example a generalized semaphore's value can be increased without bounds.

答案1

得分: 24

如果程序的某个组件无法跟上其输入,它需要对系统的其余部分施加反压力,而不是让其继续运行并生成几GB的数据,这些数据将永远不会被处理,因为系统已经耗尽了内存并崩溃。

实际上并不存在无限制的缓冲区,因为机器对其处理能力是有限制的。Go语言要求你为带缓冲通道指定一个大小,这样你就会考虑到你的程序实际上需要和能够处理的缓冲区大小。如果确实需要十亿个项目,并且能够处理它们,你可以创建一个如此大的通道。但在大多数情况下,实际上只需要一个缓冲区大小为0或1。

英文:

If one component of a program can't keep up with its input, it needs to put back-pressure on the rest of the system, rather than letting it run ahead and generate gigabytes of data that will never get processed because the system ran out of memory and crashed.

There is really no such thing as an unlimited buffer, because machines have limits on what they can handle. Go requires you to specify a size for buffered channels so that you will think about what size buffer your program actually needs and can handle. If it really needs a billion items, and can handle them, you can create a channel that big. But in most cases a buffer size of 0 or 1 is actually what is needed.

答案2

得分: 2

这是因为通道(Channels)是为并发的goroutine之间的高效通信而设计的,但你的需求与此不同:你的阻塞表示接收方没有处理工作队列,并且“动态”很少是免费的。

你可以使用各种不同的模式和算法来解决你的问题:你可以将通道改为接受int数组,你可以添加额外的goroutine来更好地平衡或过滤工作。或者你可以实现自己的动态通道。这样做对于理解为什么动态通道不是构建并发的好方法来说,肯定是一个有用的练习。

package main

import "fmt"

func dynamicChannel(initial int) (chan<- interface{}, <-chan interface{}) {
	in := make(chan interface{}, initial)
	out := make(chan interface{}, initial)
	go func() {
		defer close(out)
		buffer := make([]interface{}, 0, initial)
	loop:
		for {
			packet, ok := <-in
			if !ok {
				break loop
			}
			select {
			case out <- packet:
				continue
			default:
			}
			buffer = append(buffer, packet)
			for len(buffer) > 0 {
				select {
				case packet, ok := <-in:
					if !ok {
						break loop
					}
					buffer = append(buffer, packet)

				case out <- buffer[0]:
					buffer = buffer[1:]
				}
			}
		}
		for len(buffer) > 0 {
			out <- buffer[0]
			buffer = buffer[1:]
		}
	}()

	return in, out
}

func main() {
	in, out := dynamicChannel(4)

	in <- 10
	fmt.Println(<-out)

	in <- 20
	in <- 30

	fmt.Println(<-out)
	fmt.Println(<-out)

	for i := 100; i < 120; i++ {
		in <- i
	}
	fmt.Println("queued 100-120")
	fmt.Println(<-out)
	close(in)
	fmt.Println("in closed")
	for i := range out {
		fmt.Println(i)
	}
}

通常,如果你的程序出现阻塞,这意味着你的并发性不够平衡。考虑采用不同的策略。例如,一个简单的工具可以查找具有匹配的.checksum文件的文件,然后检查哈希值:

func crawl(toplevel string, workQ chan<- string) {
	defer close(workQ)
	filepath.Walk(toplevel, func(path string, info os.FileInfo, err error) error {
		if err == nil && info.Mode().IsRegular() {
			workQ <- path
		}
		return nil
	})
}

func validateWorker(workQ <-chan string) {
	for path := range workQ {
		expectedSum, err := os.ReadFile(path + ".checksum")[:256]
		if err != nil {
			continue
		}
		file, err := os.Open(path)
		if err != nil {
			continue
		}
		defer file.Close()
		hash := sha256.New()
		if _, err := io.Copy(hash, file); err != nil {
			log.Printf("couldn't hash %s: %w", path, err)
			continue
		}
		actualSum := fmt.Sprintf("%x", hash.Sum(nil))
		if actualSum != expectedSum {
			log.Printf("%s: mismatch: expected %s, got %s", path, expectedSum, actualSum)
		}
	}
}

即使没有任何.checksum文件,crawl函数也往往比工作队列快。当遇到.checksum文件时,特别是如果文件很大,工作线程执行单个校验和可能需要更长的时间。

在这里,更好的目标是通过减少"validateWorker"函数的工作量来实现更一致的吞吐量。目前它有时很快,因为它检查校验和文件。有时它很慢,因为它还要读取和计算文件的哈希值。

type ChecksumQuery struct {
	Filepath  string
	ExpectSum string
}

func crawl(toplevel string, workQ chan<- ChecksumQuery) {
	checkupQ := make(chan string, 4)

	go func() {
		defer close(workQ)
		for path := range checkupQ {
			expected, err := os.ReadFile(path + ".checksum")[:256]
			if err == nil && len(expected) > 0 {
				workQ <- ChecksumQuery{path, string(expected)}
			}
		}
	}()

	go func() {
		defer close(checkupQ)
		filepath.Walk(toplevel, func(path string, info os.FileInfo, err error) error {
			if err == nil && info.Mode().IsRegular() {
				checkupQ <- path
			}
			return nil
		})
	}()
}

// 运行适量的validate workers,为workQ分配适当的大小,但如果爬虫或校验函数阻塞,那是因为validate正在执行有用的工作。

如果你的校验工作线程都很忙,它们都在从磁盘中读取和计算大文件的哈希值。让其他工作线程通过爬取更多的文件名、分配和传递字符串来中断这个过程并不是有利的。

在其他情况下,可能会传递大型的列表给工作线程,在这种情况下,可以通过通道传递切片(这是廉价的);或者传递动态大小的一组元素,在这种情况下,可以考虑通过通道传递通道或捕获。

英文:

This is because Channels are designed for efficient communication between concurrent goroutines, but the need you have is something different: the fact you are blocking denotes that the recipient is not attending the work queue, and "dynamic" is rarely free.

There are a variety of different patterns and algorithms you can use to solve the problem you have: you could change your channel to accept arrays of ints, you could add additional goroutines to better balance or filter work. Or you could implement your own dynamic channel. Doing so is certainly a useful exercise for seeing why dynamic channels aren't a great way to build concurrency.

package main
import &quot;fmt&quot;
func dynamicChannel(initial int) (chan &lt;- interface{}, &lt;- chan interface{}) {
in := make(chan interface{}, initial)
out := make(chan interface{}, initial)
go func () {
defer close(out)
buffer := make([]interface{}, 0, initial)
loop:
for {
packet, ok := &lt;- in
if !ok {
break loop
}
select {
case out &lt;- packet:
continue
default:
}
buffer = append(buffer, packet)
for len(buffer) &gt; 0 {
select {
case packet, ok := &lt;-in:
if !ok {
break loop
}
buffer = append(buffer, packet)
case out &lt;- buffer[0]:
buffer = buffer[1:]
}
}
}
for len(buffer) &gt; 0 {
out &lt;- buffer[0]
buffer = buffer[1:]
}
} ()
return in, out
}
func main() {
in, out := dynamicChannel(4)
in &lt;- 10
fmt.Println(&lt;-out)
in &lt;- 20
in &lt;- 30
fmt.Println(&lt;-out)
fmt.Println(&lt;-out)
for i := 100; i &lt; 120; i++ {
in &lt;- i
}
fmt.Println(&quot;queued 100-120&quot;)
fmt.Println(&lt;-out)
close(in)
fmt.Println(&quot;in closed&quot;)
for i := range out {
fmt.Println(i)
}
}

Generally, if you are blocking, it indicates your concurrency is not well balanced. Consider a different strategy. For example, a simple tool to look for files with a matching .checksum file and then check the hashes:

func crawl(toplevel string, workQ chan &lt;- string) {
defer close(workQ)
for _, path := filepath.Walk(toplevel, func (path string, info os.FileInfo, err error) error {
if err == nil &amp;&amp; info.Mode().IsRegular() {
workQ &lt;- path
}
}
// if a file has a .checksum file, compare it with the file&#39;s checksum.
func validateWorker(workQ &lt;- chan string) {
for path := range workQ {
// If there&#39;s a .checksum file, read it, limit to 256 bytes.
expectedSum, err := os.ReadFile(path + &quot;.checksum&quot;)[:256]
if err != nil {  // ignore
continue
}
file, err := os.Open(path)
if err != nil {
continue
}
defer close(file)
hash := sha256.New()
if _, err := io.Copy(hash, file); err != nil {
log.Printf(&quot;couldn&#39;t hash %s: %w&quot;, path, err)
continue
}
actualSum := fmt.Sprintf(&quot;%x&quot;, hash.Sum(nil))
if actualSum != expectedSum {
log.Printf(&quot;%s: mismatch: expected %s, got %s&quot;, path, expectedSum, actualSum)
}
}

Even without any .checksum files, the crawl function will tend to outpace the worker queue. When .checksum files are encountered, especially if the files are large, the worker could take much, much longer to perform a single checksum.

A better aim here would be to achieve more consistent throughput by reducing the number of things the "validateWorker" does. Right now it is sometimes fast, because it checks for the checksum file. Othertimes it is slow, because it also loads has to read and checksum the files.

type ChecksumQuery struct {
Filepath  string
ExpectSum string
}
func crawl(toplevel string, workQ chan &lt;- ChecksumQuery) {
// Have a worker filter out files which don&#39;t have .checksums, and allow it
// to get a little ahead of the crawl function.
checkupQ := make(chan string, 4)
go func () {
defer close(workQ)
for path := range checkupQ {
expected, err := os.ReadFile(path + &quot;.checksum&quot;)[:256]
if err == nil &amp;&amp; len(expected) &gt; 0 {
workQ &lt;- ChecksumQuery{ path, string(expected) }
}
}
}()
go func () {
defer close(checkupQ)
for _, path := filepath.Walk(toplevel, func (path string, info os.FileInfo, err error) error {
if err == nil &amp;&amp; info.Mode().IsRegular() {
checkupQ &lt;- path
}
}
}()
}

Run a suitable number of validate workers, assign the workQ a suitable size, but if the crawler or validate functions block, it is because validate is doing useful work.

If your validate workers are all busy, they are all consuming large files from disk and hashing them. Having other workers interrupt this by crawling for more filenames, allocating and passing strings, isn't advantageous.

Other scenarios might be passing large lists to workers, in which pass the slices over channels (its cheap); or dynamic sized groups of things, in which case consider passing channels or captures over channels.

答案3

得分: -5

缓冲区大小是指在发送阻塞之前可以发送到通道的元素数量。默认情况下,通道的缓冲区大小为0(使用make(chan int)创建通道时会得到这个值)。这意味着每次发送操作都会阻塞,直到另一个goroutine从通道接收数据。缓冲区大小为1的通道可以存储1个元素,直到发送操作阻塞,因此你可以这样使用:

c := make(chan int, 1)
c <- 1 // 不会阻塞
c <- 2 // 阻塞,直到另一个goroutine从通道接收数据

我建议你查看以下链接以获得更多解释:
https://rogpeppe.wordpress.com/2010/02/10/unlimited-buffering-with-low-overhead/
http://openmymind.net/Introduction-To-Go-Buffered-Channels/

英文:

The buffer size is the number of elements that can be sent to the channel without the send blocking. By default, a channel has a buffer size of 0 (you get this with make(chan int)). This means that every single send will block until another goroutine receives from the channel. A channel of buffer size 1 can hold 1 element until sending blocks, so you'd get

c := make(chan int, 1)
c &lt;- 1 // doesn&#39;t block
c &lt;- 2 // blocks until another goroutine receives from the channel

I suggest you to look this for more clarification:
https://rogpeppe.wordpress.com/2010/02/10/unlimited-buffering-with-low-overhead/
http://openmymind.net/Introduction-To-Go-Buffered-Channels/

huangapple
  • 本文由 发表于 2017年1月28日 11:52:55
  • 转载请务必保留本文链接:https://go.coder-hub.com/41906146.html
匿名

发表评论

匿名网友

:?: :razz: :sad: :evil: :!: :smile: :oops: :grin: :eek: :shock: :???: :cool: :lol: :mad: :twisted: :roll: :wink: :idea: :arrow: :neutral: :cry: :mrgreen:

确定