如何解决这个问题:panic: sync: negative WaitGroup counter

huangapple go评论234阅读模式
英文:

How to resolve this issue: panic: sync: negative WaitGroup counter

问题

运行多次后,我有时会遇到这个问题。我知道这与计数器有关。当调用sync.WaitGroup的Done()方法的次数多于调用Add()方法的次数时,就会抛出此错误。

我该如何修复这个问题?

我的代码创建了大小为4的批次,并对每个批次进行一些处理,但是我在解决这个恐慌时遇到了问题。

package main

import (
	"fmt"
	"sync"
)

func main() {
	// 创建输入通道
	input := make(chan int)

	// 创建等待组
	var wg sync.WaitGroup

	// 启动批处理协程
	wg.Add(1)
	go batcher(input, &wg)

	// 将输入值发送到批处理器
	for i := 1; i <= 10; i++ {
		input <- i
	}

	// 关闭输入通道
	close(input)

	// 等待批处理协程完成
	wg.Wait()
}

func batcher(input chan int, wg *sync.WaitGroup) {
	// 创建大小为4的批次通道
	batch := make(chan int, 4)

	// 创建用于同步工作协程的通道
	done := make(chan bool)

	// 创建工作协程的等待组
	var workerWg sync.WaitGroup

	// 启动工作协程
	for i := 0; i < 4; i++ {
		workerWg.Add(1)
		go worker(batch, &workerWg, done)
	}

	// 读取输入值并发送到批次通道
	for value := range input {
		batch <- value
		if len(batch) == 4 {
			// 等待工作协程完成批次处理
			workerWg.Wait()

			// 将批次发送给工作协程
			for i := 0; i < 4; i++ {
				workerWg.Add(1)
				go sendBatch(batch, &workerWg, done)
			}
		}
	}

	// 等待工作协程完成剩余批次处理
	workerWg.Wait()

	// 关闭done通道以通知所有批次已处理完毕
	close(done)

	wg.Done()
}

func sendBatch(batch chan int, workerWg *sync.WaitGroup, done chan bool) {
	// 处理批次
	for value := range batch {
		fmt.Println("处理值:", value)
	}

	// 通知工作协程批次已处理完毕
	workerWg.Done()

	select {
	case done <- true:
	default:
		// done通道已关闭
	}
}

func worker(batch chan int, workerWg *sync.WaitGroup, done chan bool) {
	// 处理从批次通道接收到的批次
	for batch := range batch {
		// 处理批次
		fmt.Println("处理批次:", batch)
		workerWg.Done()
	}

	// 通知批处理协程工作协程已完成
	select {
	case done <- true:
	default:
		// done通道已关闭
	}
}

编写批处理器的基本代码:

package main

import (
	"fmt"
	"sync"
)

func main() {
	input := make(chan int)
	output := make(chan []int)

	var wg sync.WaitGroup
	wg.Add(2)

	// 启动批处理器协程
	go func() {
		batch := []int{}
		for value := range input {
			batch = append(batch, value)
			if len(batch) == 4 {
				output <- batch
				batch = []int{}
			}
		}
		if len(batch) > 0 {
			output <- batch
		}
		close(output)
		wg.Done()
	}()

	// 启动工作协程
	go func() {
		for batch := range output {
			sum := 0
			for _, value := range batch {
				sum += value
			}
			fmt.Printf("批次 %v 的总和:%d\n", batch, sum)
		}
		wg.Done()
	}()

	// 将输入值发送到批处理器
	for _, v := range []int{1, 2, 3, 4, 5, 6, 7, 8, 9, 10} {
		input <- v
	}
	close(input)

	// 等待两个协程完成
	wg.Wait()
}
批次 [1 2 3 4] 的总和:10
批次 [5 6 7 8] 的总和:26
批次 [9 10] 的总和:19

之前的设计有点复杂,我将尝试扩展这个基本的设计。

英文:

After running again and again I am getting sometimes this issue. I know it's related to counter related. When the Done() method of a sync.WaitGroup is called more times than the Add() method was called then it will throw this error.

How do I fix this?

My code created batches of size 4 and do some processing on each batch but I am getting issue with resolving this panic.

package main

import (
	&quot;fmt&quot;
	&quot;sync&quot;
)

func main() {
	// create input channel
	input := make(chan int)

	// create wait group
	var wg sync.WaitGroup

	// start batcher goroutine
	wg.Add(1)
	go batcher(input, &amp;wg)

	// send input values to the batcher
	for i := 1; i &lt;= 10; i++ {
		input &lt;- i
	}

	// close input channel
	close(input)

	// wait for batcher goroutine to finish
	wg.Wait()
}

