Golang - race condition using go-routine
Question
I tried running my program with the race flag and it found an issue.
The function is the following:
func (g *WaitingErrorGroup) Start(run func() (bool, error)) {
    g.g.Start(func() {
        requeue, err := run()
        if g.err == nil {
            g.err = err
        }
        if requeue {
            g.requeue = requeue
        }
    })
}
The function is called like the following:
g.Start(func() (bool, error) {
    return install(vins, crObjectKey, releasePrefix, kFilePath, objectCli, dependenciesBroadcastingSchema, compStatus)
})
g.Start(func() (bool, error) {
    return false, uninstall(currentRelease, kFilePath, updateChartStatus)
})
The stack trace looks like the following:
WARNING: DATA RACE
Read at 0x00c0001614a8 by goroutine 82:
github.vs.sar/agm/coperator/components/tools.(*WaitingErrorGroup).Start.func1()
/Users/github.vs.sar/agm/coperator/components/tools/waitgroup.go:27 +0x84
k8s.io/apimachinery/pkg/util/wait.(*Group).Start.func1()
/Users/i88893/go/pkg/mod/k8s.io/apimachinery@v0.22.4/pkg/util/wait/wait.go:73 +0x6d
The Start function is this (my code): github.vs.sar/agm/coperator/components/tools.(*WaitingErrorGroup).Start.func1()
func (g *WaitingErrorGroup) Start(run func() (bool, error)) {
    g.g.Start(func() {
        requeue, err := run()
        if g.err == nil {
            g.err = err
        }
        if requeue {
            g.requeue = requeue
        }
    })
}
The second frame in the stack trace is this (not my code):
go/pkg/mod/k8s.io/apimachinery@v0.22.4/pkg/util/wait/wait.go:73 +0x6d
// Start starts f in a new goroutine in the group.
func (g *Group) Start(f func()) {
    g.wg.Add(1)
    go func() {
        defer g.wg.Done()
        f()
    }()
}
I guess (not being an expert in Go) that this is related to using g.err from multiple goroutines concurrently, which isn't allowed, and the same goes for writing g.requeue.
Any idea how to solve this?
Maybe I need to use https://pkg.go.dev/sync#RWMutex, but I'm not sure how...
UPDATE
I took @Danil's suggestion (changing the lock position) and changed it as follows: I added a mutex to the struct and took the lock in the function. Does it make sense? Now when I run with the race flag everything seems to be OK.
type WaitingErrorGroup struct {
    g       *wait.Group
    mu      sync.Mutex
    err     error
    requeue bool
}

func (g *WaitingErrorGroup) Start(run func() (bool, error)) {
    g.g.Start(func() {
        g.mu.Lock()
        defer g.mu.Unlock()
        requeue, err := run()
        if g.err == nil {
            g.err = err
        }
        if requeue {
            g.requeue = requeue
        }
    })
}
Answer 1
Score: 2
The problem appears because you are trying to manipulate unsynchronized shared memory (g.err in your case) from different goroutines.
You have two different approaches to handling concurrent code in Go:
- Synchronization primitives for sharing memory (e.g., sync.Mutex)
- Synchronization via communication (e.g., channels)
It seems that in your code you follow the "synchronization primitives for sharing memory" approach, and to resolve the error you need to synchronize access to g.err.
You can use sync.Mutex or sync.RWMutex.
In your case you would have:
func (g *WaitingErrorGroup) Start(run func() (bool, error)) {
    g.g.Start(func() {
        requeue, err := run()
        // Lock before reading and writing g.err, unlock after
        g.mu.Lock()
        defer g.mu.Unlock()
        if g.err == nil {
            g.err = err
        }
        if requeue {
            g.requeue = requeue
        }
    })
}
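Since sync.RWMutex is mentioned as well, here is a minimal sketch of that variant, assuming the WaitingErrorGroup fields from the question's update; the Err accessor is a hypothetical addition, not part of the original code. Writers still take the exclusive lock, while concurrent readers only need the read lock:

import (
    "sync"

    "k8s.io/apimachinery/pkg/util/wait"
)

type WaitingErrorGroup struct {
    g       *wait.Group
    mu      sync.RWMutex
    err     error
    requeue bool
}

func (g *WaitingErrorGroup) Start(run func() (bool, error)) {
    g.g.Start(func() {
        requeue, err := run()
        g.mu.Lock() // exclusive lock: we may write g.err and g.requeue
        defer g.mu.Unlock()
        if g.err == nil {
            g.err = err
        }
        if requeue {
            g.requeue = requeue
        }
    })
}

// Err is a hypothetical read accessor; readers can share the lock via RLock.
func (g *WaitingErrorGroup) Err() error {
    g.mu.RLock()
    defer g.mu.RUnlock()
    return g.err
}

Whether RWMutex pays off depends on how often err and requeue are read; with only a couple of goroutines, a plain sync.Mutex as in the update is just as good.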
However, according to Rob Pike, the more idiomatic way in Go is "Don't communicate by sharing memory; share memory by communicating" (i.e., use channels, not mutexes). This was already mentioned by @TheFool and @kostix in the comments.
But it's not clear to me from the question whether you have the possibility to redesign your code to follow this idiom.
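For illustration only, a hypothetical channel-based redesign might look like the sketch below (the result type, NewWaitingErrorGroup, and Wait are my own names, not from the question). Each goroutine sends its outcome over a channel, and Wait folds the results, so err and requeue are only ever written by the goroutine that calls Wait and no mutex is needed:

import "sync"

type result struct {
    requeue bool
    err     error
}

type WaitingErrorGroup struct {
    wg      sync.WaitGroup
    results chan result
}

func NewWaitingErrorGroup() *WaitingErrorGroup {
    return &WaitingErrorGroup{results: make(chan result)}
}

func (g *WaitingErrorGroup) Start(run func() (bool, error)) {
    g.wg.Add(1)
    go func() {
        defer g.wg.Done()
        requeue, err := run()
        g.results <- result{requeue: requeue, err: err}
    }()
}

// Wait blocks until every started goroutine has finished and returns the
// combined requeue flag and the first non-nil error that was received.
func (g *WaitingErrorGroup) Wait() (bool, error) {
    // Close the results channel once all goroutines are done so the
    // range loop below terminates.
    go func() {
        g.wg.Wait()
        close(g.results)
    }()
    var requeue bool
    var err error
    for r := range g.results {
        if err == nil {
            err = r.err
        }
        if r.requeue {
            requeue = true
        }
    }
    return requeue, err
}

The caller would then call g.Start(...) for each task and finally requeue, err := g.Wait(), mirroring how the original WaitingErrorGroup is used.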
Answer 2
Score: 1
You could use a channel to communicate the errors that occur and handle them.
For example, something like this:
package main

import (
    "errors"
    "fmt"
    "sync"
)

// handleErrors starts a goroutine that prints every error received on c and
// returns a WaitGroup that is done once c has been closed and drained.
func handleErrors(c chan error) *sync.WaitGroup {
    wg := sync.WaitGroup{}
    wg.Add(1)
    go func() {
        defer wg.Done()
        for err := range c {
            fmt.Println(err)
        }
    }()
    return &wg
}

func main() {
    c := make(chan error, 2)
    wg := sync.WaitGroup{}
    // Deferred calls run in reverse order: wait for the senders,
    // close the channel, then wait for the error handler to drain it.
    defer handleErrors(c).Wait()
    defer close(c)
    defer wg.Wait()
    wg.Add(2)
    go func() {
        defer wg.Done()
        c <- errors.New("error 1")
    }()
    go func() {
        defer wg.Done()
        c <- errors.New("error 2")
    }()
}
I think using channels is more idiomatic in Go than other synchronization primitives like locks. Locks are harder to get right, and they can come with a performance cost.
If one goroutine holds the lock, the other goroutines have to wait until the lock is released, so you introduce a bottleneck into your concurrent execution. In the example above this is solved by buffering the channel: even if nothing has read a message yet, both goroutines are able to pass their messages in without being blocked.
Additionally, when using a lock it can happen that the lock is never released, for example if the programmer forgot to add the relevant line, leading to a deadlock. Similar bad things can happen when channels are not closed, though.
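To make that last point concrete, here is a small, hypothetical variation of the example above (not from the original answer) in which close(c) is forgotten: the reader goroutine never leaves its range loop, so the final Wait() blocks and the Go runtime aborts with "all goroutines are asleep - deadlock!".

package main

import (
    "errors"
    "fmt"
    "sync"
)

func handleErrors(c chan error) *sync.WaitGroup {
    wg := sync.WaitGroup{}
    wg.Add(1)
    go func() {
        defer wg.Done()
        for err := range c {
            fmt.Println(err)
        }
    }()
    return &wg
}

func main() {
    c := make(chan error, 1)
    c <- errors.New("error 1")
    // close(c) // forgotten: the range loop in handleErrors never terminates
    handleErrors(c).Wait() // fatal error: all goroutines are asleep - deadlock!
}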