使用Go中的sync.Atomic包来防止竞态条件

huangapple go评论88阅读模式
英文:

Preventing race condition using the sync.Atomic package in Go

问题

我有以下在Go中实现的计数器,我想要并发使用它。我正在使用atomic包来存储状态,但不确定是否会遇到竞争条件。我是否需要添加额外的互斥锁来防止计数器减到零以下,或者原子操作已经足够安全了?谢谢!

type Counter struct {
	counter  uint64
	finished uint32
	sync.Mutex
}

// Inc将计数器加一
func (c *Counter) Inc() error {
	if c.isFinished() {
		return fmt.Errorf("计数器已完成")
	}

	atomic.AddUint64(&c.counter, 1)
	return nil
}

// Dec将计数器减一,但防止计数器变为零
func (c *Counter) Dec() {
	// 防止溢出
	if !c.isZero() {
		atomic.AddUint64(&c.counter, ^uint64(0))
	}
}

// Cancel设置完成标志,并将计数器设置为零
func (c *Counter) Cancel() {
	if !c.isFinished() {
		atomic.StoreUint32(&c.finished, 1)
		atomic.StoreUint64(&c.counter, 0)
	}
}

// 如果已完成,则返回true
func (c *Counter) isFinished() bool {
	return atomic.LoadUint32(&c.finished) == uint32(1)
}

// 如果计数器为零,则返回true
func (c *Counter) isZero() bool {
	return atomic.LoadUint64(&c.counter) == uint64(0)
}

更新:通过在下面的代码中使用-race标志运行,我能够检测到确实需要包含互斥锁。

var counter *Counter = &Counter{}

func main() {
	wg := sync.WaitGroup{}

	numberOfLoops := 10
	wg.Add(numberOfLoops)

	for i := 0; i < numberOfLoops; i++ {
		go incrementor(&wg)
	}

	wg.Wait()
	fmt.Println("最终计数器:", counter.counter)
}

func incrementor(wg *sync.WaitGroup) {
	rand.Seed(time.Now().UnixNano())
	for i := 0; i < 20; i++ {
		counter.Inc()
		time.Sleep(time.Duration(rand.Intn(3)) * time.Millisecond)
	}
	wg.Done()
	counter.Cancel()
}

Playground链接

英文:

I have the following counter implemented in Go that I would like to use in a concurrently. I'm using the atomic package to store the state, but not sure if I could run into any race conditions. Do I need to add an additional mutex as well in order to protect against incrementing below zero for example or do the atomic operations provide enough safety? Thank you!

type Counter struct {
	counter  uint64
	finished uint32
	sync.Mutex
}

// Inc increments the counter by one
func (c *Counter) Inc() error {
	if c.isFinished() {
		return fmt.Errorf(&quot;counter is finished&quot;)
	}

	atomic.AddUint64(&amp;c.counter, 1)
	return nil
}

// Dec decrements the counter by one, but prevents the counter from going to zero
func (c *Counter) Dec() {
	// prevent overflow
	if !c.isZero() {
		atomic.AddUint64(&amp;c.counter, ^uint64(0))
	}
}

// Cancel sets the finished flag, and sets counter to zero
func (c *Counter) Cancel() {
	if !c.isFinished() {
		atomic.StoreUint32(&amp;c.finished, 1)
		atomic.StoreUint64(&amp;c.counter, 0)
	}
}

// isFinished returns true if finished
func (c *Counter) isFinished() bool {
	return atomic.LoadUint32(&amp;c.finished) == uint32(1)
}

// isZero returns true if counter is zero
func (c *Counter) isZero() bool {
	return atomic.LoadUint64(&amp;c.counter) == uint64(0)
}

Update: By running the code below with the -race flag, I was able to detect that I indeed need to include mutexes.

var counter *Counter = &amp;Counter{}

func main() {
	wg := sync.WaitGroup{}

	numberOfLoops := 10
	wg.Add(numberOfLoops)

	for i := 0; i &lt; numberOfLoops; i++ {
		go incrementor(&amp;wg)
	}

	wg.Wait()
	fmt.Println(&quot;Final Counter:&quot;, counter.counter)
}

