如何解决这个问题:panic: sync: negative WaitGroup counter

huangapple go评论126阅读模式
英文:

How to resolve this issue: panic: sync: negative WaitGroup counter

问题

运行多次后,我有时会遇到这个问题。我知道这与计数器有关。当调用sync.WaitGroup的Done()方法的次数多于调用Add()方法的次数时,就会抛出此错误。

我该如何修复这个问题?

我的代码创建了大小为4的批次,并对每个批次进行一些处理,但是我在解决这个恐慌时遇到了问题。

package main

import (
	"fmt"
	"sync"
)

func main() {
	// 创建输入通道
	input := make(chan int)

	// 创建等待组
	var wg sync.WaitGroup

	// 启动批处理协程
	wg.Add(1)
	go batcher(input, &wg)

	// 将输入值发送到批处理器
	for i := 1; i <= 10; i++ {
		input <- i
	}

	// 关闭输入通道
	close(input)

	// 等待批处理协程完成
	wg.Wait()
}

func batcher(input chan int, wg *sync.WaitGroup) {
	// 创建大小为4的批次通道
	batch := make(chan int, 4)

	// 创建用于同步工作协程的通道
	done := make(chan bool)

	// 创建工作协程的等待组
	var workerWg sync.WaitGroup

	// 启动工作协程
	for i := 0; i < 4; i++ {
		workerWg.Add(1)
		go worker(batch, &workerWg, done)
	}

	// 读取输入值并发送到批次通道
	for value := range input {
		batch <- value
		if len(batch) == 4 {
			// 等待工作协程完成批次处理
			workerWg.Wait()

			// 将批次发送给工作协程
			for i := 0; i < 4; i++ {
				workerWg.Add(1)
				go sendBatch(batch, &workerWg, done)
			}
		}
	}

	// 等待工作协程完成剩余批次处理
	workerWg.Wait()

	// 关闭done通道以通知所有批次已处理完毕
	close(done)

	wg.Done()
}

func sendBatch(batch chan int, workerWg *sync.WaitGroup, done chan bool) {
	// 处理批次
	for value := range batch {
		fmt.Println("处理值:", value)
	}

	// 通知工作协程批次已处理完毕
	workerWg.Done()

	select {
	case done <- true:
	default:
		// done通道已关闭
	}
}

func worker(batch chan int, workerWg *sync.WaitGroup, done chan bool) {
	// 处理从批次通道接收到的批次
	for batch := range batch {
		// 处理批次
		fmt.Println("处理批次:", batch)
		workerWg.Done()
	}

	// 通知批处理协程工作协程已完成
	select {
	case done <- true:
	default:
		// done通道已关闭
	}
}

编写批处理器的基本代码:

package main

import (
	"fmt"
	"sync"
)

func main() {
	input := make(chan int)
	output := make(chan []int)

	var wg sync.WaitGroup
	wg.Add(2)

	// 启动批处理器协程
	go func() {
		batch := []int{}
		for value := range input {
			batch = append(batch, value)
			if len(batch) == 4 {
				output <- batch
				batch = []int{}
			}
		}
		if len(batch) > 0 {
			output <- batch
		}
		close(output)
		wg.Done()
	}()

	// 启动工作协程
	go func() {
		for batch := range output {
			sum := 0
			for _, value := range batch {
				sum += value
			}
			fmt.Printf("批次 %v 的总和:%d\n", batch, sum)
		}
		wg.Done()
	}()

	// 将输入值发送到批处理器
	for _, v := range []int{1, 2, 3, 4, 5, 6, 7, 8, 9, 10} {
		input <- v
	}
	close(input)

	// 等待两个协程完成
	wg.Wait()
}
批次 [1 2 3 4] 的总和:10
批次 [5 6 7 8] 的总和:26
批次 [9 10] 的总和:19

之前的设计有点复杂,我将尝试扩展这个基本的设计。

英文:

After running again and again I am getting sometimes this issue. I know it's related to counter related. When the Done() method of a sync.WaitGroup is called more times than the Add() method was called then it will throw this error.

How do I fix this?

My code created batches of size 4 and do some processing on each batch but I am getting issue with resolving this panic.

