Gracefully closing channel and not sending on closed channel

Question

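I am new to Golang concurrency and have been working to understand the piece of code below.

I am seeing a few things that I am unable to explain:

  1. When the loop in main uses i <= 100000 instead of i <= 10000000, it sometimes prints different values for nResults and jobWrites in the last two statements:
    fmt.Printf("number of result writes %d\n", nResults)
    fmt.Printf("Number of job writes %d\n", jobWrites)
  2. When the loop goes beyond 1000000, it panics with panic: send on closed channel.

How can I make sure that values are never sent on the closed jobs channel, and that the results channel is closed only after all values have been received, without a deadlock?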

package main

import (
	"fmt"
	"sync"
)

func worker(wg *sync.WaitGroup, id int, jobs <-chan int, results chan<- int, countWrites *int64) {
	defer wg.Done()
	for j := range jobs {
		*countWrites += 1
		go func(j int) {
			if j%2 == 0 {
				results <- j * 2
			} else {
				results <- j
			}
		}(j)
	}
}

func main() {
	wg := &sync.WaitGroup{}
	jobs := make(chan int)
	results := make(chan int)
	var i int = 1
	var jobWrites int64 = 0
	for i <= 10000000 {
		go func(j int) {
			if j%2 == 0 {
				i += 99
				j += 99
			}
			jobWrites += 1
			jobs <- j
		}(i)
		i += 1
	}

	var nResults int64 = 0
	for w := 1; w < 1000; w++ {
		wg.Add(1)
		go worker(wg, w, jobs, results, &nResults)
	}

	close(jobs)
	wg.Wait()

	var sum int32 = 0
	var count int64 = 0
	for r := range results {
		count += 1
		sum += int32(r)
		if count == nResults {
			close(results)
		}
	}
	fmt.Println(sum)
	fmt.Printf("number of result writes %d\n", nResults)
	fmt.Printf("Number of job writes %d\n", jobWrites)
}

Answer 1

Score: 1

There are quite a few problems in your code.

Sending on closed channel

One general principle of using Go channels is:

> don't close a channel from the receiver side and don't close a channel if the channel has multiple concurrent senders

(https://go101.org/article/channel-closing.html)
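To see why the rule matters, here is a minimal sketch showing that a send on a closed channel panics at runtime. That is what happens in your code when main calls close(jobs) while sender goroutines are still blocked on jobs <- j. The helper name trySend is hypothetical and exists only for illustration. recover is used only to make the panic observable; it is not a fix.

```go
package main

import "fmt"

// trySend (hypothetical helper) sends v on ch and reports whether the send
// panicked. Real code should be structured so this panic can never happen.
func trySend(ch chan int, v int) (panicked bool) {
	defer func() {
		if r := recover(); r != nil {
			panicked = true // the runtime panicked with "send on closed channel"
		}
	}()
	ch <- v
	return false
}

func main() {
	ch := make(chan int, 1)
	fmt.Println(trySend(ch, 1)) // prints false: the buffer slot is free
	close(ch)                   // closed while a sender still exists
	fmt.Println(trySend(ch, 2)) // prints true: sending on a closed channel panics
}
```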

The solution for you is simple: don't have multiple concurrent senders, and then you can close the channel from the sender side.

Instead of starting millions of separate goroutines, one for each job you add to the channel, run a single goroutine that executes the whole loop and adds all jobs to the channel. Close the channel in that goroutine after the loop. The workers will consume the channel as fast as they can.

Data races by modifying shared variables in multiple goroutines

You're modifying two shared variables without taking special steps:

  1. nResults, which you pass to every worker as countWrites *int64 and increment there concurrently.
  2. i in the loop that writes to the jobs channel: you're adding 99 to it from multiple goroutines while the loop itself reads and increments it. This makes it unpredictable how many values you actually write to the jobs channel.

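To solve 1, there are many options, including using sync.Mutex. However, since you're only adding to it, the easiest solution is to use atomic.AddInt64(countWrites, 1) from the sync/atomic package instead of *countWrites += 1. The same applies to jobWrites, which every sender goroutine increments.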

To solve 2, don't use one goroutine per write to the channel; use one goroutine for the entire loop (see above).

huangapple
  • Published on November 17, 2022 at 10:17:42
  • When reposting, please keep the link to this article: https://go.coder-hub.com/74469385.html