无法同时使用goroutines来查找最大值,直到上下文被取消。

huangapple go评论118阅读模式
英文:

Unable to use goroutines concurrently to find max until context is cancelled

问题

我已经成功地创建了一个没有 goroutine 的同步解决方案,用于对 compute 调用的 findMax 进行查找。

  1. package main
  2. import (
  3. "context"
  4. "fmt"
  5. "math/rand"
  6. "time"
  7. )
  8. func findMax(ctx context.Context, concurrency int) uint64 {
  9. var (
  10. max uint64 = 0
  11. num uint64 = 0
  12. )
  13. for i := 0; i < concurrency; i++ {
  14. num = compute()
  15. if num > max {
  16. max = num
  17. }
  18. }
  19. return max
  20. }
  21. func compute() uint64 {
  22. // 注意:这是一个模拟阻塞操作的实现。
  23. time.Sleep(time.Duration(rand.Int63n(100)) * time.Millisecond)
  24. return rand.Uint64()
  25. }
  26. func main() {
  27. maxDuration := 2 * time.Second
  28. concurrency := 10
  29. ctx, cancel := context.WithTimeout(context.Background(), maxDuration)
  30. defer cancel()
  31. max := findMax(ctx, concurrency)
  32. fmt.Println(max)
  33. }

当我尝试使用 goroutine 来重复调用 compute 函数,直到调用者 main 函数取消上下文 ctx 时,每次都得到 0,而不是预期的 goroutine compute 函数调用的最大值。我尝试了不同的方法,大部分时间都会出现死锁。

  1. package main
  2. import (
  3. "context"
  4. "fmt"
  5. "math/rand"
  6. "time"
  7. )
  8. func findMax(ctx context.Context, concurrency int) uint64 {
  9. var (
  10. max uint64 = 0
  11. num uint64 = 0
  12. )
  13. for i := 0; i < concurrency; i++ {
  14. select {
  15. case <- ctx.Done():
  16. return max
  17. default:
  18. go func() {
  19. num = compute()
  20. if num > max {
  21. max = num
  22. }
  23. }()
  24. }
  25. }
  26. return max
  27. }
  28. func compute() uint64 {
  29. // 注意:这是一个模拟阻塞操作的实现。
  30. time.Sleep(time.Duration(rand.Int63n(100)) * time.Millisecond)
  31. return rand.Uint64()
  32. }
  33. func main() {
  34. maxDuration := 2 * time.Second
  35. concurrency := 10
  36. ctx, cancel := context.WithTimeout(context.Background(), maxDuration)
  37. defer cancel()
  38. max := findMax(ctx, concurrency)
  39. fmt.Println(max)
  40. }

链接:https://play.golang.org/p/3fFFq2xlXAE

英文:

I have successfully made a synchronous solution without goroutines to findMax of compute calls.

  1. package main
  2. import (
  3. &quot;context&quot;
  4. &quot;fmt&quot;
  5. &quot;math/rand&quot;
  6. &quot;time&quot;
  7. )
  8. func findMax(ctx context.Context, concurrency int) uint64 {
  9. var (
  10. max uint64 = 0
  11. num uint64 = 0
  12. )
  13. for i := 0; i &lt; concurrency; i++ {
  14. num = compute()
  15. if num &gt; max {
  16. max = num
  17. }
  18. }
  19. return max
  20. }
  21. func compute() uint64 {
  22. // NOTE: This is a MOCK implementation of the blocking operation.
  23. time.Sleep(time.Duration(rand.Int63n(100)) * time.Millisecond)
  24. return rand.Uint64()
  25. }
  26. func main() {
  27. maxDuration := 2 * time.Second
  28. concurrency := 10
  29. ctx, cancel := context.WithTimeout(context.Background(), maxDuration)
  30. defer cancel()
  31. max := findMax(ctx, concurrency)
  32. fmt.Println(max)
  33. }

https://play.golang.org/p/lYXRNTDtNCI

When I attempt to use goroutines to use findMax to repeatedly call compute function using as many goroutines until context ctx is canceled by the caller main function. I am getting 0 every time and not the expected max of the grouting compute function calls. I have tried different ways to do it and get deadlock most of the time.

  1. package main
  2. import (
  3. &quot;context&quot;
  4. &quot;fmt&quot;
  5. &quot;math/rand&quot;
  6. &quot;time&quot;
  7. )
  8. func findMax(ctx context.Context, concurrency int) uint64 {
  9. var (
  10. max uint64 = 0
  11. num uint64 = 0
  12. )
  13. for i := 0; i &lt; concurrency; i++ {
  14. select {
  15. case &lt;- ctx.Done():
  16. return max
  17. default:
  18. go func() {
  19. num = compute()
  20. if num &gt; max {
  21. max = num
  22. }
  23. }()
  24. }
  25. }
  26. return max
  27. }
  28. func compute() uint64 {
  29. // NOTE: This is a MOCK implementation of the blocking operation.
  30. time.Sleep(time.Duration(rand.Int63n(100)) * time.Millisecond)
  31. return rand.Uint64()
  32. }
  33. func main() {
  34. maxDuration := 2 * time.Second
  35. concurrency := 10
  36. ctx, cancel := context.WithTimeout(context.Background(), maxDuration)
  37. defer cancel()
  38. max := findMax(ctx, concurrency)
  39. fmt.Println(max)
  40. }

https://play.golang.org/p/3fFFq2xlXAE

答案1

得分: 4

