英文:
Concurrency: how to manage values/state while avoiding a race condition
问题
如何在进程启动后根据事件/条件正确设置/修改值,同时处理 Goroutines 而不创建竞争条件。
例如,以下代码“工作(有错误)”并输出如下:
ping, foo=true
ping, foo=false
ping, foo=true
ping, foo=true
ping, foo=true
代码如下:
package main
import "fmt"
// test pairs a message channel with a boolean flag.
// foo is read and written both by the goroutine running run and by main,
// with no synchronization between them.
type test struct {
ch chan string
foo bool
}
// run loops forever doing a non-blocking receive on t.ch. For every message
// it prints the message with the current value of t.foo and then clears
// t.foo. The empty default case means the loop does not block while the
// channel is idle.
func (t *test) run() {
for {
select {
case v := <-t.ch:
fmt.Printf("%+v, foo=%+v\n", v, t.foo)
// Unsynchronized write: main reads and writes t.foo concurrently.
t.foo = false
default:
}
}
}
// Ping sends the string "ping" on t.ch. The channel is created unbuffered
// in New, so the send blocks until run receives it.
func (t *test) Ping() {
t.ch <- "ping"
}
// New allocates a test with an unbuffered channel, starts run in its own
// goroutine, and returns the new value.
func New() *test {
t := &test{
ch: make(chan string),
}
go t.run()
return t
}
// main pings whenever t.foo is set and sets t.foo on every third iteration.
// Both accesses to t.foo happen without synchronization while the run
// goroutine also reads and clears it, which is the data race reported by
// the race detector.
func main() {
t := New()
for i := 0; i <= 10; i++ {
if t.foo {
t.Ping()
}
if i%3 == 0 {
t.foo = true
}
}
}
但是,如果使用 -race
选项进行编译或运行,将得到以下输出:
$ go run -race main.go
ping, foo=true
==================
WARNING: DATA RACE
Write at 0x00c4200761b8 by goroutine 6:
main.(*test).run()
/main.go:16 +0x1fb
Previous read at 0x00c4200761b8 by main goroutine:
main.main()
/main.go:37 +0x5e
Goroutine 6 (running) created at:
main.New()
/main.go:30 +0xd0
main.main()
/main.go:35 +0x33
==================
ping, foo=false
ping, foo=true
ping, foo=true
ping, foo=true
Found 1 data race(s)
exit status 66
因此,我想知道在不创建竞争条件的情况下,我可以使用什么并发模式来在 goroutine 内外更改 foo
的值。
英文:
How to properly set/modify a value based on events/conditions that happen after the process has started, while dealing with goroutines, without creating a race condition.
For example, the following "works (is buggy)" and output is:
ping, foo=true
ping, foo=false
ping, foo=true
ping, foo=true
ping, foo=true
<kbd>https://play.golang.org/p/Y3FafF-nBc</kbd>
<!-- language: lang-golang -->
package main
import "fmt"
// test pairs a message channel with a boolean flag.
// foo is read and written both by the goroutine running run and by main,
// with no synchronization between them.
type test struct {
ch chan string
foo bool
}
// run loops forever doing a non-blocking receive on t.ch. For every message
// it prints the message with the current value of t.foo and then clears
// t.foo. The empty default case means the loop does not block while the
// channel is idle.
func (t *test) run() {
for {
select {
case v := <-t.ch:
fmt.Printf("%+v, foo=%+v\n", v, t.foo)
// Unsynchronized write: main reads and writes t.foo concurrently.
t.foo = false
default:
}
}
}
// Ping sends the string "ping" on t.ch. The channel is created unbuffered
// in New, so the send blocks until run receives it.
func (t *test) Ping() {
t.ch <- "ping"
}
// New allocates a test with an unbuffered channel, starts run in its own
// goroutine, and returns the new value.
func New() *test {
t := &test{
ch: make(chan string),
}
go t.run()
return t
}
// main pings whenever t.foo is set and sets t.foo on every third iteration.
// Both accesses to t.foo happen without synchronization while the run
// goroutine also reads and clears it, which is the data race reported by
// the race detector.
func main() {
t := New()
for i := 0; i <= 10; i++ {
if t.foo {
t.Ping()
}
if i%3 == 0 {
t.foo = true
}
}
}
But if compiled or run using the -race option, I get this output:
$ go run -race main.go
ping, foo=true
==================
WARNING: DATA RACE
Write at 0x00c4200761b8 by goroutine 6:
main.(*test).run()
/main.go:16 +0x1fb
Previous read at 0x00c4200761b8 by main goroutine:
main.main()
/main.go:37 +0x5e
Goroutine 6 (running) created at:
main.New()
/main.go:30 +0xd0
main.main()
/main.go:35 +0x33
==================
ping, foo=false
ping, foo=true
ping, foo=true
ping, foo=true
Found 1 data race(s)
exit status 66
Therefore, I would like to know what concurrency pattern I could use to be able to change the value of foo
both outside the goroutine and inside the goroutine without creating a race condition.
答案1
得分: 6
你有几个选项:
- 使用
atomic.Value
:示例(1) - 使用
sync.RWMutex
:示例(3) - 使用
sync/atomic
:示例(6) - 仅使用通道和goroutine:示例(7)
另请参阅:使用sync.Mutex还是通道?
1- 你可以使用atomic.Value
:
atomic.Value
提供了一个原子加载和存储一致类型值的功能。可以将Value
作为其他数据结构的一部分创建。Value
的零值从Load
返回nil
。一旦调用了Store
,就不能再复制Value
。
以下是一个工作示例:
// to check for data races, build with: go build -race
package main
import (
"fmt"
"sync/atomic"
)
// test couples a message channel with an embedded atomic.Value that stores
// the boolean flag shared between main and the run goroutine. Because the
// Value is embedded, its Load and Store methods are promoted onto test.
type test struct {
	ch chan string
	atomic.Value
}