func batcher(input chan int, wg *sync.WaitGroup) {
	// create batch channel with buffer of size 4
	batch := make(chan int, 4)

	// create channel to synchronize worker goroutines
	done := make(chan bool)

	// create wait group for worker goroutines
	var workerWg sync.WaitGroup

	// start worker goroutines
	for i := 0; i &lt; 4; i++ {
		workerWg.Add(1)
		go worker(batch, &amp;workerWg, done)
	}

	// read input values and send to batch
	for value := range input {
		batch &lt;- value
		if len(batch) == 4 {
			// wait for worker goroutines to finish processing batch
			workerWg.Wait()

			// send batch to worker goroutines
			for i := 0; i &lt; 4; i++ {
				workerWg.Add(1)
				go sendBatch(batch, &amp;workerWg, done)
			}
		}
	}

	// wait for worker goroutines to finish processing remaining batch
	workerWg.Wait()

	// close done channel to notify that all batches have been processed
	close(done)

	wg.Done()
}

func sendBatch(batch chan int, workerWg *sync.WaitGroup, done chan bool) {
	// process batch
	for value := range batch {
		fmt.Println(&quot;Processing value:&quot;, value)
	}

	// notify worker goroutines that batch has been processed
	workerWg.Done()

	select {
	case done &lt;- true:
	default:
		// done channel has been closed
	}
}

func worker(batch chan int, workerWg *sync.WaitGroup, done chan bool) {
	// process batches received from batch channel
	for batch := range batch {
		// process batch
		fmt.Println(&quot;Processing batch:&quot;, batch)
		workerWg.Done()
	}

	// notify batcher goroutine that worker goroutine has finished
	select {
	case done &lt;- true:
	default:
		// done channel has been closed
	}
}

Basic code to write batcher:

package main

import (
	&quot;fmt&quot;
	&quot;sync&quot;
)

func main() {
	input := make(chan int)
	output := make(chan []int)

	var wg sync.WaitGroup
	wg.Add(2)

	// Start the batcher goroutine
	go func() {
		batch := []int{}
		for value := range input {
			batch = append(batch, value)
			if len(batch) == 4 {
				output &lt;- batch
				batch = []int{}
			}
		}
		if len(batch) &gt; 0 {
			output &lt;- batch
		}
		close(output)
		wg.Done()
	}()

	// Start the worker goroutine
	go func() {
		for batch := range output {
			sum := 0
			for _, value := range batch {
				sum += value
			}
			fmt.Printf(&quot;Sum of batch %v: %d\n&quot;, batch, sum)
		}
		wg.Done()
	}()

	// Send input values to the batcher
	for _, v := range []int{1, 2, 3, 4, 5, 6, 7, 8, 9, 10} {
		input &lt;- v
	}
	close(input)

	// Wait for both goroutines to finish
	wg.Wait()
}
Sum of batch [1 2 3 4]: 10
Sum of batch [5 6 7 8]: 26
Sum of batch [9 10]: 19

Earlier design was bit complicated I will try to extend this basic one.

答案1

得分: 1

根据这段代码:

for i := 0; i < 4; i++ {
    workerWg.Add(1)
    go worker(batch, &workerWg, done)
}

我认为workerWg.Done()应该移出循环:

func worker(batch chan int, workerWg *sync.WaitGroup, done chan bool) {
	defer workerWg.Done()
	// 处理从batch通道接收到的批次
	for batch := range batch {
		// 处理批次
		fmt.Println("Processing batch:", batch)
		workerWg.Done()
	}

	// 通知批处理goroutine工作goroutine已经完成
	select {
	case done <- true:
	default:
		// done通道已关闭
	}
}

但是在示例中,batch通道没有关闭。因此,实际上,goroutine将一直运行,直到程序结束。

我不确定是否还有其他问题。这个设计太复杂了。复杂的代码很难理解,容易出错。考虑重新设计它。

英文:

According to this code:

for i := 0; i &lt; 4; i++ {
    workerWg.Add(1)
    go worker(batch, &amp;workerWg, done)
}

I think workerWg.Done() should be moved outside of the loop:

  func worker(batch chan int, workerWg *sync.WaitGroup, done chan bool) {
+ 	defer workerWg.Done()
  	// process batches received from batch channel
  	for batch := range batch {
  		// process batch
  		fmt.Println(&quot;Processing batch:&quot;, batch)
- 		workerWg.Done()
  	}

  	// notify batcher goroutine that worker goroutine has finished
  	select {
  	case done &lt;- true:
  	default:
  		// done channel has been closed
  	}
  }

But batch is not closed in the demo. So in fact, the goroutine will run for ever until the program is ended.

I'm not sure whether there is any other issue. The design is too complicated. Complicated code is hard to understand and prone to error. Consider redesigning it.

huangapple
  • 本文由 发表于 2023年4月27日 14:11:45
  • 转载请务必保留本文链接:https://go.coder-hub.com/76117234.html
匿名

发表评论

匿名网友

:?: :razz: :sad: :evil: :!: :smile: :oops: :grin: :eek: :shock: :???: :cool: :lol: :mad: :twisted: :roll: :wink: :idea: :arrow: :neutral: :cry: :mrgreen:

确定