无法同时使用goroutines来查找最大值,直到上下文被取消。

huangapple go评论148阅读模式
英文:

Unable to use goroutines concurrently to find max until context is cancelled

问题

我已经成功地实现了一个不使用 goroutine 的同步版本 findMax,用于求多次 compute 调用结果中的最大值。

package main

import (
    "context"
    "fmt"
    "math/rand"
    "time"
)

// findMax calls compute concurrency times, one call after another, and
// returns the largest value produced (0 when concurrency is not positive).
//
// ctx is accepted but not consulted by this synchronous version.
func findMax(ctx context.Context, concurrency int) uint64 {
    var best uint64

    for remaining := concurrency; remaining > 0; remaining-- {
        if candidate := compute(); candidate > best {
            best = candidate
        }
    }

    return best
}

// compute stands in for a blocking operation: it sleeps for a random number
// of milliseconds in [0, 100) and then returns a random uint64.
func compute() uint64 {
    // NOTE: this is a mock implementation of the blocking operation.
    delay := time.Duration(rand.Int63n(100)) * time.Millisecond
    time.Sleep(delay)

    return rand.Uint64()
}

// main runs findMax with a concurrency of 10 under a context that times out
// after 2 seconds, then prints the value it returned.
func main() {
    const (
        timeout = 2 * time.Second
        workers = 10
    )

    ctx, cancel := context.WithTimeout(context.Background(), timeout)
    defer cancel()

    fmt.Println(findMax(ctx, workers))
}

当我尝试使用 goroutine 来重复调用 compute 函数,直到调用者 main 函数取消上下文 ctx 时,每次都得到 0,而不是预期的 goroutine compute 函数调用的最大值。我尝试了不同的方法,大部分时间都会出现死锁。

package main

import (
    "context"
    "fmt"
    "math/rand"
    "time"
)

// findMax starts up to concurrency goroutines that each call compute and try
// to raise a shared running maximum, then returns that maximum.
//
// NOTE(review): the goroutines are never waited for, so the final return
// statement can run before any of them has stored a result, in which case the
// initial 0 is returned. max and num are also read and written by every
// goroutine without synchronization, which is a data race.
func findMax(ctx context.Context, concurrency int) uint64 {
    var (
        // max is the running maximum shared by every goroutine started below.
        max uint64 = 0
        // num is a single scratch variable that every goroutine overwrites.
        num uint64 = 0
    )

    for i := 0; i < concurrency; i++ {
        select {
        case <- ctx.Done():
            // ctx has already finished: stop launching and return the current max.
            return max
        default:
            // ctx is still live: start one more worker without blocking.
            go func() {
                num = compute()
                if num > max {
                    max = num
                }
            }()
        }
    }

    // Reached right after the last goroutine is launched; nothing waits for them.
    return max
}

// compute stands in for a blocking operation: it sleeps for a random number
// of milliseconds in [0, 100) and then returns a random uint64.
func compute() uint64 {
    // NOTE: this is a mock implementation of the blocking operation.
    delay := time.Duration(rand.Int63n(100)) * time.Millisecond
    time.Sleep(delay)

    return rand.Uint64()
}

// main runs findMax with a concurrency of 10 under a context that times out
// after 2 seconds, then prints the value it returned.
func main() {
    const (
        timeout = 2 * time.Second
        workers = 10
    )

    ctx, cancel := context.WithTimeout(context.Background(), timeout)
    defer cancel()

    fmt.Println(findMax(ctx, workers))
}

链接:https://play.golang.org/p/3fFFq2xlXAE

英文:

I have successfully made a synchronous solution without goroutines to findMax of compute calls.

package main

import (
    "context"
    "fmt"
    "math/rand"
    "time"
)

func findMax(ctx context.Context, concurrency int) uint64 {
    var (
        max uint64 = 0
        num uint64 = 0
    )

    for i := 0; i &lt; concurrency; i++ {
        num = compute()

        if num &gt; max {
            max = num
        }
    }

    return max
}

