同时进行时,如何管理值/状态并避免竞态条件。

huangapple go评论69阅读模式
英文:

How to manage values/state concurrently while avoiding a race condition

问题

如何在进程启动后根据事件/条件正确设置/修改值,同时处理 Goroutines 而不创建竞争条件。

例如,以下代码“工作(有错误)”并输出如下:

ping, foo=true
ping, foo=false
ping, foo=true
ping, foo=true
ping, foo=true

代码如下:

package main

import "fmt"

type test struct {
    ch  chan string
    foo bool
}

func (t *test) run() {
    for {
        select {
        case v := <-t.ch:
            fmt.Printf("%+v, foo=%+v\n", v, t.foo)
            t.foo = false
        default:
        }
    }
}

func (t *test) Ping() {
    t.ch <- "ping"
}

func New() *test {
    t := &test{
        ch: make(chan string),
    }
    go t.run()
    return t
}

func main() {
    t := New()
    for i := 0; i <= 10; i++ {
        if t.foo {
            t.Ping()
        }
        if i%3 == 0 {
            t.foo = true
        }
    }
}

但是,如果使用 -race 选项进行编译或运行,将得到以下输出:

$ go run -race main.go
ping, foo=true
==================
WARNING: DATA RACE
Write at 0x00c4200761b8 by goroutine 6:
  main.(*test).run()
      /main.go:16 +0x1fb

Previous read at 0x00c4200761b8 by main goroutine:
  main.main()
      /main.go:37 +0x5e

Goroutine 6 (running) created at:
  main.New()
      /main.go:30 +0xd0
  main.main()
      /main.go:35 +0x33
==================
ping, foo=false
ping, foo=true
ping, foo=true
ping, foo=true
Found 1 data race(s)
exit status 66

因此,我想知道在不创建竞争条件的情况下,我可以使用什么并发模式来在 goroutine 内外更改 foo 的值。

英文:

How can I properly set or modify a value based on events/conditions that occur after the process has started, while working with goroutines and without creating a race condition?

For example, the following "works (is buggy)" and output is:

ping, foo=true
ping, foo=false
ping, foo=true
ping, foo=true
ping, foo=true

<kbd>https://play.golang.org/p/Y3FafF-nBc</kbd>

<!-- language: lang-golang -->

package main

import &quot;fmt&quot;

type test struct {
	ch  chan string
	foo bool
}

func (t *test) run() {
	for {
		select {
		case v := &lt;-t.ch:
			fmt.Printf(&quot;%+v, foo=%+v\n&quot;, v, t.foo)
			t.foo = false
		default:
		}
	}
}

func (t *test) Ping() {
	t.ch &lt;- &quot;ping&quot;
}

func New() *test {
	t := &amp;test{
		ch: make(chan string),
	}
	go t.run()
	return t
}

func main() {
	t := New()
	for i := 0; i &lt;= 10; i++ {
		if t.foo {
			t.Ping()
		}
		if i%3 == 0 {
			t.foo = true
		}
	}
}

But if compiled or run using the -race option, I get this output:

$ go run -race main.go
ping, foo=true
==================
WARNING: DATA RACE
Write at 0x00c4200761b8 by goroutine 6:
  main.(*test).run()
      /main.go:16 +0x1fb

Previous read at 0x00c4200761b8 by main goroutine:
  main.main()
      /main.go:37 +0x5e

Goroutine 6 (running) created at:
  main.New()
      /main.go:30 +0xd0
  main.main()
      /main.go:35 +0x33
==================
ping, foo=false
ping, foo=true
ping, foo=true
ping, foo=true
Found 1 data race(s)
exit status 66

Therefore, I would like to know what concurrency pattern I could use to be able to change the value of foo both outside and inside the goroutine without creating a race condition.

答案1

得分: 6

你有几个选项:

  • 使用atomic.Value:示例(1)
  • 使用sync.RWMutex:示例(3)
  • 使用sync/atomic:示例(6)
  • 仅使用通道和goroutine:示例(7)