// run receives messages from t.ch forever. For each message it prints the
// message together with the current flag and then stores false.
//
// NOTE: the empty default case makes the select non-blocking, so this loop
// spins while the channel is idle; ranging over t.ch would block instead.
func (t *test) run() {
	for {
		select {
		case v := <-t.ch:
			fmt.Printf("%+v, foo=%+v\n", v, t.Load())
			t.Store(false)
		default:
		}
	}
}

// Ping sends "ping" on the unbuffered channel and therefore blocks until
// run has received it.
func (t *test) Ping() {
	t.ch <- "ping"
}

// New returns a test whose flag is initialised to false and whose run loop
// is already executing in its own goroutine.
func New() *test {
	t := &test{
		ch: make(chan string),
	}
	// Store before starting run so Load never returns nil.
	t.Store(false)
	go t.run()
	return t
}

// main pings whenever the flag is true and sets the flag on every third
// iteration. All flag accesses go through the atomic Load and Store.
func main() {
	t := New()
	for i := 0; i <= 10; i++ {
		if x, _ := t.Load().(bool); x {
			t.Ping()
		}
		// time.Sleep(time.Second)
		if i%3 == 0 {
			t.Store(true)
		}
	}
}
使用go build -race
进行输出:
ping, foo=true
ping, foo=false
ping, foo=false
ping, foo=false
ping, foo=false
2- 对func (t *test) run()
进行了一点改进:
// run consumes messages from t.ch until the channel is closed. Each message
// is printed next to the flag value observed at that moment, after which
// the flag is reset to false.
func (t *test) run() {
	for msg := range t.ch {
		current := t.Load()
		fmt.Printf("%+v, foo=%+v\n", msg, current)
		t.Store(false)
	}
}
3- 你可以使用sync.RWMutex
和sync.WaitGroup
,以下是一个工作示例:
// to check for data races, build with: go build -race
package main
import (
"fmt"
"sync"
)
// test holds a message channel and a flag. The embedded RWMutex guards foo;
// the embedded WaitGroup lets main wait for the run goroutine to finish.
type test struct {
	ch  chan string
	foo bool
	sync.RWMutex
	sync.WaitGroup
}