func incrementor(wg *sync.WaitGroup) {
	rand.Seed(time.Now().UnixNano())
	for i := 0; i &lt; 20; i++ {
		counter.Inc()
		time.Sleep(time.Duration(rand.Intn(3)) * time.Millisecond)
	}
	wg.Done()
	counter.Cancel()
}

Playground link

答案1

得分: 4

  • 避免竞争,如“由竞争检测器检测到的”

正如@caveman所回答的,你遇到的问题与wg.Done() / wg.Wait()指令的顺序有关,以及你没有使用atomic.Load()来访问counter.counter的值。

就这种竞争条件而言,你的方法是“安全”的。

  • 避免竞争,如“不要使计数器处于不一致状态”

你确实有一个问题,(*)因为你的方法运行了几个连续的指令来检查和更新对象(例如:if condition { update_fields }),而且你没有同步机制来检查在应用update_fieldscondition是否仍然为真。

通过将你的incrementor函数更改为:

func incrementor(wg *sync.WaitGroup) {
	for i := 0; i < 20000; i++ {
		counter.Inc()
	}
	counter.Cancel()
	wg.Done()
}

并多次运行你的程序,你应该能够看到“Final Counter:”不总是以0结尾(playground)。

以下是这种情况的示例:

  • 假设goroutine 1执行counter.Cancel()
  • 同时goroutine 2执行counter.Inc()

可能发生以下执行序列:

   goroutine 1                                 goroutine 2

                                            1. if c.isFinished() {
                                                   return fmt.Errorf("counter is finished")
                                               }
2. if !c.isFinished() {                         
3.     atomic.StoreUint32(&c.finished, 1)       
4.     atomic.StoreUint64(&c.counter, 0)        
   }                                            
                                            5. atomic.AddUint64(&c.counter, 1)
                                            6. return nil
  • .Inc()中的c.isFinished()指令可能在.Cancel()执行之前发生,
  • atomic.AddUint64(&c.counter, 1)可能在.Cancel()将计数器重置为零之后发生。

为了避免这种竞争条件,你需要选择一种同步“检查+更新”指令的方式。

一种常见的方法是使用互斥锁:

type Counter struct {
	counter  uint64
	finished uint32
	m        sync.Mutex
}

// Inc将计数器增加1
func (c *Counter) Inc() error {
	c.m.Lock()
	defer c.m.Unlock()

	if c.finished != 0 {
		return fmt.Errorf("counter is finished")
	}

	c.counter++
	return nil
}

// Dec将计数器减少1,但防止计数器变为零
func (c *Counter) Dec() {
	c.m.Lock()
	defer c.m.Unlock()

	// 防止溢出
	if c.counter > 0 {
		c.counter--
	}
}

// Cancel设置finished标志,并将计数器设置为零
func (c *Counter) Cancel() {
	c.m.Lock()
	defer c.m.Unlock()

	if c.finished == 0 {
		c.finished = 1
		c.counter = 0
	}
}

playground


(*) [编辑] 我最初写道:“因为你的方法检查了对象的两个不同字段”,但即使只有一个字段,你也可能遇到类似的问题。

例如,如果两个goroutine运行以下代码:

if c.LoadUint64(&c.counter) == 0 {
   atomic.AddUint64(&c.counter, 1)
}

你可能会得到最终值为2。

问题在于有几个指令,并且这些指令不以原子方式执行。

英文:
  • to avoid races as in "detectable by the race detector" :

As @caveman answered, the issue you encountered is linked to an issue in wg.Done() / wg.Wait() instructions ordering, and the fact that you don't use an atomic.Load() to access the value of counter.counter.

Your methods are "safe" with regards to this kind of race condition.

  • to avoid races as in "don't reach an incoherent state for counter" :

you do have an issue, (*) because your methods run several successive instructions to inspect and update the object (e.g : if condition { update_fields }), and you have no synchronization mechanism to check that the condition is still true when you apply update_fields.

By changing your incrementor function to :

func incrementor(wg *sync.WaitGroup) {
	for i := 0; i &lt; 20000; i++ {
		counter.Inc()
	}
	counter.Cancel()
	wg.Done()
}

and running your program several times, you should be able to see that "Final Counter:" does not always end with 0 (playground).

Here is an illustration of how this can occur :

  • suppose goroutine 1 executes counter.Cancel()
  • while goroutine 2 executes counter.Inc()

