Race condition on fixed number of workers pattern

Question

I'm playing with some code for learning purposes, and when I run it with the -race flag I get a race condition that I want to understand. The code starts a fixed set of goroutines that act as workers consuming tasks from a channel; there is no fixed number of tasks, so as long as the channel keeps receiving tasks the workers must keep working.

The race is reported on the WaitGroup calls. From what I understand (looking at the data race report), it happens when one of the spawned goroutines executes the first wg.Add call while the main routine is calling wg.Wait at the same time. Is that correct? If so, does it mean that I must always call Add on the main routine to avoid this kind of race on the resource? But that would also mean I need to know in advance how many tasks the workers will handle, which is unfortunate if I want the code to handle any number of tasks that may arrive while the workers are running...

The code:

func Test(t *testing.T) {
    t.Run("", func(t *testing.T) {
        var wg sync.WaitGroup
        queuedTaskC := make(chan func())
        for i := 0; i < 5; i++ {
            wID := i + 1
            go func(workerID int) {
                for task := range queuedTaskC {
                    wg.Add(1)
                    task()
                }
            }(wID)
        }

        taskFn := func() {
            fmt.Println("executing task...")
            wg.Done()
        }
        queuedTaskC <- taskFn
        queuedTaskC <- taskFn
        queuedTaskC <- taskFn
        queuedTaskC <- taskFn
        queuedTaskC <- taskFn
        queuedTaskC <- taskFn
        queuedTaskC <- taskFn
        queuedTaskC <- taskFn
        queuedTaskC <- taskFn

        wg.Wait()
        close(queuedTaskC)

        fmt.Println(len(queuedTaskC))
    })
}

The report:

==================
WARNING: DATA RACE
Read at 0x00c0001280d8 by goroutine 11:
  internal/race.Read()
      /src/internal/race/race.go:37 +0x206
  sync.(*WaitGroup).Add()
      /src/sync/waitgroup.go:71 +0x219
  workerpool.Test.func1.1()
      /workerpool/workerpool_test.go:36 +0x64

Previous write at 0x00c0001280d8 by goroutine 8:
  internal/race.Write()
      /src/internal/race/race.go:41 +0x125
  sync.(*WaitGroup).Wait()
      /src/sync/waitgroup.go:128 +0x126
  workerpool.Test.func1()
      /workerpool/workerpool_test.go:57 +0x292
  testing.tRunner()
      /src/testing/testing.go:1123 +0x202

Goroutine 11 (running) created at:
  workerpool.Test.func1()
      /workerpool/workerpool_test.go:34 +0xe4
  testing.tRunner()
      /src/testing/testing.go:1123 +0x202

Goroutine 8 (running) created at:
  testing.(*T).Run()
      /src/testing/testing.go:1168 +0x5bb
  workerpool.Test()
      workerpool_test.go:29 +0x4c
  testing.tRunner()
      /src/testing/testing.go:1123 +0x202
==================

Answer 1

Score: 3

The WaitGroup implementation is based on an internal counter that is changed by the Add and Done methods. The Wait method does not return until the counter reaches zero. A WaitGroup can also be reused, but only under the conditions described in the documentation:

// If a WaitGroup is reused to wait for several independent sets of events,
// new Add calls must happen after all previous Wait calls have returned.

Although your code does not reuse wg, it can zero the WaitGroup counter multiple times. This happens whenever no task is being processed at a given moment, which is entirely possible in concurrent code. And since your code does not wait for Wait to return before calling Add, you get the race condition error.

As everyone suggested in the comments, you should abandon the idea of tracking tasks with the WaitGroup in favor of tracking the running goroutines. Here is the proposed code:

func Test(t *testing.T) {
	var wg sync.WaitGroup
	queuedTaskC := make(chan func(), 10)
	// Start a fixed pool of 5 workers; the WaitGroup counts the workers, not the tasks.
	for i := 0; i < 5; i++ {
		wID := i + 1
		wg.Add(1)
		go func(workerID int) {
			defer wg.Done()
			for task := range queuedTaskC {
				task()
			}
		}(wID)
	}
	// Queue the tasks.
	for i := 0; i < 10; i++ {
		queuedTaskC <- func() {
			fmt.Println("executing task...")
		}
	}
	close(queuedTaskC) // no more tasks: workers exit their range loops
	wg.Wait()          // returns once every worker has finished
	fmt.Println(len(queuedTaskC))
}
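
Note that in this version the WaitGroup counts the five worker goroutines rather than the individual tasks: every wg.Add(1) runs on the test goroutine before the corresponding worker starts, so all Add calls are ordered before wg.Wait, and Wait only returns after close(queuedTaskC) has let every worker drain the channel and exit.

Regarding the asker's follow-up question: tracking tasks instead of workers does not require knowing the number of tasks in advance; it is enough that each wg.Add(1) happens on the sending goroutine before the task is queued, so that every Add is sequenced before the final Wait. Below is a minimal sketch of that variant (an illustration added for comparison, not part of the original answer):

func Test(t *testing.T) {
	// Uses the same imports as the snippets above ("fmt", "sync", "testing").
	var wg sync.WaitGroup
	queuedTaskC := make(chan func())

	// Fixed pool of workers; they only execute tasks, the sender does the counting.
	for i := 0; i < 5; i++ {
		go func() {
			for task := range queuedTaskC {
				task()
			}
		}()
	}

	// Any number of tasks may arrive while the workers are running.
	// Each Add runs on this goroutine before the send, so it is always
	// ordered before the wg.Wait call below and never races with it.
	for i := 0; i < 9; i++ {
		wg.Add(1)
		queuedTaskC <- func() {
			defer wg.Done()
			fmt.Println("executing task...")
		}
	}

	wg.Wait()          // every Add above already happened on this goroutine
	close(queuedTaskC) // stop the workers once all queued tasks are done
}

Either way, the key is the rule quoted from the documentation: Add must never run concurrently with Wait.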