// run processes messages until t.ch is closed. For each message it reads
// and clears foo under the write lock, then prints the message with the
// value foo had before it was cleared.
func (t *test) run() {
	// Deferred so the WaitGroup is released on every exit path.
	defer t.Done()
	for v := range t.ch {
		t.Lock()
		r := t.foo
		t.foo = false
		t.Unlock()
		// Print outside the critical section to keep the lock short.
		fmt.Printf("%+v, foo=%+v\n", v, r)
	}
}

// Ping sends "ping" on the unbuffered channel, blocking until run receives
// it.
func (t *test) Ping() {
	t.ch <- "ping"
}

// New creates a test, registers the run goroutine with the WaitGroup and
// starts it.
func New() *test {
	t := &test{ch: make(chan string)}
	t.Add(1)
	go t.run()
	return t
}

// main reads foo under the read lock, pings when it is set, and sets foo
// under the write lock on every third iteration. Finally it closes the
// channel so run returns, and waits for it.
func main() {
	t := New()
	for i := 0; i <= 10; i++ {
		t.RLock()
		r := t.foo
		t.RUnlock()
		if r {
			t.Ping()
		}
		// time.Sleep(time.Second)
		if i%3 == 0 {
			t.Lock()
			t.foo = true
			t.Unlock()
		}
	}
	close(t.ch)
	t.Wait()
}
使用go build -race
进行输出:
ping, foo=true
ping, foo=true
ping, foo=false
ping, foo=true
ping, foo=false
ping, foo=true
4- 让我们按照这个方法进行改进:原始代码:
package main
import (
"fmt"
"time"
)
// Server runs a periodic background task that can be shut down through its
// quit channel.
type Server struct{ quit chan bool }

// NewServer builds a Server with an unbuffered quit channel and launches
// its task loop in a new goroutine.
func NewServer() *Server {
	srv := &Server{quit: make(chan bool)}
	go srv.run()
	return srv
}

// run prints "running task" once per second until a value arrives on quit.
// On quit it finishes the task, acknowledges on the same channel and
// returns.
func (s *Server) run() {
	for {
		tick := time.After(time.Second)
		select {
		case <-tick:
			fmt.Println("running task")
		case <-s.quit:
			fmt.Println("finishing task")
			time.Sleep(time.Second)
			fmt.Println("task done")
			s.quit <- true
			return
		}
	}
}

// Stop asks the task loop to quit and blocks until the loop acknowledges.
func (s *Server) Stop() {
	fmt.Println("server stopping")
	s.quit <- true // request shutdown
	<-s.quit       // wait for the acknowledgement
	fmt.Println("server stopped")
}

// main lets the server run for two seconds and then stops it.
func main() {
	server := NewServer()
	time.Sleep(2 * time.Second)
	server.Stop()
}
5- 让我们简化它:
package main
import (
"fmt"
"time"
)
// quit carries both the shutdown request (main -> run) and the
// acknowledgement (run -> main).
var quit = make(chan bool)

// main starts the task loop, lets it work for two seconds, then requests
// shutdown and waits for the acknowledgement.
func main() {
	go run()

	time.Sleep(2 * time.Second)
	fmt.Println("server stopping")

	quit <- true // ask run to shut down
	<-quit       // block until run acknowledges
	fmt.Println("server stopped")
}