// compute stands in for a blocking operation: it sleeps for a random number
// of milliseconds in [0, 100) and then returns a random uint64.
func compute() uint64 {
    // NOTE: This is a MOCK implementation of the blocking operation.
    delay := time.Duration(rand.Int63n(100)) * time.Millisecond
    time.Sleep(delay)

    return rand.Uint64()
}

// main runs findMax with a concurrency of 10 under a context that times out
// after 2 seconds, then prints the value it returned.
func main() {
    const (
        timeout = 2 * time.Second
        workers = 10
    )

    ctx, cancel := context.WithTimeout(context.Background(), timeout)
    defer cancel()

    fmt.Println(findMax(ctx, workers))
}


https://play.golang.org/p/lYXRNTDtNCI

When I try to make findMax use goroutines to call the compute function repeatedly, with as many goroutines as allowed, until the context ctx is canceled by the caller (the main function), I get 0 every time instead of the expected max of the goroutines' compute calls. I have tried different ways to do it and get a deadlock most of the time.

package main

import (
    "context"
    "fmt"
    "math/rand"
    "time"
)

func findMax(ctx context.Context, concurrency int) uint64 {
    var (
        max uint64 = 0
        num uint64 = 0
    )

    for i := 0; i &lt; concurrency; i++ {
        select {
        case &lt;- ctx.Done():
            return max
        default:
            go func() {
		        num = compute()
                if num &gt; max {
           	        max = num
        	    }
	        }()
        }
    }

    return max
}

// compute stands in for a blocking operation: it sleeps for a random number
// of milliseconds in [0, 100) and then returns a random uint64.
func compute() uint64 {
    // NOTE: This is a MOCK implementation of the blocking operation.
    delay := time.Duration(rand.Int63n(100)) * time.Millisecond
    time.Sleep(delay)

    return rand.Uint64()
}

// main runs findMax with a concurrency of 10 under a context that times out
// after 2 seconds, then prints the value it returned.
func main() {
    const (
        timeout = 2 * time.Second
        workers = 10
    )

    ctx, cancel := context.WithTimeout(context.Background(), timeout)
    defer cancel()

    fmt.Println(findMax(ctx, workers))
}

https://play.golang.org/p/3fFFq2xlXAE

答案1

得分: 4

你的程序存在多个问题:

  1. 你正在生成多个goroutine,这些goroutine在操作共享变量 max 和 num,由于它们没有受到保护(例如通过互斥锁),导致数据竞争。
  2. 这里的num被每个工作goroutine修改,但它应该是每个工作goroutine的局部变量,否则计算的数据可能会丢失(例如,一个工作goroutine计算并将结果存储在num中,但紧接着第二个工作goroutine计算并替换了num的值)。
 num = compute() // 应该是 "num := compute()"
  3. 你没有等待每个goroutine完成计算,这可能导致不正确的结果,因为即使上下文没有被取消,也没有考虑到每个工作goroutine的计算结果。使用sync.WaitGroup或通道来解决这个问题。

下面是一个示例程序,解决了你代码中的大部分问题:

package main

import (
	"context"
	"fmt"
	"math/rand"
	"sync"
	"time"
)

// result holds the running maximum that the worker goroutines update.
// The embedded RWMutex is the lock taken around reads and writes of max.
type result struct {
	// Embedded so that Lock/Unlock/RLock/RUnlock can be called on the result itself.
	sync.RWMutex
	// max is the largest value recorded so far.
	max uint64
}

// findMax starts up to workers goroutines, each of which calls compute once,
// and returns the largest value they produced (0 if no worker ran).
//
// ctx is checked before each goroutine is started; once it is done no further
// workers are launched. findMax always waits for the workers it has already
// started before returning, so no goroutine outlives the call and every
// finished computation is reflected in the result. Returning straight from the
// cancellation branch would leave in-flight workers running and would miss
// their results.
func findMax(ctx context.Context, workers int) uint64 {
	var (
		res result
		wg  sync.WaitGroup
	)

	// ctx.Err() becomes non-nil as soon as ctx is cancelled or times out.
	for i := 0; i < workers && ctx.Err() == nil; i++ {
		wg.Add(1)
		go func() {
			defer wg.Done()
			// num is local to this worker so results cannot overwrite each other.
			num := compute()

			// Hold the lock across the compare-and-store so that concurrent
			// workers cannot interleave between the read and the write.
			res.Lock()
			defer res.Unlock()
			if num > res.max {
				res.max = num
			}
		}()
	}

	// After Wait returns every worker has finished, so res.max can be read
	// without taking the lock.
	wg.Wait()

	return res.max
}

