Race condition on fixed number of workers pattern
Question
I'm playing with some code for learning purposes, and I get a race condition when I run it with the `-race` flag. I want to understand why. The code starts a fixed set of goroutines that act as workers consuming tasks from a channel. There is no fixed number of tasks: as long as the channel receives tasks, the workers must keep working.
The race is reported on the `WaitGroup` calls. From what I understand (looking at the data race report), it happens when one of the spawned goroutines executes its first `wg.Add` call while the main goroutine calls `wg.Wait` at the same time. Is that correct? If so, does it mean that I must always call `Add` from the main goroutine to avoid this kind of race? That would also mean I need to know in advance how many tasks the workers will handle, which kind of sucks if the code has to handle any number of tasks that may arrive once the workers are running...
The code:
```go
package workerpool

import (
	"fmt"
	"sync"
	"testing"
)

func Test(t *testing.T) {
	t.Run("", func(t *testing.T) {
		var wg sync.WaitGroup
		queuedTaskC := make(chan func())
		for i := 0; i < 5; i++ {
			wID := i + 1
			go func(workerID int) {
				for task := range queuedTaskC {
					wg.Add(1) // flagged by -race: can run concurrently with wg.Wait below
					task()
				}
			}(wID)
		}
		taskFn := func() {
			fmt.Println("executing task...")
			wg.Done()
		}
		queuedTaskC <- taskFn
		queuedTaskC <- taskFn
		queuedTaskC <- taskFn
		queuedTaskC <- taskFn
		queuedTaskC <- taskFn
		queuedTaskC <- taskFn
		queuedTaskC <- taskFn
		queuedTaskC <- taskFn
		queuedTaskC <- taskFn
		wg.Wait()
		close(queuedTaskC)
		fmt.Println(len(queuedTaskC))
	})
}
```
The report:
```
==================
WARNING: DATA RACE
Read at 0x00c0001280d8 by goroutine 11:
  internal/race.Read()
      /src/internal/race/race.go:37 +0x206
  sync.(*WaitGroup).Add()
      /src/sync/waitgroup.go:71 +0x219
  workerpool.Test.func1.1()
      /workerpool/workerpool_test.go:36 +0x64

Previous write at 0x00c0001280d8 by goroutine 8:
  internal/race.Write()
      /src/internal/race/race.go:41 +0x125
  sync.(*WaitGroup).Wait()
      /src/sync/waitgroup.go:128 +0x126
  workerpool.Test.func1()
      /workerpool/workerpool_test.go:57 +0x292
  testing.tRunner()
      /src/testing/testing.go:1123 +0x202

Goroutine 11 (running) created at:
  workerpool.Test.func1()
      /workerpool/workerpool_test.go:34 +0xe4
  testing.tRunner()
      /src/testing/testing.go:1123 +0x202

Goroutine 8 (running) created at:
  testing.(*T).Run()
      /src/testing/testing.go:1168 +0x5bb
  workerpool.Test()
      workerpool_test.go:29 +0x4c
  testing.tRunner()
      /src/testing/testing.go:1123 +0x202
==================
```
Answer 1
Score: 3
The `WaitGroup` implementation is based on an internal counter that is changed by the `Add` and `Done` methods. The `Wait` method does not return until the counter reaches zero. A `WaitGroup` can also be reused, but only under certain conditions described in the documentation:
```go
// If a WaitGroup is reused to wait for several independent sets of events,
// new Add calls must happen after all previous Wait calls have returned.
```
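To illustrate the reuse rule quoted above, here is a minimal sketch that reuses one `WaitGroup` for several independent batches. The `runBatches` helper is a hypothetical name used only for this example; the point is that each batch's `Add` calls start only after the previous `Wait` has returned.

```go
package main

import (
	"fmt"
	"sync"
)

// runBatches (hypothetical helper) reuses a single WaitGroup for several
// batches of goroutines and returns how many goroutines finished per batch.
func runBatches(batchSizes []int) []int {
	var wg sync.WaitGroup
	var mu sync.Mutex
	done := make([]int, 0, len(batchSizes))

	for _, size := range batchSizes {
		count := 0 // fresh counter for this batch
		for i := 0; i < size; i++ {
			wg.Add(1) // counter goes up before the goroutine starts
			go func() {
				defer wg.Done()
				mu.Lock()
				count++
				mu.Unlock()
			}()
		}
		// The counter is back to zero once Wait returns, so the next
		// batch may safely call Add again.
		wg.Wait()
		done = append(done, count)
	}
	return done
}

func main() {
	fmt.Println(runBatches([]int{3, 5, 2})) // [3 5 2]
}
```

Reading `count` after `Wait` without the mutex is safe because every `Done` happens before `Wait` returns.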
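Although your code is not explicitly reusing `wg`, it can bring the `WaitGroup` counter to zero multiple times. This happens whenever no task is being processed at a given moment, which is entirely possible in concurrent code (the counter is also zero before any worker has called `Add` at all). The `Add` documentation states the underlying rule directly: calls with a positive delta that occur when the counter is zero must happen before a `Wait`. Since your code does not wait for `Wait` to return before calling `Add` again, you get the race condition error.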
As everyone suggests in the comments, you should abandon the idea of tracking the tasks with a `WaitGroup` in favor of tracking the running goroutines. Here is the proposed code:
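```go
package workerpool

import (
	"fmt"
	"sync"
	"testing"
)

func Test(t *testing.T) {
	var wg sync.WaitGroup
	queuedTaskC := make(chan func(), 10)
	for i := 0; i < 5; i++ {
		wID := i + 1
		// Count each worker before starting it, so every Add happens before Wait.
		wg.Add(1)
		go func(workerID int) {
			defer wg.Done()
			// The loop ends once the channel is closed and drained.
			for task := range queuedTaskC {
				task()
			}
		}(wID)
	}
	for i := 0; i < 10; i++ {
		queuedTaskC <- func() {
			fmt.Println("executing task...")
		}
	}
	// Closing tells the workers that no more tasks will arrive.
	close(queuedTaskC)
	// Wait returns after every worker has exited its range loop.
	wg.Wait()
	fmt.Println(len(queuedTaskC))
}
```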