// run prints "running task" once per second until a value arrives on quit;
// it then finishes the task, acknowledges on quit and returns.
func run() {
	for {
		tick := time.After(time.Second)
		select {
		case <-tick:
			fmt.Println("running task")
		case <-quit:
			fmt.Println("finishing task")
			time.Sleep(time.Second)
			fmt.Println("task done")
			quit <- true
			return
		}
	}
}
输出:
running task
running task
server stopping
finishing task
task done
server stopped
6- 简化版本的示例:
// to check for data races, build with: go build -race
package main
import "fmt"
import "sync/atomic"
var (
	// ch carries ping messages from main to run.
	ch = make(chan string)
	// state is the shared flag (0 or 1); it is only touched through
	// sync/atomic in main and run.
	state int32
)

// main starts run, then pings whenever state is 1 and sets state to 1 on
// every third iteration.
func main() {
	go run()
	for step := 0; step <= 10; step++ {
		armed := atomic.LoadInt32(&state) == 1
		if armed {
			ch <- "ping"
		}
		if step%3 == 0 {
			atomic.StoreInt32(&state, 1)
		}
	}
}

// run prints every message received on ch together with the current state
// and then resets state to 0. It returns when ch is closed.
func run() {
	for msg := range ch {
		current := atomic.LoadInt32(&state)
		fmt.Printf("%+v, state=%+v\n", msg, current)
		atomic.StoreInt32(&state, 0)
	}
}
输出:
ping, state=1
ping, state=0
ping, state=1
ping, state=0
ping, state=1
ping, state=0
7- 使用通道的工作示例,而不使用Lock()
(The Go Playground):
// to check for data races, build with: go build -race
package main
import "fmt"
// Channels used to talk to the goroutine that owns the flag.
var (
	ping   = make(chan string)   // messages to print
	signal = make(chan struct{}) // request for the current flag value
	read   = make(chan bool)     // reply carrying the flag value
	write  = make(chan bool)     // new flag value
)

// main never touches the flag directly: it asks run for the value via
// signal/read, pings when the value is true, and sends true on write on
// every third iteration.
func main() {
	go run()
	for step := 0; step <= 10; step++ {
		signal <- struct{}{}
		if on := <-read; on {
			ping <- "ping"
		}
		if step%3 == 0 {
			write <- true
		}
	}
}

// run is the only goroutine that reads or writes the flag. It answers value
// requests, applies writes, and clears the flag after printing each ping.
func run() {
	var flag bool
	for {
		select {
		case <-signal:
			fmt.Println("signal", flag)
			read <- flag
		case next := <-write:
			flag = next
			fmt.Println("write", flag)
		case msg := <-ping:
			fmt.Println(msg, flag)
			flag = false
		}
	}
}
输出:
signal false
write true
signal true
ping true
signal false
signal false
write true
signal true
ping true
signal false
signal false
write true
signal true
ping true
signal false
signal false
write true
signal true
ping true
英文:
You have some options:
- Using
atomic.Value
: Sample (1) - Using
sync.RWMutex
: Sample (3) - Using
sync/atomic
: Sample (6) - Using only channels and goroutines : Sample (7)
Also see: Use a sync.Mutex or a channel?
1- You may use atomic.Value
:
> A Value provides an atomic load and store of a consistently typed
> value. Values can be created as part of other data structures. The
> zero value for a Value returns nil from Load. Once Store has been
> called, a Value must not be copied.
>
> A Value must not be copied after first use.
Like this working sample:
<!-- language: lang-golang -->
// to check for data races, build with: go build -race
package main
import (
"fmt"
"sync/atomic"
)
// test couples a message channel with an embedded atomic.Value that stores
// the boolean flag shared between main and the run goroutine. Because the
// Value is embedded, its Load and Store methods are promoted onto test.
type test struct {
	ch chan string
	atomic.Value
}

// run receives messages from t.ch forever. For each message it prints the
// message together with the current flag and then stores false.
//
// NOTE: the empty default case makes the select non-blocking, so this loop
// spins while the channel is idle; ranging over t.ch would block instead.
func (t *test) run() {
	for {
		select {
		case v := <-t.ch:
			fmt.Printf("%+v, foo=%+v\n", v, t.Load())
			t.Store(false)
		default:
		}
	}
}

