英文:
X number of goroutines to update the same variable
问题
我想要创建X个goroutine来使用并行性更新CountValue
(goroutine的数量应与CPU核心数量相同)。
解决方案1:
func count(numRoutines int) (countValue int) {
var mu sync.Mutex
k := func(i int) {
mu.Lock()
defer mu.Unlock()
countValue += 5
}
for i := 0; i < numRoutines; i++ {
go k(i)
}
这会导致数据竞争,返回的countValue
为0。
解决方案2:
func count(numRoutines int) (countValue int) {
k := func(i int, c chan int) {
c <- 5
}
c := make(chan int)
for i := 0; i < numRoutines; i++ {
go k(i, c)
}
for i := 0; i < numRoutines; i++ {
countValue += <- c
}
return
}
我对其进行了基准测试,发现顺序相加比使用goroutine更快。我认为这是因为这里有两个for循环,当我将countValue += <- c
放在第一个for循环内时,代码运行得更快。
解决方案3:
func count(numRoutines int) (countValue int) {
var wg sync.WaitGroup
c := make(chan int)
k := func(i int) {
defer wg.Done()
c <- 5
}
for i := 0; i < numShards; i++ {
wg.Add(1)
go k(i)
}
go func() {
for i := range c {
countValue += i
}
}()
wg.Wait()
return
}
仍然存在数据竞争:/
有没有更好的方法来解决这个问题?
英文:
I want to make X number of goroutines to update CountValue
using parallelism (numRoutines are as much as how many CPU you have).
Solution 1:
func count(numRoutines int) (countValue int) {
var mu sync.Mutex
k := func(i int) {
mu.Lock()
defer mu.Unlock()
countValue += 5
}
for i := 0; i < numRoutines; i++ {
go k(i)
}
It becomes a data race and the returned countValue = 0
.
Solution 2:
func count(numRoutines int) (countValue int) {
k := func(i int, c chan int) {
c <- 5
}
c := make(chan int)
for i := 0; i < numRoutines; i++ {
go k(i, c)
}
for i := 0; i < numRoutines; i++ {
countValue += <- c
}
return
}
I did a benchmark test on it and doing a sequential addition would work faster than using goroutines. I think it's because I have two for loops here as when I put countValue += <- c
inside the first for loop, the code runs faster.
Solution 3:
func count(numRoutines int) (countValue int) {
var wg sync.WaitGroup
c := make(chan int)
k := func(i int) {
defer wg.Done()
c <- 5
}
for i := 0; i < numShards; i++ {
wg.Add(1)
go k(i)
}
go func() {
for i := range c {
countValue += i
}
}()
wg.Wait()
return
}
Still a race count :/
Is there any way better to do this?
答案1
得分: 7
有一种更好的方法可以安全地增加一个变量:使用sync/atomic
:
import "sync/atomic"
var words int64
k := func() {
_ = atomic.AddInt64(&words, 5) // 原子增加
}
使用通道基本上消除了对互斥锁的需求,或者消除了对变量本身的并发访问的风险,这里的等待组只是有点过度使用。
使用通道的方法:
words := 0
done := make(chan struct{}) // 或者使用 context
ch := make(chan int, numRoutines) // 缓冲区,以便每个例程都可以写入
go func() {
read := 0
for i := range ch {
words += 5 // 或者使用 i 或其他值
read++
if read == numRoutines {
break // 我们已经从所有例程接收到数据
}
}
close(done) // 表示此例程已终止
}()
for i := 0; i < numRoutines; i++ {
ch <- i // 在通道上写入需要在计数例程中使用的任何值
}
<-done // 等待增加 words 的例程返回
close(ch) // 不再需要该通道
fmt.Printf("Counted %d\n", words)
可以看出,numRoutines
不再是例程的数量,而是通道上的写入次数。你仍然可以将其移动到各个例程中:
for i := 0; i < numRoutines; i++ {
go func(ch chan<- int, i int) {
// 在这里执行操作
ch <- 5 * i // 例如
}(ch, i)
}
使用等待组的方法:
不使用可取消的上下文或通道,可以使用等待组 + 原子操作来获得相同的结果。我认为最简单的方法是创建一个类型:
type counter struct {
words int64
}
func (c *counter) doStuff(wg *sync.WaitGroup, i int) {
defer wg.Done()
_ = atomic.AddInt64(&c.words, int64(i*5)) // 需要添加的任何值
}
func main() {
cnt := counter{}
wg := sync.WaitGroup{}
wg.Add(numRoutines) // 创建等待组
for i := 0; i < numRoutines; i++ {
go cnt.doStuff(&wg, i)
}
wg.Wait() // 等待所有例程完成
fmt.Println("Counted %d\n", cnt.words)
}
修复你的第三种解决方案:
正如我在评论中提到的,你的第三种解决方案仍然会导致竞态条件,因为通道 c
从未关闭,这意味着例程:
go func() {
for i := range c {
countValue += i
}
}()
永远不会返回。等待组也只能确保你已经将所有值发送到通道,但不能确保 countValue
已经增加到最终值。修复的方法是在 wg.Wait()
返回后关闭通道,然后添加一个 done
通道,在最后一个例程返回之前关闭它,并在返回之前添加一个 <-done
语句。
func count(numRoutines int) (countValue int) {
var wg sync.WaitGroup
c := make(chan int)
k := func(i int) {
defer wg.Done()
c <- 5
}
for i := 0; i < numShards; i++ {
wg.Add(1)
go k(i)
}
done := make(chan struct{})
go func() {
for i := range c {
countValue += i
}
close(done)
}()
wg.Wait()
close(c)
<-done
return
}
这样做会增加一些混乱,而且我认为有点凌乱。可能更容易将 wg.Wait()
调用移动到一个例程中:
func count(numRoutines int) (countValue int) {
var wg sync.WaitGroup
c := make(chan int)
// 将 wg 作为参数添加,使得更容易将此函数移出此作用域
k := func(wg *sync.WaitGroup, i int) {
defer wg.Done()
c <- 5
}
wg.Add(numShards) // 一次增加等待组
for i := 0; i < numShards; i++ {
go k(&wg, i)
}
go func() {
wg.Wait()
close(c) // 这会结束对通道的循环
}()
// 只需迭代通道,直到它关闭
for i := range c {
countValue += i
}
// 我们已经添加了所有值到 countValue
return
}
英文:
There definitely is a better way to safely increment a variable: using sync/atomic
:
import "sync/atomic"
var words int64
k := func() {
_ = atomic.AddInt64(&words, 5) // increment atomically
}
Using a channel basically eliminates the need for a mutex, or takes away the the risk of concurrent access to the variable itself, and a waitgroup here is just a bit overkill
Channels:
words := 0
done := make(chan struct{}) // or use context
ch := make(chan int, numRoutines) // buffer so each routine can write
go func () {
read := 0
for i := range ch {
words += 5 // or use i or something
read++
if read == numRoutines {
break // we've received data from all routines
}
}
close(done) // indicate this routine has terminated
}()
for i := 0; i < numRoutines; i++ {
ch <- i // write whatever value needs to be used in the counting routine on the channel
}
<- done // wait for our routine that increments words to return
close(ch) // this channel is no longer needed
fmt.Printf("Counted %d\n", words)
As you can tell, the numRoutines
no longer is the number of routines, but rather the number of writes on the channel. You can move that to individual routines, still:
for i := 0; i < numRoutines; i++ {
go func(ch chan<- int, i int) {
// do stuff here
ch <- 5 * i // for example
}(ch, i)
}
WaitGroup:
Instead of using a context that you can cancel, or a channel, you can use a waitgroup + atomic to get the same result. The easiest way IMO to do so is to create a type:
type counter struct {
words int64
}
func (c *counter) doStuff(wg *sync.WaitGroup, i int) {
defer wg.Done()
_ = atomic.AddInt64(&c.words, i * 5) // whatever value you need to add
}
func main () {
cnt := counter{}
wg := sync.WaitGroup{}
wg.Add(numRoutines) // create the waitgroup
for i := 0; i < numRoutines; i++ {
go cnt.doStuff(&wg, i)
}
wg.Wait() // wait for all routines to finish
fmt.Println("Counted %d\n", cnt.words)
}
Fix for your third solution
As I mentioned in the comment: your third solution is still causing a race condition because the channel c
is never closed, meaning the routine:
go func () {
for i := range c {
countValue += i
}
}()
Never returns. The waitgroup also only ensures that you've sent all values on the channel, but not that the countValue
has been incremented to its final value. The fix would be to either close the channel after wg.Wait()
returns so the routine can return, and add a done
channel that you can close when this last routine returns, and add a <-done
statement before returning.
func count(numRoutines int) (countValue int) {
var wg sync.WaitGroup
c := make(chan int)
k := func(i int) {
defer wg.Done()
c <- 5
}
for i := 0; i < numShards; i++ {
wg.Add(1)
go k(i)
}
done := make(chan struct{})
go func() {
for i := range c {
countValue += i
}
close(done)
}()
wg.Wait()
close(c)
<-done
return
}
This adds some clutter, though, and IMO is a bit messy. It might just be easier to just move the wg.Wait()
call to a routine:
func count(numRoutines int) (countValue int) {
var wg sync.WaitGroup
c := make(chan int)
// add wg as argument, makes it easier to move this function outside of this scope
k := func(wg *sync.WaitGroup, i int) {
defer wg.Done()
c <- 5
}
wg.Add(numShards) // increment the waitgroup once
for i := 0; i < numShards; i++ {
go k(&wg, i)
}
go func() {
wg.Wait()
close(c) // this ends the loop over the channel
}()
// just iterate over the channel until it is closed
for i := range c {
countValue += i
}
// we've added all values to countValue
return
}
通过集体智慧和协作来改善编程学习和解决问题的方式。致力于成为全球开发者共同参与的知识库,让每个人都能够通过互相帮助和分享经验来进步。
评论