另请参阅:使用sync.Mutex还是通道?


1- 你可以使用atomic.Value

atomic.Value提供了一个原子加载和存储一致类型值的功能。可以将Value作为其他数据结构的一部分创建。Value的零值从Load返回nil。一旦调用了Store,就不能再复制Value

以下是一个工作示例:

// to test the panic use go build -race
package main

import (
	"fmt"
	"sync/atomic"
)

// test pairs a message channel with a flag held in the embedded
// atomic.Value. The flag is always stored as a bool and is only accessed
// through Load and Store.
type test struct {
	ch chan string
	atomic.Value
}

// run receives messages from t.ch forever. For each message it prints the
// message together with the currently stored flag and then stores false.
func (t *test) run() {
	for {
		select {
		case v := <-t.ch:
			fmt.Printf("%+v, foo=%+v\n", v, t.Load())
			t.Store(false)
		default:
			// The empty default makes the select non-blocking, so this loop
			// spins while no message is waiting on the channel.
		}
	}
}

// Ping sends "ping" on the unbuffered channel and returns once run has
// received it.
func (t *test) Ping() {
	t.ch <- "ping"
}

// New returns a test whose flag is initialised to false and whose run loop
// has already been started in its own goroutine.
func New() *test {
	t := &test{
		ch: make(chan string),
	}
	t.Store(false)
	go t.run()
	return t
}

func main() {
	t := New()
	for i := 0; i <= 10; i++ {
		if x, _ := t.Load().(bool); x {
			t.Ping()
		}
		//	time.Sleep(time.Second)
		if i%3 == 0 {
			t.Store(true)
		}
	}
}

使用go build -race进行输出:

ping, foo=true
ping, foo=false
ping, foo=false
ping, foo=false
ping, foo=false

2- 对func (t *test) run()进行了一点改进:

// run handles each message received on t.ch: it prints the message together
// with the currently stored value and then stores false. Ranging over the
// channel blocks while no message is available and ends once t.ch is closed.
func (t *test) run() {
	for v := range t.ch {
		fmt.Printf("%+v, foo=%+v\n", v, t.Load())
		t.Store(false)
	}
}

3- 你可以使用sync.RWMutex和sync.WaitGroup,以下是一个工作示例:

// to test the panic use go build -race
package main

import (
	"fmt"
	"sync"
)

// test guards foo with the embedded RWMutex and uses the embedded WaitGroup
// so that callers can wait for the run goroutine to finish.
type test struct {
	ch  chan string
	foo bool
	sync.RWMutex
	sync.WaitGroup
}

// run handles every message sent on t.ch until the channel is closed, then
// marks the WaitGroup as done.
func (t *test) run() {
	// Deferred so the WaitGroup is released however the loop exits.
	defer t.Done()
	for v := range t.ch {
		// Read and reset foo inside one critical section so no other
		// goroutine can change it between the read and the write.
		t.Lock()
		r := t.foo
		t.foo = false
		t.Unlock()
		// Print outside the lock, using the value captured above.
		fmt.Printf("%+v, foo=%+v\n", v, r)
	}
}

// Ping sends "ping" on the unbuffered channel and returns once run has
// received it.
func (t *test) Ping() {
	t.ch <- "ping"
}

// New returns a test with its run goroutine already started and registered
// with the WaitGroup.
func New() *test {
	t := &test{ch: make(chan string)}
	t.Add(1)
	go t.run()
	return t
}

// main runs eleven iterations: it pings whenever foo is currently true and
// sets foo to true on every third iteration. Afterwards it closes the
// channel and waits for the run goroutine to finish.
func main() {
	t := New()

	// armed reports the current value of foo under the read lock.
	armed := func() bool {
		t.RLock()
		defer t.RUnlock()
		return t.foo
	}
	// arm sets foo to true under the write lock.
	arm := func() {
		t.Lock()
		defer t.Unlock()
		t.foo = true
	}

	for step := 0; step < 11; step++ {
		if armed() {
			t.Ping()
		}
		if step%3 == 0 {
			arm()
		}
	}

	close(t.ch)
	t.Wait()
}