// Ping sends "ping" on the unbuffered channel and therefore blocks until
// run has received it.
func (t *test) Ping() {
	t.ch <- "ping"
}

// New returns a test whose flag is initialised to false and whose run loop
// is already executing in its own goroutine.
func New() *test {
	t := &test{
		ch: make(chan string),
	}
	// Store before starting run so Load never returns nil.
	t.Store(false)
	go t.run()
	return t
}

// main pings whenever the flag is true and sets the flag on every third
// iteration. All flag accesses go through the atomic Load and Store.
func main() {
	t := New()
	for i := 0; i <= 10; i++ {
		if x, _ := t.Load().(bool); x {
			t.Ping()
		}
		// time.Sleep(time.Second)
		if i%3 == 0 {
			t.Store(true)
		}
	}
}
output with go build -race
:
ping, foo=true
ping, foo=false
ping, foo=false
ping, foo=false
ping, foo=false
2- A little improvement to func (t *test) run()
:
<!-- language: lang-golang -->
// run consumes messages from t.ch until the channel is closed. Each message
// is printed next to the flag value observed at that moment, after which
// the flag is reset to false.
func (t *test) run() {
	for msg := range t.ch {
		current := t.Load()
		fmt.Printf("%+v, foo=%+v\n", msg, current)
		t.Store(false)
	}
}
3- You may use sync.RWMutex
and sync.WaitGroup
, like this working sample:
<!-- language: lang-golang -->
// to check for data races, build with: go build -race
package main
import (
"fmt"
"sync"
)
// test holds a message channel and a flag. The embedded RWMutex guards foo;
// the embedded WaitGroup lets main wait for the run goroutine to finish.
type test struct {
	ch  chan string
	foo bool
	sync.RWMutex
	sync.WaitGroup
}

// run processes messages until t.ch is closed. For each message it reads
// and clears foo under the write lock, then prints the message with the
// value foo had before it was cleared.
func (t *test) run() {
	// Deferred so the WaitGroup is released on every exit path.
	defer t.Done()
	for v := range t.ch {
		t.Lock()
		r := t.foo
		t.foo = false
		t.Unlock()
		// Print outside the critical section to keep the lock short.
		fmt.Printf("%+v, foo=%+v\n", v, r)
	}
}

// Ping sends "ping" on the unbuffered channel, blocking until run receives
// it.
func (t *test) Ping() {
	t.ch <- "ping"
}

// New creates a test, registers the run goroutine with the WaitGroup and
// starts it.
func New() *test {
	t := &test{ch: make(chan string)}
	t.Add(1)
	go t.run()
	return t
}

// main reads foo under the read lock, pings when it is set, and sets foo
// under the write lock on every third iteration. Finally it closes the
// channel so run returns, and waits for it.
func main() {
	t := New()
	for i := 0; i <= 10; i++ {
		t.RLock()
		r := t.foo
		t.RUnlock()
		if r {
			t.Ping()
		}
		// time.Sleep(time.Second)
		if i%3 == 0 {
			t.Lock()
			t.foo = true
			t.Unlock()
		}
	}
	close(t.ch)
	t.Wait()
}
output with go build -race
:
ping, foo=true
ping, foo=true
ping, foo=false
ping, foo=true
ping, foo=false
ping, foo=true
4- So let's follow this approach https://talks.golang.org/2013/bestpractices.slide#29:
Original Code:
<!-- language: lang-golang -->
package main
import (
"fmt"
"time"
)
// Server runs a periodic background task that can be shut down through its
// quit channel.
type Server struct{ quit chan bool }

// NewServer builds a Server with an unbuffered quit channel and launches
// its task loop in a new goroutine.
func NewServer() *Server {
	srv := &Server{quit: make(chan bool)}
	go srv.run()
	return srv
}