// compute stands in for a blocking operation: it sleeps for a random number
// of milliseconds in [0, 100) and then returns a random uint64.
func compute() uint64 {
	time.Sleep(time.Duration(rand.Int63n(100)) * time.Millisecond)

	return rand.Uint64()
}

// main runs findMax with 10 workers under a context that times out after
// 2 seconds, then prints the value it returned.
func main() {
	const (
		timeout = 2 * time.Second
		workers = 10
	)

	ctx, cancel := context.WithTimeout(context.Background(), timeout)
	defer cancel()

	largest := findMax(ctx, workers)
	fmt.Println(largest)
}

正如@Brits在评论中指出的,当上下文被取消时,请确保停止那些不再需要的工作goroutine的处理(如果可能)。

英文:

Your program has multiple problems:

  1. You are spawning multiple goroutines that are operating on shared variables i.e., max and num leading to data race as they are not protected (eg. by Mutex).
  2. Here num is modified by every worker goroutine but it should have been local to the worker otherwise the computed data could be lost (eg. one worker goroutine computed a result and stored it in num, but right after that a second worker computes and replaces the value of num).
 num = compute() // Should be "num := compute()"
  3. You are not waiting for every goroutine to finish its computation, which may produce incorrect results because not every worker's computation is taken into account, even though the context wasn't cancelled. Use sync.WaitGroup or channels to fix this.

Here's a sample program that addresses most of the issues in your code:

package main

import (
	"context"
	"fmt"
	"math/rand"
	"sync"
	"time"
)

// result holds the running maximum that the worker goroutines update.
// The embedded RWMutex is the lock taken around reads and writes of max.
type result struct {
	// Embedded so that Lock/Unlock/RLock/RUnlock can be called on the result itself.
	sync.RWMutex
	// max is the largest value recorded so far.
	max uint64
}

func findMax(ctx context.Context, workers int) uint64 {
	var (
		res = result{}
		wg  = sync.WaitGroup{}
	)

	for i := 0; i &lt; workers; i++ {
		select {
		case &lt;-ctx.Done():
			// RLock to read res.max
			res.RLock()
			ret := res.max
			res.RUnlock()
			return ret
		default:
			wg.Add(1)
			go func() {
				defer wg.Done()
				num := compute()

				// Lock so that read from res.max and write
				// to res.max is safe. Else, data race could
				// occur.
				res.Lock()
				if num &gt; res.max {
					res.max = num
				}
				res.Unlock()
			}()
		}
	}

	// Wait for all the goroutine to finish work i.e., all
	// workers are done computing and updating the max.
	wg.Wait()

	return res.max
}

// compute stands in for a blocking operation: it sleeps for a random number
// of milliseconds in [0, 100) and then returns a random uint64.
func compute() uint64 {
	time.Sleep(time.Duration(rand.Int63n(100)) * time.Millisecond)

	return rand.Uint64()
}

// main runs findMax with 10 workers under a context that times out after
// 2 seconds, then prints the value it returned.
func main() {
	const (
		timeout = 2 * time.Second
		workers = 10
	)

	ctx, cancel := context.WithTimeout(context.Background(), timeout)
	defer cancel()

	largest := findMax(ctx, workers)
	fmt.Println(largest)
}

As @Brits pointed out in the comments, when the context is cancelled make sure you stop the worker goroutines from processing any further (if possible), because their results are no longer needed.

huangapple
  • 本文由 发表于 2021年6月17日 14:40:17
  • 转载请务必保留本文链接:https://go.coder-hub.com/68014184.html
匿名

发表评论

匿名网友

:?: :razz: :sad: :evil: :!: :smile: :oops: :grin: :eek: :shock: :???: :cool: :lol: :mad: :twisted: :roll: :wink: :idea: :arrow: :neutral: :cry: :mrgreen:

确定