the following sequence of execution may occur :

   goroutine 1                                 goroutine 2

                                            1. if c.isFinished() {
                                                   return fmt.Errorf(&quot;counter is finished&quot;)
                                               }
2. if !c.isFinished() {                         
3.     atomic.StoreUint32(&amp;c.finished, 1)       
4.     atomic.StoreUint64(&amp;c.counter, 0)        
   }                                            
                                            5. atomic.AddUint64(&amp;c.counter, 1)
                                            6. return nil
  • the c.isFinished() instruction in .Inc() may happen before .Cancel() gets executed,
  • and the atomic.AddUint64(&amp;c.counter, 1) may happen after .Cancel() has reset the counter to zero.

To avoid this kind of race, you need to choose a way to synchronize the inspect + update instructions.

One common way to do that is to use a mutex :

type Counter struct {
	counter  uint64
	finished uint32
	m        sync.Mutex
}

// Inc increments the counter by one
func (c *Counter) Inc() error {
	c.m.Lock()
	defer c.m.Unlock()

	if c.finished != 0 {
		return fmt.Errorf(&quot;counter is finished&quot;)
	}

	c.counter++
	return nil
}

// Dec decrements the counter by one, but prevents the counter from going to zero
func (c *Counter) Dec() {
	c.m.Lock()
	defer c.m.Unlock()

	// prevent overflow
	if c.counter &gt; 0 {
		c.counter--
	}
}

// Cancel sets the finished flag, and sets counter to zero
func (c *Counter) Cancel() {
	c.m.Lock()
	defer c.m.Unlock()

	if c.finished == 0 {
		c.finished = 1
		c.counter = 0
	}
}

playground


(*) [edit] I initially wrote : "because your methods inspect two distinct fields of your object", but you could have a similar issue even with one single field.

For example, if two goroutines run the following code :

if c.LoadUint64(&amp;c.counter) == 0 {
   atomic.AddUint64(&amp;c.counter, 1)
}

you may have an end value of 2.

The issue lies with having several instructions, which aren't executed in an atomic way.

答案2

得分: 2

你不需要额外的互斥锁,因为你的主函数在读取counter.counter时没有使用原子加载,而你的incrementor在调用wg.Done()之前调用了counter.Cancel(),因此会出现竞争条件。

通过将wg.Done()移动到counter.Cancel()之后,可以解决竞争条件:

func main() {
	wg := sync.WaitGroup{}

	numberOfLoops := 10
	wg.Add(numberOfLoops)

	for i := 0; i < numberOfLoops; i++ {
		go incrementor(&wg)
	}

	wg.Wait()
	fmt.Println("Final Counter:", counter.counter)
}

func incrementor(wg *sync.WaitGroup) {
	rand.Seed(time.Now().UnixNano())
	for i := 0; i < 20; i++ {
		counter.Inc()
		time.Sleep(time.Duration(rand.Intn(3)) * time.Millisecond)
	}
	counter.Cancel()
	wg.Done()
}
英文:

You don't need an additional mutex, your main function is reading counter.counter without using an atomic load, while your incrementor calls wg.Done() before counter.Cancel(), thus you get a race condition.

By moving wg.Done() after counter.Cancel() the race condition is resolved:

func main() {
	wg := sync.WaitGroup{}

	numberOfLoops := 10
	wg.Add(numberOfLoops)

	for i := 0; i &lt; numberOfLoops; i++ {
		go incrementor(&amp;wg)
	}

	wg.Wait()
	fmt.Println(&quot;Final Counter:&quot;, counter.counter)
}

func incrementor(wg *sync.WaitGroup) {
	rand.Seed(time.Now().UnixNano())
	for i := 0; i &lt; 20; i++ {
		counter.Inc()
		time.Sleep(time.Duration(rand.Intn(3)) * time.Millisecond)
	}
	counter.Cancel()
	wg.Done()
}

huangapple
  • 本文由 发表于 2022年1月4日 00:47:59
  • 转载请务必保留本文链接:https://go.coder-hub.com/70568953.html
匿名

发表评论

匿名网友

:?: :razz: :sad: :evil: :!: :smile: :oops: :grin: :eek: :shock: :???: :cool: :lol: :mad: :twisted: :roll: :wink: :idea: :arrow: :neutral: :cry: :mrgreen:

确定