package main
import (
&quot;fmt&quot;
&quot;sync&quot;
)
func main() {
// create input channel
input := make(chan int)
// create wait group
var wg sync.WaitGroup
// start batcher goroutine
wg.Add(1)
go batcher(input, &amp;wg)
// send input values to the batcher
for i := 1; i &lt;= 10; i++ {
input &lt;- i
}
// close input channel
close(input)
// wait for batcher goroutine to finish
wg.Wait()
}
func batcher(input chan int, wg *sync.WaitGroup) {
// create batch channel with buffer of size 4
batch := make(chan int, 4)
// create channel to synchronize worker goroutines
done := make(chan bool)
// create wait group for worker goroutines
var workerWg sync.WaitGroup
// start worker goroutines
for i := 0; i &lt; 4; i++ {
workerWg.Add(1)
go worker(batch, &amp;workerWg, done)
}
// read input values and send to batch
for value := range input {
batch &lt;- value
if len(batch) == 4 {
// wait for worker goroutines to finish processing batch
workerWg.Wait()
// send batch to worker goroutines
for i := 0; i &lt; 4; i++ {
workerWg.Add(1)
go sendBatch(batch, &amp;workerWg, done)
}
}
}
// wait for worker goroutines to finish processing remaining batch
workerWg.Wait()
// close done channel to notify that all batches have been processed
close(done)
wg.Done()
}
func sendBatch(batch chan int, workerWg *sync.WaitGroup, done chan bool) {
// process batch
for value := range batch {
fmt.Println(&quot;Processing value:&quot;, value)
}
// notify worker goroutines that batch has been processed
workerWg.Done()
select {
case done &lt;- true:
default:
// done channel has been closed
}
}
func worker(batch chan int, workerWg *sync.WaitGroup, done chan bool) {
// process batches received from batch channel
for batch := range batch {
// process batch
fmt.Println(&quot;Processing batch:&quot;, batch)
workerWg.Done()
}
// notify batcher goroutine that worker goroutine has finished
select {
case done &lt;- true:
default:
// done channel has been closed
}
}

Basic code to write batcher:

package main
import (
&quot;fmt&quot;
&quot;sync&quot;
)
func main() {
input := make(chan int)
output := make(chan []int)
var wg sync.WaitGroup
wg.Add(2)
// Start the batcher goroutine
go func() {
batch := []int{}
for value := range input {
batch = append(batch, value)
if len(batch) == 4 {
output &lt;- batch
batch = []int{}
}
}
if len(batch) &gt; 0 {
output &lt;- batch
}
close(output)
wg.Done()
}()
// Start the worker goroutine
go func() {
for batch := range output {
sum := 0
for _, value := range batch {
sum += value
}
fmt.Printf(&quot;Sum of batch %v: %d\n&quot;, batch, sum)
}
wg.Done()
}()
// Send input values to the batcher
for _, v := range []int{1, 2, 3, 4, 5, 6, 7, 8, 9, 10} {
input &lt;- v
}
close(input)
// Wait for both goroutines to finish
wg.Wait()
}
Sum of batch [1 2 3 4]: 10
Sum of batch [5 6 7 8]: 26
Sum of batch [9 10]: 19

Earlier design was bit complicated I will try to extend this basic one.

答案1

得分: 1

根据这段代码:

for i := 0; i < 4; i++ {
    workerWg.Add(1)
    go worker(batch, &workerWg, done)
}

我认为workerWg.Done()应该移出循环:

func worker(batch chan int, workerWg *sync.WaitGroup, done chan bool) {
	defer workerWg.Done()
	// 处理从batch通道接收到的批次
	for batch := range batch {
		// 处理批次
		fmt.Println("Processing batch:", batch)
		workerWg.Done()
	}

	// 通知批处理goroutine工作goroutine已经完成
	select {
	case done <- true:
	default:
		// done通道已关闭
	}
}

但是在示例中,batch通道没有关闭。因此,实际上,goroutine将一直运行,直到程序结束。

我不确定是否还有其他问题。这个设计太复杂了。复杂的代码很难理解,容易出错。考虑重新设计它。

英文:

According to this code:

for i := 0; i &lt; 4; i++ {
workerWg.Add(1)
go worker(batch, &amp;workerWg, done)
}

I think workerWg.Done() should be moved outside of the loop:

  func worker(batch chan int, workerWg *sync.WaitGroup, done chan bool) {
+ 	defer workerWg.Done()
// process batches received from batch channel
for batch := range batch {
// process batch
fmt.Println(&quot;Processing batch:&quot;, batch)
- 		workerWg.Done()
}
// notify batcher goroutine that worker goroutine has finished
select {
case done &lt;- true:
default:
// done channel has been closed
}
}

But batch is not closed in the demo. So in fact, the goroutine will run for ever until the program is ended.

I'm not sure whether there is any other issue. The design is too complicated. Complicated code is hard to understand and prone to error. Consider redesigning it.

huangapple
  • 本文由 发表于 2023年4月27日 14:11:45
  • 转载请务必保留本文链接:https://go.coder-hub.com/76117234.html
匿名

发表评论

匿名网友

:?: :razz: :sad: :evil: :!: :smile: :oops: :grin: :eek: :shock: :???: :cool: :lol: :mad: :twisted: :roll: :wink: :idea: :arrow: :neutral: :cry: :mrgreen:

确定