// run prints "running task" once per second until a value arrives on quit.
// On quit it finishes the task, acknowledges on the same channel and
// returns.
func (s *Server) run() {
	for {
		tick := time.After(time.Second)
		select {
		case <-tick:
			fmt.Println("running task")
		case <-s.quit:
			fmt.Println("finishing task")
			time.Sleep(time.Second)
			fmt.Println("task done")
			s.quit <- true
			return
		}
	}
}

// Stop asks the task loop to quit and blocks until the loop acknowledges.
func (s *Server) Stop() {
	fmt.Println("server stopping")
	s.quit <- true // request shutdown
	<-s.quit       // wait for the acknowledgement
	fmt.Println("server stopped")
}

// main lets the server run for two seconds and then stops it.
func main() {
	server := NewServer()
	time.Sleep(2 * time.Second)
	server.Stop()
}
5- Let's simplify it:
<!-- language: lang-golang -->
package main
import (
"fmt"
"time"
)
// quit carries both the shutdown request (main -> run) and the
// acknowledgement (run -> main).
var quit = make(chan bool)

// main starts the task loop, lets it work for two seconds, then requests
// shutdown and waits for the acknowledgement.
func main() {
	go run()

	time.Sleep(2 * time.Second)
	fmt.Println("server stopping")

	quit <- true // ask run to shut down
	<-quit       // block until run acknowledges
	fmt.Println("server stopped")
}

// run prints "running task" once per second until a value arrives on quit;
// it then finishes the task, acknowledges on quit and returns.
func run() {
	for {
		tick := time.After(time.Second)
		select {
		case <-tick:
			fmt.Println("running task")
		case <-quit:
			fmt.Println("finishing task")
			time.Sleep(time.Second)
			fmt.Println("task done")
			quit <- true
			return
		}
	}
}
output:
running task
running task
server stopping
finishing task
task done
server stopped
6- Simplified version of your sample:
<!-- language: lang-golang -->
// to check for data races, build with: go build -race
package main
import "fmt"
import "sync/atomic"
var (
	// ch carries ping messages from main to run.
	ch = make(chan string)
	// state is the shared flag (0 or 1); it is only touched through
	// sync/atomic in main and run.
	state int32
)

// main starts run, then pings whenever state is 1 and sets state to 1 on
// every third iteration.
func main() {
	go run()
	for step := 0; step <= 10; step++ {
		armed := atomic.LoadInt32(&state) == 1
		if armed {
			ch <- "ping"
		}
		if step%3 == 0 {
			atomic.StoreInt32(&state, 1)
		}
	}
}

// run prints every message received on ch together with the current state
// and then resets state to 0. It returns when ch is closed.
func run() {
	for msg := range ch {
		current := atomic.LoadInt32(&state)
		fmt.Printf("%+v, state=%+v\n", msg, current)
		atomic.StoreInt32(&state, 0)
	}
}
output:
ping, state=1
ping, state=0
ping, state=1
ping, state=0
ping, state=1
ping, state=0
7- working sample with channels and without using Lock()
(The Go Playground):
<!-- language: lang-golang -->
// to check for data races, build with: go build -race
package main
import "fmt"
// Channels used to talk to the goroutine that owns the flag.
var (
	ping   = make(chan string)   // messages to print
	signal = make(chan struct{}) // request for the current flag value
	read   = make(chan bool)     // reply carrying the flag value
	write  = make(chan bool)     // new flag value
)

// main never touches the flag directly: it asks run for the value via
// signal/read, pings when the value is true, and sends true on write on
// every third iteration.
func main() {
	go run()
	for step := 0; step <= 10; step++ {
		signal <- struct{}{}
		if on := <-read; on {
			ping <- "ping"
		}
		if step%3 == 0 {
			write <- true
		}
	}
}

