英文:
Unable to use goroutines concurrently to find max until context is cancelled
问题
我已经成功地创建了一个没有 goroutine 的同步解决方案,用于对 compute
调用的 findMax
进行查找。
package main
import (
"context"
"fmt"
"math/rand"
"time"
)
func findMax(ctx context.Context, concurrency int) uint64 {
var (
max uint64 = 0
num uint64 = 0
)
for i := 0; i < concurrency; i++ {
num = compute()
if num > max {
max = num
}
}
return max
}
func compute() uint64 {
// 注意:这是一个模拟阻塞操作的实现。
time.Sleep(time.Duration(rand.Int63n(100)) * time.Millisecond)
return rand.Uint64()
}
func main() {
maxDuration := 2 * time.Second
concurrency := 10
ctx, cancel := context.WithTimeout(context.Background(), maxDuration)
defer cancel()
max := findMax(ctx, concurrency)
fmt.Println(max)
}
当我尝试使用 goroutine 来重复调用 compute
函数,直到调用者 main
函数取消上下文 ctx
时,每次都得到 0,而不是预期的 goroutine compute 函数调用的最大值。我尝试了不同的方法,大部分时间都会出现死锁。
package main
import (
"context"
"fmt"
"math/rand"
"time"
)
func findMax(ctx context.Context, concurrency int) uint64 {
var (
max uint64 = 0
num uint64 = 0
)
for i := 0; i < concurrency; i++ {
select {
case <- ctx.Done():
return max
default:
go func() {
num = compute()
if num > max {
max = num
}
}()
}
}
return max
}
func compute() uint64 {
// 注意:这是一个模拟阻塞操作的实现。
time.Sleep(time.Duration(rand.Int63n(100)) * time.Millisecond)
return rand.Uint64()
}
func main() {
maxDuration := 2 * time.Second
concurrency := 10
ctx, cancel := context.WithTimeout(context.Background(), maxDuration)
defer cancel()
max := findMax(ctx, concurrency)
fmt.Println(max)
}
链接:https://play.golang.org/p/3fFFq2xlXAE
英文:
I have successfully made a synchronous solution without goroutines to findMax
of compute
calls.
package main
import (
"context"
"fmt"
"math/rand"
"time"
)
func findMax(ctx context.Context, concurrency int) uint64 {
var (
max uint64 = 0
num uint64 = 0
)
for i := 0; i < concurrency; i++ {
num = compute()
if num > max {
max = num
}
}
return max
}
func compute() uint64 {
// NOTE: This is a MOCK implementation of the blocking operation.
time.Sleep(time.Duration(rand.Int63n(100)) * time.Millisecond)
return rand.Uint64()
}
func main() {
maxDuration := 2 * time.Second
concurrency := 10
ctx, cancel := context.WithTimeout(context.Background(), maxDuration)
defer cancel()
max := findMax(ctx, concurrency)
fmt.Println(max)
}
https://play.golang.org/p/lYXRNTDtNCI
When I attempt to use goroutines to use findMax
to repeatedly call compute
function using as many goroutines until context ctx
is canceled by the caller main
function. I am getting 0 every time and not the expected max of the grouting compute function calls. I have tried different ways to do it and get deadlock most of the time.
package main
import (
"context"
"fmt"
"math/rand"
"time"
)
func findMax(ctx context.Context, concurrency int) uint64 {
var (
max uint64 = 0
num uint64 = 0
)
for i := 0; i < concurrency; i++ {
select {
case <- ctx.Done():
return max
default:
go func() {
num = compute()
if num > max {
max = num
}
}()
}
}
return max
}
func compute() uint64 {
// NOTE: This is a MOCK implementation of the blocking operation.
time.Sleep(time.Duration(rand.Int63n(100)) * time.Millisecond)
return rand.Uint64()
}
func main() {
maxDuration := 2 * time.Second
concurrency := 10
ctx, cancel := context.WithTimeout(context.Background(), maxDuration)
defer cancel()
max := findMax(ctx, concurrency)
fmt.Println(max)
}
答案1
得分: 4
你的程序存在多个问题:
- 你正在生成多个goroutine,这些goroutine在操作共享变量
max
和num
,由于它们没有受到保护(例如通过互斥锁),导致数据竞争。 - 这里的
num
被每个工作goroutine修改,但它应该是每个工作goroutine的局部变量,否则计算的数据可能会丢失(例如,一个工作goroutine计算并将结果存储在num中,但紧接着第二个工作goroutine计算并替换了num的值)。
num = compute // 应该是 "num := compute"
- 你没有等待每个goroutine完成计算,这可能导致不正确的结果,因为即使上下文没有被取消,也没有考虑到每个工作goroutine的计算结果。使用
sync.WaitGroup
或通道来解决这个问题。
下面是一个示例程序,解决了你代码中的大部分问题:
package main
import (
"context"
"fmt"
"math/rand"
"sync"
"time"
)
type result struct {
sync.RWMutex
max uint64
}
func findMax(ctx context.Context, workers int) uint64 {
var (
res = result{}
wg = sync.WaitGroup{}
)
for i := 0; i < workers; i++ {
select {
case <-ctx.Done():
// RLock读取res.max
res.RLock()
ret := res.max
res.RUnlock()
return ret
default:
wg.Add(1)
go func() {
defer wg.Done()
num := compute()
// Lock以确保从res.max读取和写入res.max是安全的。否则,可能会发生数据竞争。
res.Lock()
if num > res.max {
res.max = num
}
res.Unlock()
}()
}
}
// 等待所有goroutine完成工作,即所有工作人员都完成计算并更新了最大值。
wg.Wait()
return res.max
}
func compute() uint64 {
rnd := rand.Int63n(100)
time.Sleep(time.Duration(rnd) * time.Millisecond)
return rand.Uint64()
}
func main() {
maxDuration := 2 * time.Second
concurrency := 10
ctx, cancel := context.WithTimeout(context.Background(), maxDuration)
defer cancel()
fmt.Println(findMax(ctx, concurrency))
}
正如@Brits在评论中指出的,当上下文被取消时,请确保停止那些不再需要的工作goroutine的处理(如果可能)。
英文:
Your program has multiple problems:
- You are spawning multiple goroutines that are operating on shared variables i.e.,
max
andnum
leading to data race as they are not protected (eg. by Mutex). - Here
num
is modified by every worker goroutine but it should have been local to the worker otherwise the computed data could be lost (eg. one worker goroutine computed a result and stored it in num, but right after that a second worker computes and replaces the value of num).
num = compute // Should be "num := compute"
- You are not waiting for every goroutine to finish it's computation and it may result in incorrect results as every workers computation wasn't taken into account even though context wasn't cancelled. Use
sync.WaitGroup
or channels to fix this.
Here's a sample program that addresses most of the issues in your code:
package main
import (
"context"
"fmt"
"math/rand"
"sync"
"time"
)
type result struct {
sync.RWMutex
max uint64
}
func findMax(ctx context.Context, workers int) uint64 {
var (
res = result{}
wg = sync.WaitGroup{}
)
for i := 0; i < workers; i++ {
select {
case <-ctx.Done():
// RLock to read res.max
res.RLock()
ret := res.max
res.RUnlock()
return ret
default:
wg.Add(1)
go func() {
defer wg.Done()
num := compute()
// Lock so that read from res.max and write
// to res.max is safe. Else, data race could
// occur.
res.Lock()
if num > res.max {
res.max = num
}
res.Unlock()
}()
}
}
// Wait for all the goroutine to finish work i.e., all
// workers are done computing and updating the max.
wg.Wait()
return res.max
}
func compute() uint64 {
rnd := rand.Int63n(100)
time.Sleep(time.Duration(rnd) * time.Millisecond)
return rand.Uint64()
}
func main() {
maxDuration := 2 * time.Second
concurrency := 10
ctx, cancel := context.WithTimeout(context.Background(), maxDuration)
defer cancel()
fmt.Println(findMax(ctx, concurrency))
}
As @Brits pointed out in the comments that when context is cancelled make sure that you stop those worker goroutines to stop processing (if possible) because it is not needed anymore.
通过集体智慧和协作来改善编程学习和解决问题的方式。致力于成为全球开发者共同参与的知识库,让每个人都能够通过互相帮助和分享经验来进步。
评论