使用go build -race进行输出:

ping, foo=true
ping, foo=true
ping, foo=false
ping, foo=true
ping, foo=false
ping, foo=true

4- 让我们参考这种做法(https://talks.golang.org/2013/bestpractices.slide#29),原始代码如下:

package main

import (
	"fmt"
	"time"
)

type Server struct{ quit chan bool }

func NewServer() *Server {
	s := &Server{make(chan bool)}
	go s.run()
	return s
}

func (s *Server) run() {
	for {
		select {
		case <-s.quit:
			fmt.Println("finishing task")
			time.Sleep(time.Second)
			fmt.Println("task done")
			s.quit <- true
			return
		case <-time.After(time.Second):
			fmt.Println("running task")
		}
	}
}
func (s *Server) Stop() {
	fmt.Println("server stopping")
	s.quit <- true
	<-s.quit
	fmt.Println("server stopped")
}

func main() {
	s := NewServer()
	time.Sleep(2 * time.Second)
	s.Stop()
}

5- 让我们简化它:

package main

import (
	"fmt"
	"time"
)

var quit = make(chan bool)

func main() {
	go run()
	time.Sleep(2 * time.Second)
	fmt.Println("server stopping")

	quit <- true // 发送退出信号

	<-quit // 等待退出信号

	fmt.Println("server stopped")
}

func run() {
	for {
		select {
		case <-quit:
			fmt.Println("finishing task")
			time.Sleep(time.Second)
			fmt.Println("task done")
			quit <- true
			return
		case <-time.After(time.Second):
			fmt.Println("running task")
		}
	}
}

输出:

running task
running task
server stopping
finishing task
task done
server stopped

6- 简化版本的示例:

// to test the panic use go build -race
package main

import "fmt"
import "sync/atomic"

var ch = make(chan string)
var state int32

// main starts the consumer, sends "ping" whenever state is 1, sets state to
// 1 on every third iteration, and finally waits for the consumer to finish.
func main() {
	// done is closed once run has returned, i.e. after ch has been closed
	// and every ping sent before that has been handled.
	done := make(chan struct{})
	go func() {
		defer close(done)
		run()
	}()

	for i := 0; i <= 10; i++ {
		if atomic.LoadInt32(&state) == 1 {
			ch <- "ping"
		}
		if i%3 == 0 {
			atomic.StoreInt32(&state, 1)
		}
	}

	// Closing ch ends run's range loop; waiting on done keeps main from
	// exiting while the last ping is still being printed.
	close(ch)
	<-done
}

// run prints every message received on ch together with the value of state
// loaded at that moment, then stores 0 into state. It returns once ch is
// closed.
func run() {
	for v := range ch {
		// The load and the store are two separate atomic operations, so a
		// store made by another goroutine can land between them.
		fmt.Printf("%+v, state=%+v\n", v, atomic.LoadInt32(&state))
		atomic.StoreInt32(&state, 0)
	}
}

输出:

ping, state=1
ping, state=0
ping, state=1
ping, state=0
ping, state=1
ping, state=0

7- 使用通道的工作示例,而不使用Lock()(The Go Playground):

// to test the panic use go build -race
package main

import "fmt"

func main() {
	go run()
	for i := 0; i <= 10; i++ {
		signal <- struct{}{}
		if <-read {
			ping <- "ping"
		}
		if i%3 == 0 {
			write <- true
		}
	}
}

// run keeps foo as a local variable and serves every access to it through
// channels, so foo is only ever touched by this goroutine:
//   - a receive on signal is answered by sending the current foo on read;
//   - a receive on write replaces foo with the received value;
//   - a receive on ping prints the message with foo and resets foo to false.
//
// The loop has no exit, so run never returns.
func run() {
	foo := false
	for {
		select {
		case <-signal:
			fmt.Println("signal", foo)
			// Blocks until the requester receives the value from read.
			read <- foo
		case foo = <-write:
			fmt.Println("write", foo)
		case v := <-ping:
			fmt.Println(v, foo)
			foo = false
		}
	}
}

var (
	ping   = make(chan string)
	signal = make(chan struct{})
	read   = make(chan bool)
	write  = make(chan bool)
)

输出:

signal false
write true
signal true
ping true
signal false
signal false
write true
signal true
ping true
signal false
signal false
write true
signal true
ping true
signal false
signal false
write true
signal true
ping true
英文:

You have some options:

  • Using atomic.Value : Sample (1)
  • Using sync.RWMutex : Sample (3)
  • Using sync/atomic : Sample (6)
  • Using only channels and goroutines : Sample (7)

Also see: Use a sync.Mutex or a channel?


1- You may use atomic.Value:

> A Value provides an atomic load and store of a consistently typed
> value. Values can be created as part of other data structures. The
> zero value for a Value returns nil from Load. Once Store has been
> called, a Value must not be copied.
>
> A Value must not be copied after first use.

Like this working sample:

<!-- language: lang-golang -->

// to test the panic use go build -race
package main

import (
	&quot;fmt&quot;
	&quot;sync/atomic&quot;
)

// test pairs a message channel with a flag held in the embedded
// atomic.Value. The flag is always stored as a bool and is only accessed
// through Load and Store.
type test struct {
	ch chan string
	atomic.Value
}

// run receives messages from t.ch forever. For each message it prints the
// message together with the currently stored flag and then stores false.
func (t *test) run() {
	for {
		select {
		case v := <-t.ch:
			fmt.Printf("%+v, foo=%+v\n", v, t.Load())
			t.Store(false)
		default:
			// The empty default makes the select non-blocking, so this loop
			// spins while no message is waiting on the channel.
		}
	}
}

// Ping sends "ping" on the unbuffered channel and returns once run has
// received it.
func (t *test) Ping() {
	t.ch <- "ping"
}

// New returns a test whose flag is initialised to false and whose run loop
// has already been started in its own goroutine.
func New() *test {
	t := &test{
		ch: make(chan string),
	}
	t.Store(false)
	go t.run()
	return t
}

func main() {
	t := New()
	for i := 0; i &lt;= 10; i++ {
		if x, _ := t.Load().(bool); x {
			t.Ping()
		}
		//	time.Sleep(time.Second)
		if i%3 == 0 {
			t.Store(true)
		}
	}
}

output with go build -race:

ping, foo=true
ping, foo=false
ping, foo=false
ping, foo=false
ping, foo=false

2- A little improvement to func (t *test) run():

<!-- language: lang-golang -->

func (t *test) run() {
	for v := range t.ch {
		fmt.Printf(&quot;%+v, foo=%+v\n&quot;, v, t.Load())
		t.Store(false)
	}
}

3- You may use sync.RWMutex and sync.WaitGroup, like this working sample:

<!-- language: lang-golang -->

// to test the panic use go build -race
package main

import (
	&quot;fmt&quot;
	&quot;sync&quot;
)

// test guards foo with the embedded RWMutex and uses the embedded WaitGroup
// so that callers can wait for the run goroutine to finish.
type test struct {
	ch  chan string
	foo bool
	sync.RWMutex
	sync.WaitGroup
}

// run handles every message sent on t.ch until the channel is closed, then
// marks the WaitGroup as done.
func (t *test) run() {
	// Deferred so the WaitGroup is released however the loop exits.
	defer t.Done()
	for v := range t.ch {
		// Read and reset foo inside one critical section so no other
		// goroutine can change it between the read and the write.
		t.Lock()
		r := t.foo
		t.foo = false
		t.Unlock()
		// Print outside the lock, using the value captured above.
		fmt.Printf("%+v, foo=%+v\n", v, r)
	}
}

// Ping sends "ping" on the unbuffered channel and returns once run has
// received it.
func (t *test) Ping() {
	t.ch <- "ping"
}

// New returns a test with its run goroutine already started and registered
// with the WaitGroup.
func New() *test {
	t := &test{ch: make(chan string)}
	t.Add(1)
	go t.run()
	return t
}

func main() {
	t := New()
	for i := 0; i &lt;= 10; i++ {
		t.RLock()
		r := t.foo
		t.RUnlock()
		if r {
			t.Ping()
		}
		//  time.Sleep(time.Second)
		if i%3 == 0 {
			t.Lock()
			t.foo = true
			t.Unlock()
		}
	}
	close(t.ch)
	t.Wait()
}

output with go build -race:

ping, foo=true
ping, foo=true
ping, foo=false
ping, foo=true
ping, foo=false
ping, foo=true

4- So let's follow this approach https://talks.golang.org/2013/bestpractices.slide#29:
Original Code:

<!-- language: lang-golang -->

package main

import (
	&quot;fmt&quot;
	&quot;time&quot;
)

type Server struct{ quit chan bool }

func NewServer() *Server {
	s := &amp;Server{make(chan bool)}
	go s.run()
	return s
}

func (s *Server) run() {
	for {
		select {
		case &lt;-s.quit:
			fmt.Println(&quot;finishing task&quot;)
			time.Sleep(time.Second)
			fmt.Println(&quot;task done&quot;)
			s.quit &lt;- true
			return
		case &lt;-time.After(time.Second):
			fmt.Println(&quot;running task&quot;)
		}
	}
}
func (s *Server) Stop() {
	fmt.Println(&quot;server stopping&quot;)
	s.quit &lt;- true
	&lt;-s.quit
	fmt.Println(&quot;server stopped&quot;)
}

func main() {
	s := NewServer()
	time.Sleep(2 * time.Second)
	s.Stop()
}

5- Let's simplify it:

<!-- language: lang-golang -->

package main

import (
	&quot;fmt&quot;
	&quot;time&quot;
)

var quit = make(chan bool)

func main() {
	go run()
	time.Sleep(2 * time.Second)
	fmt.Println(&quot;server stopping&quot;)

	quit &lt;- true // signal to quit

	&lt;-quit // wait for quit signal

	fmt.Println(&quot;server stopped&quot;)
}

func run() {
	for {
		select {
		case &lt;-quit:
			fmt.Println(&quot;finishing task&quot;)
			time.Sleep(time.Second)
			fmt.Println(&quot;task done&quot;)
			quit &lt;- true
			return
		case &lt;-time.After(time.Second):
			fmt.Println(&quot;running task&quot;)
		}
	}
}

output:

running task
running task
server stopping
finishing task
task done
server stopped

6- Simplified version of your sample:

<!-- language: lang-golang -->

// to test the panic use go build -race
package main

import &quot;fmt&quot;
import &quot;sync/atomic&quot;

var ch = make(chan string)
var state int32

func main() {
	go run()
	for i := 0; i &lt;= 10; i++ {
		if atomic.LoadInt32(&amp;state) == 1 {
			ch &lt;- &quot;ping&quot;
		}
		if i%3 == 0 {
			atomic.StoreInt32(&amp;state, 1)
		}
	}
}

func run() {
	for v := range ch {
		fmt.Printf(&quot;%+v, state=%+v\n&quot;, v, atomic.LoadInt32(&amp;state))
		atomic.StoreInt32(&amp;state, 0)
	}
}

output:

ping, state=1
ping, state=0
ping, state=1
ping, state=0
ping, state=1
ping, state=0

7- Working sample with channels and without using Lock() (The Go Playground):

<!-- language: lang-golang -->

// to test the panic use go build -race
package main

import &quot;fmt&quot;

func main() {
	go run()
	for i := 0; i &lt;= 10; i++ {
		signal &lt;- struct{}{}
		if &lt;-read {
			ping &lt;- &quot;ping&quot;
		}
		if i%3 == 0 {
			write &lt;- true
		}
	}
}

func run() {
	foo := false
	for {
		select {
		case &lt;-signal:
			fmt.Println(&quot;signal&quot;, foo)
			read &lt;- foo
		case foo = &lt;-write:
			fmt.Println(&quot;write&quot;, foo)
		case v := &lt;-ping:
			fmt.Println(v, foo)
			foo = false
		}
	}
}

var (
	ping   = make(chan string)
	signal = make(chan struct{})
	read   = make(chan bool)
	write  = make(chan bool)
)

output:

signal false
write true
signal true
ping true
signal false
signal false
write true
signal true
ping true
signal false
signal false
write true
signal true
ping true
signal false
signal false
write true
signal true
ping true

答案2

得分: 1

使用互斥锁

package main

import (
    "sync"
    "time"
    "fmt"
)

var myvar int
var mut sync.Mutex

func main() {
    for {
        go other()
        go printer()
        time.Sleep(time.Duration(1) * time.Second)
    }
}

// other increments myvar while holding mut, so the update cannot overlap
// with any other access made under the same mutex.
func other() {
    mut.Lock()
    myvar++
    mut.Unlock()
}

func printer() {
    mut.Lock()
    fmt.Println(myvar)
    mut.Unlock()
}

运行(使用互斥锁)

$ go build -race t1.go 
$ ./t1 
1
2
3
4
5
6
7
7
9
10

运行(不使用互斥锁)

$ go build t2.go 
$ go build -race t2.go 
$ ./t2 
==================
WARNING: DATA RACE
Read at 0x000000580ce8 by goroutine 7:
  runtime.convT2E()
      /usr/local/go/src/runtime/iface.go:155 +0x0
  main.printer()
      /.../.../.../GOPATH/t2.go:23 +0x65

Previous write at 0x000000580ce8 by goroutine 6:
  main.other()
      /.../.../.../GOPATH/t2.go:19 +0x3d

Goroutine 7 (running) created at:
  main.main()
      /.../.../.../GOPATH/t2.go:13 +0x5a

Goroutine 6 (finished) created at:
  main.main()
      /.../.../.../GOPATH/t2.go:12 +0x42
==================
1
2
英文:

Use Mutexes

package main

import (
    &quot;sync&quot;
    &quot;time&quot;
    &quot;fmt&quot;
)

var myvar int
var mut sync.Mutex

func main() {
    for {
        go other()
        go printer()
        time.Sleep(time.Duration(1) * time.Second)
    }
}

// other increments myvar while holding mut, so the update cannot overlap
// with any other access made under the same mutex.
func other() {
    mut.Lock()
    myvar++
    mut.Unlock()
}

func printer() {
    mut.Lock()
    fmt.Println(myvar)
    mut.Unlock()
}

Run (with mutexes)

$ go build -race t1.go 
$ ./t1 
1
2
3
4
5
6
7
7
9
10

Run (without mutexes)

$ go build t2.go 
$ go build -race t2.go 
$ ./t2 
==================
WARNING: DATA RACE
Read at 0x000000580ce8 by goroutine 7:
  runtime.convT2E()
      /usr/local/go/src/runtime/iface.go:155 +0x0
  main.printer()
      /.../.../.../GOPATH/t2.go:23 +0x65

Previous write at 0x000000580ce8 by goroutine 6:
  main.other()
      /.../.../.../GOPATH/t2.go:19 +0x3d

Goroutine 7 (running) created at:
  main.main()
      /.../.../.../GOPATH/t2.go:13 +0x5a

Goroutine 6 (finished) created at:
  main.main()
      /.../.../.../GOPATH/t2.go:12 +0x42
==================
1
2

huangapple
  • 本文由 发表于 2016年8月24日 20:33:14
  • 转载请务必保留本文链接:https://go.coder-hub.com/39123453.html
匿名

发表评论

匿名网友

:?: :razz: :sad: :evil: :!: :smile: :oops: :grin: :eek: :shock: :???: :cool: :lol: :mad: :twisted: :roll: :wink: :idea: :arrow: :neutral: :cry: :mrgreen:

确定