// run is the only goroutine that reads or writes the flag. It answers value
// requests, applies writes, and clears the flag after printing each ping.
func run() {
	var flag bool
	for {
		select {
		case <-signal:
			fmt.Println("signal", flag)
			read <- flag
		case next := <-write:
			flag = next
			fmt.Println("write", flag)
		case msg := <-ping:
			fmt.Println(msg, flag)
			flag = false
		}
	}
}
output:
signal false
write true
signal true
ping true
signal false
signal false
write true
signal true
ping true
signal false
signal false
write true
signal true
ping true
signal false
signal false
write true
signal true
ping true
答案2
得分: 1
使用互斥锁
package main
import (
"sync"
"time"
"fmt"
)
// myvar is the shared counter; mut must be held for every access to it.
var myvar int
var mut sync.Mutex

// main starts one incrementing and one printing goroutine per second,
// forever. The two goroutines of a round are not ordered with respect to
// each other, only made mutually exclusive by mut.
func main() {
	for {
		go other()
		go printer()
		time.Sleep(time.Second)
	}
}

// other increments myvar while holding mut.
func other() {
	mut.Lock()
	defer mut.Unlock()
	myvar++
}

// printer prints the current value of myvar while holding mut.
func printer() {
	mut.Lock()
	defer mut.Unlock()
	fmt.Println(myvar)
}
运行(使用互斥锁)
$ go build -race t1.go
$ ./t1
1
2
3
4
5
6
7
7
9
10
运行(不使用互斥锁)
$ go build t2.go
$ go build -race t2.go
$ ./t2
==================
WARNING: DATA RACE
Read at 0x000000580ce8 by goroutine 7:
runtime.convT2E()
/usr/local/go/src/runtime/iface.go:155 +0x0
main.printer()
/.../.../.../GOPATH/t2.go:23 +0x65
Previous write at 0x000000580ce8 by goroutine 6:
main.other()
/.../.../.../GOPATH/t2.go:19 +0x3d
Goroutine 7 (running) created at:
main.main()
/.../.../.../GOPATH/t2.go:13 +0x5a
Goroutine 6 (finished) created at:
main.main()
/.../.../.../GOPATH/t2.go:12 +0x42
==================
1
2
英文:
Use Mutexes
package main
import (
"sync"
"time"
"fmt"
)
// myvar is the shared counter; mut must be held for every access to it.
var myvar int
var mut sync.Mutex

// main starts one incrementing and one printing goroutine per second,
// forever. The two goroutines of a round are not ordered with respect to
// each other, only made mutually exclusive by mut.
func main() {
	for {
		go other()
		go printer()
		time.Sleep(time.Second)
	}
}

// other increments myvar while holding mut.
func other() {
	mut.Lock()
	defer mut.Unlock()
	myvar++
}

// printer prints the current value of myvar while holding mut.
func printer() {
	mut.Lock()
	defer mut.Unlock()
	fmt.Println(myvar)
}
Run (with mutexes)
$ go build -race t1.go
$ ./t1
1
2
3
4
5
6
7
7
9
10
Run (without mutexes)
$ go build t2.go
$ go build -race t2.go
$ ./t2
==================
WARNING: DATA RACE
Read at 0x000000580ce8 by goroutine 7:
runtime.convT2E()
/usr/local/go/src/runtime/iface.go:155 +0x0
main.printer()
/.../.../.../GOPATH/t2.go:23 +0x65
Previous write at 0x000000580ce8 by goroutine 6:
main.other()
/.../.../.../GOPATH/t2.go:19 +0x3d
Goroutine 7 (running) created at:
main.main()
/.../.../.../GOPATH/t2.go:13 +0x5a
Goroutine 6 (finished) created at:
main.main()
/.../.../.../GOPATH/t2.go:12 +0x42
==================
1
2
通过集体智慧和协作来改善编程学习和解决问题的方式。致力于成为全球开发者共同参与的知识库,让每个人都能够通过互相帮助和分享经验来进步。
评论