你的程序存在多个问题:

  1. 你正在生成多个goroutine,这些goroutine在操作共享变量maxnum,由于它们没有受到保护(例如通过互斥锁),导致数据竞争。
  2. 这里的num被每个工作goroutine修改,但它应该是每个工作goroutine的局部变量,否则计算的数据可能会丢失(例如,一个工作goroutine计算并将结果存储在num中,但紧接着第二个工作goroutine计算并替换了num的值)。
  1. num = compute // 应该是 "num := compute"
  1. 你没有等待每个goroutine完成计算,这可能导致不正确的结果,因为即使上下文没有被取消,也没有考虑到每个工作goroutine的计算结果。使用sync.WaitGroup或通道来解决这个问题。

下面是一个示例程序,解决了你代码中的大部分问题:

  1. package main
  2. import (
  3. "context"
  4. "fmt"
  5. "math/rand"
  6. "sync"
  7. "time"
  8. )
  9. type result struct {
  10. sync.RWMutex
  11. max uint64
  12. }
  13. func findMax(ctx context.Context, workers int) uint64 {
  14. var (
  15. res = result{}
  16. wg = sync.WaitGroup{}
  17. )
  18. for i := 0; i < workers; i++ {
  19. select {
  20. case <-ctx.Done():
  21. // RLock读取res.max
  22. res.RLock()
  23. ret := res.max
  24. res.RUnlock()
  25. return ret
  26. default:
  27. wg.Add(1)
  28. go func() {
  29. defer wg.Done()
  30. num := compute()
  31. // Lock以确保从res.max读取和写入res.max是安全的。否则,可能会发生数据竞争。
  32. res.Lock()
  33. if num > res.max {
  34. res.max = num
  35. }
  36. res.Unlock()
  37. }()
  38. }
  39. }
  40. // 等待所有goroutine完成工作,即所有工作人员都完成计算并更新了最大值。
  41. wg.Wait()
  42. return res.max
  43. }
  44. func compute() uint64 {
  45. rnd := rand.Int63n(100)
  46. time.Sleep(time.Duration(rnd) * time.Millisecond)
  47. return rand.Uint64()
  48. }
  49. func main() {
  50. maxDuration := 2 * time.Second
  51. concurrency := 10
  52. ctx, cancel := context.WithTimeout(context.Background(), maxDuration)
  53. defer cancel()
  54. fmt.Println(findMax(ctx, concurrency))
  55. }

正如@Brits在评论中指出的,当上下文被取消时,请确保停止那些不再需要的工作goroutine的处理(如果可能)。

英文:

Your program has multiple problems:

  1. You are spawning multiple goroutines that are operating on shared variables i.e., max and num leading to data race as they are not protected (eg. by Mutex).
  2. Here num is modified by every worker goroutine but it should have been local to the worker otherwise the computed data could be lost (eg. one worker goroutine computed a result and stored it in num, but right after that a second worker computes and replaces the value of num).
  1. num = compute // Should be &quot;num := compute&quot;
  1. You are not waiting for every goroutine to finish it's computation and it may result in incorrect results as every workers computation wasn't taken into account even though context wasn't cancelled. Use sync.WaitGroup or channels to fix this.

Here's a sample program that addresses most of the issues in your code:

  1. package main
  2. import (
  3. &quot;context&quot;
  4. &quot;fmt&quot;
  5. &quot;math/rand&quot;
  6. &quot;sync&quot;
  7. &quot;time&quot;
  8. )
  9. type result struct {
  10. sync.RWMutex
  11. max uint64
  12. }
  13. func findMax(ctx context.Context, workers int) uint64 {
  14. var (
  15. res = result{}
  16. wg = sync.WaitGroup{}
  17. )
  18. for i := 0; i &lt; workers; i++ {
  19. select {
  20. case &lt;-ctx.Done():
  21. // RLock to read res.max
  22. res.RLock()
  23. ret := res.max
  24. res.RUnlock()
  25. return ret
  26. default:
  27. wg.Add(1)
  28. go func() {
  29. defer wg.Done()
  30. num := compute()
  31. // Lock so that read from res.max and write
  32. // to res.max is safe. Else, data race could
  33. // occur.
  34. res.Lock()
  35. if num &gt; res.max {
  36. res.max = num
  37. }
  38. res.Unlock()
  39. }()
  40. }
  41. }
  42. // Wait for all the goroutine to finish work i.e., all
  43. // workers are done computing and updating the max.
  44. wg.Wait()
  45. return res.max
  46. }
  47. func compute() uint64 {
  48. rnd := rand.Int63n(100)
  49. time.Sleep(time.Duration(rnd) * time.Millisecond)
  50. return rand.Uint64()
  51. }
  52. func main() {
  53. maxDuration := 2 * time.Second
  54. concurrency := 10
  55. ctx, cancel := context.WithTimeout(context.Background(), maxDuration)
  56. defer cancel()
  57. fmt.Println(findMax(ctx, concurrency))
  58. }

As @Brits pointed out in the comments that when context is cancelled make sure that you stop those worker goroutines to stop processing (if possible) because it is not needed anymore.

huangapple
  • 本文由 发表于 2021年6月17日 14:40:17
  • 转载请务必保留本文链接:https://go.coder-hub.com/68014184.html
匿名

发表评论

匿名网友

:?: :razz: :sad: :evil: :!: :smile: :oops: :grin: :eek: :shock: :???: :cool: :lol: :mad: :twisted: :roll: :wink: :idea: :arrow: :neutral: :cry: :mrgreen:

确定