检查所有goroutine是否已经完成,而不使用wg.Wait()。

huangapple go评论68阅读模式
英文:

Check if all goroutines have finished without using wg.Wait()

问题

假设我有一个函数IsAPrimaryColour(),它通过调用三个其他函数IsRed()、*IsGreen()IsBlue()*来工作。由于这三个函数相互独立,它们可以并发运行。返回条件如下:

  1. 如果其中任何一个函数返回true,*IsAPrimaryColour()也应返回true。无需等待其他函数完成。也就是说:如果IsRed()为true或IsGreen()为true或IsBlue()为true,则IsPrimaryColour()*为true。
  2. 如果所有函数都返回false,*IsAPrimaryColour()也应返回false。也就是说:如果IsRed()为false且IsGreen()为false且IsBlue()为false,则IsPrimaryColour()*为false。
  3. 如果其中任何一个函数返回错误,*IsAPrimaryColour()*也应返回错误。无需等待其他函数完成或收集其他错误。

我遇到的问题是,如果其中任何一个函数返回true,我如何退出函数,但如果它们都返回false,又如何等待所有三个函数完成。如果我使用sync.WaitGroup对象,我将需要等待所有3个goroutine完成,然后才能从调用函数返回。

因此,我使用一个循环计数器来跟踪我从通道接收到消息的次数,并在接收到所有3个消息后退出程序。

虽然这个方法已经足够好用,但我想知道是否有更好的方法来实现这个功能?

英文:

Let's say I have a function IsAPrimaryColour() which works by calling three other functions IsRed(), IsGreen() and IsBlue(). Since the three functions are quite independent of one another, they can run concurrently. The return conditions are:

  1. If any of the three functions returns true, IsAPrimaryColour()
    should also return true. There is no need to wait for the other
    functions to finish. That is: IsPrimaryColour() is true if IsRed() is true OR IsGreen() is true OR IsBlue() is true
  2. If all functions return false, IsAPrimaryColour() should also return
    false. That is: IsPrimaryColour() is false if IsRed() is false AND IsGreen() is false AND IsBlue() is false
  3. If any of the three functions returns an error, IsAPrimaryColour()
    should also return the error. There is no need to wait for the other
    functions to finish, or to collect any other errors.

The thing I'm struggling with is how to exit the function if any other three functions return true, but also to wait for all three to finish if they all return false. If I use a sync.WaitGroup object, I will need to wait for all 3 go routines to finish before I can return from the calling function.

Therefore, I'm using a loop counter to keep track of how many times I have received a message on a channel and existing the program once I have received all 3 messages.

https://play.golang.org/p/kNfqWVq4Wix

package main

import (
	"errors"
	"fmt"
	"time"
)

func main() {
	x := "something"
	result, err := IsAPrimaryColour(x)

	if err != nil {
		fmt.Printf("Error: %v\n", err)
	} else {
		fmt.Printf("Result: %v\n", result)
	}
}

func IsAPrimaryColour(value interface{}) (bool, error) {
	found := make(chan bool, 3)
	errors := make(chan error, 3)
	defer close(found)
	defer close(errors)
	var nsec int64 = time.Now().UnixNano()

	//call the first function, return the result on the 'found' channel and any errors on the 'errors' channel
	go func() {
		result, err := IsRed(value)
		if err != nil {
			errors <- err
		} else {
			found <- result
		}
		fmt.Printf("IsRed done in %f nanoseconds \n", float64(time.Now().UnixNano()-nsec))
	}()

	//call the second function, return the result on the 'found' channel and any errors on the 'errors' channel
	go func() {
		result, err := IsGreen(value)
		if err != nil {
			errors <- err
		} else {
			found <- result
		}
		fmt.Printf("IsGreen done in %f nanoseconds \n", float64(time.Now().UnixNano()-nsec))
	}()

	//call the third function, return the result on the 'found' channel and any errors on the 'errors' channel
	go func() {
		result, err := IsBlue(value)
		if err != nil {
			errors <- err
		} else {
			found <- result
		}
		fmt.Printf("IsBlue done in %f nanoseconds \n", float64(time.Now().UnixNano()-nsec))
	}()

	//loop counter which will be incremented every time we read a value from the 'found' channel
	var counter int

	for {
		select {
		case result := <-found:
			counter++
			fmt.Printf("received a value on the results channel after %f nanoseconds. Value of counter is %d\n", float64(time.Now().UnixNano()-nsec), counter)
			if result {
				fmt.Printf("some goroutine returned true\n")
				return true, nil
			}
		case err := <-errors:
			if err != nil {
				fmt.Printf("some goroutine returned an error\n")
				return false, err
			}
		default:
		}

		//check if we have received all 3 messages on the 'found' channel. If so, all 3 functions must have returned false and we can thus return false also
		if counter == 3 {
			fmt.Printf("all goroutines have finished and none of them returned true\n")
			return false, nil
		}
	}
}

func IsRed(value interface{}) (bool, error) {
	return false, nil
}

func IsGreen(value interface{}) (bool, error) {
	time.Sleep(time.Millisecond * 100) //change this to a value greater than 200 to make this function take longer than IsBlue()
	return true, nil
}

func IsBlue(value interface{}) (bool, error) {
	time.Sleep(time.Millisecond * 200)
	return false, errors.New("something went wrong")
}

Although this works well enough, I wonder if I'm not overlooking some language feature to do this in a better way?

答案1

得分: 3

errgroup.WithContext 可以帮助简化这里的并发处理。

如果发生错误或找到结果,你希望停止所有的 goroutine。如果你可以将“找到结果”表示为一个特殊的错误(类似于 io.EOF),那么你可以使用 errgroup 内置的“在第一个错误时取消”行为来关闭整个组:

func IsAPrimaryColour(ctx context.Context, value interface{}) (bool, error) {
    var nsec int64 = time.Now().UnixNano()

    errFound := errors.New("result found")
    g, ctx := errgroup.WithContext(ctx)

    g.Go(func() error {
        result, err := IsRed(ctx, value)
        if result {
            err = errFound
        }
        fmt.Printf("IsRed done in %f nanoseconds \n", float64(time.Now().UnixNano()-nsec))
        return err
    })

    

    err := g.Wait()
    if err == errFound {
        fmt.Printf("some goroutine returned errFound\n")
        return true, nil
    }
    if err != nil {
        fmt.Printf("some goroutine returned an error\n")
        return false, err
    }
    fmt.Printf("all goroutines have finished and none of them returned true\n")
    return false, nil
}

(https://play.golang.org/p/MVeeBpDv4Mn)

英文:

errgroup.WithContext can help simplify the concurrency here.

You want to stop all of the goroutines if an error occurs, or if a result is found. If you can express “a result is found” as a distinguished error (along the lines of io.EOF), then you can use errgroup's built-in “cancel on first error” behavior to shut down the whole group:

func IsAPrimaryColour(ctx context.Context, value interface{}) (bool, error) {
	var nsec int64 = time.Now().UnixNano()

	errFound := errors.New("result found")
	g, ctx := errgroup.WithContext(ctx)

	g.Go(func() error {
		result, err := IsRed(ctx, value)
		if result {
			err = errFound
		}
		fmt.Printf("IsRed done in %f nanoseconds \n", float64(time.Now().UnixNano()-nsec))
		return err
	})

	

	err := g.Wait()
	if err == errFound {
		fmt.Printf("some goroutine returned errFound\n")
		return true, nil
	}
	if err != nil {
		fmt.Printf("some goroutine returned an error\n")
		return false, err
	}
	fmt.Printf("all goroutines have finished and none of them returned true\n")
	return false, nil
}

(https://play.golang.org/p/MVeeBpDv4Mn)

答案2

得分: 1

一些备注:

  • 你不需要关闭通道,因为你事先知道要读取的信号的预期数量。这已经足够作为退出条件。
  • 你不需要重复手动调用函数,可以使用切片。
  • 由于你使用了切片,甚至不需要计数器或静态值3,只需查看函数切片的长度。
  • switch 语句中的默认情况是无用的。只需在等待的输入上阻塞即可。

所以一旦你去掉了所有的冗余代码,代码看起来像这样:

func IsAPrimaryColour(value interface{}) (bool, error) {
	fns := []func(interface{}) (bool, error){IsRed, IsGreen, IsBlue}
	found := make(chan bool, len(fns))
	errors := make(chan error, len(fns))

	for i := 0; i < len(fns); i++ {
		fn := fns[i]
		go func() {
			result, err := fn(value)
			if err != nil {
				errors <- err
				return
			}
			found <- result
		}()
	}

	for i := 0; i < len(fns); i++ {
		select {
		case result := <-found:
			if result {
				return true, nil
			}
		case err := <-errors:
			if err != nil {
				return false, err
			}
		}
	}
	return false, nil
}
  • 你不需要在每个异步调用中观察时间,只需观察整个调用者返回所花费的时间。
func main() {
	now := time.Now()
	x := "something"
	result, err := IsAPrimaryColour(x)

	if err != nil {
		fmt.Printf("Error: %v\n", err)
	} else {
		fmt.Printf("Result: %v\n", result)
	}
	fmt.Println("it took", time.Since(now))
}

https://play.golang.org/p/bARHS6c6m1c

英文:

some remarks,

  • you dont need to close the channels, you know before hand the expected count of signals to read. This is sufficient for an exit condition.
  • you dont need to duplicate manual function calls, use a slice.
  • since you use a slice, you dont even need a counter, or a static value of 3, just look at the length of your func slice.
  • that default case into the switch is useless. just block on the input you are waiting for.

So once you got ride of all the fat, the code looks like


func IsAPrimaryColour(value interface{}) (bool, error) {
	fns := []func(interface{}) (bool, error){IsRed, IsGreen, IsBlue}
	found := make(chan bool, len(fns))
	errors := make(chan error, len(fns))

	for i := 0; i &lt; len(fns); i++ {
		fn := fns[i]
		go func() {
			result, err := fn(value)
			if err != nil {
				errors &lt;- err
				return
			}
			found &lt;- result
		}()
	}

	for i := 0; i &lt; len(fns); i++ {
		select {
		case result := &lt;-found:
			if result {
				return true, nil
			}
		case err := &lt;-errors:
			if err != nil {
				return false, err
			}
		}
	}
	return false, nil
}
  • you dont need to obsereve the time at the each and every async calls, just observe the time the overall caller took to return.
func main() {
	now := time.Now()
	x := &quot;something&quot;
	result, err := IsAPrimaryColour(x)

	if err != nil {
		fmt.Printf(&quot;Error: %v\n&quot;, err)
	} else {
		fmt.Printf(&quot;Result: %v\n&quot;, result)
	}
	fmt.Println(&quot;it took&quot;, time.Since(now))
}

https://play.golang.org/p/bARHS6c6m1c

答案3

得分: 1

处理多个并发函数调用并在满足条件后取消任何未完成的操作的惯用方法是使用上下文值。类似这样:

func operation1(ctx context.Context) bool { ... }
func operation2(ctx context.Context) bool { ... }
func operation3(ctx context.Context) bool { ... }

func atLeastOneSuccess() bool {
    ctx, cancel := context.WithCancel(context.Background())
    defer cancel() // 确保任何仍在运行的函数都收到停止信号
    results := make(chan bool, 3) // 用于发送结果的通道
    go func() {
        results <- operation1(ctx)
    }()
    go func() {
        results <- operation2(ctx)
    }()
    go func() {
        results <- operation3(ctx)
    }()
    for i := 0; i < 3; i++ {
        result := <-results
        if result {
            // 其中一个操作返回成功,因此我们将返回成功,并让延迟调用cancel()告知任何未完成的函数中止。
            return true
        }
    }
    // 我们已经遍历了所有的返回值,并且它们都是false
    return false
}

当然,这假设每个operationN函数实际上都会遵守已取消的上下文。这个答案讨论了如何做到这一点。

英文:

The idiomatic way to handle multiple concurrent function calls, and cancel any outstanding after a condition, is with the use of a context value. Something like this:

func operation1(ctx context.Context) bool { ... }
func operation2(ctx context.Context) bool { ... }
func operation3(ctx context.Context) bool { ... }

func atLeastOneSuccess() bool {
    ctx, cancel := context.WithCancel(context.Background()
    defer cancel() // Ensure any functions still running get the signal to stop
    results := make(chan bool, 3) // A channel to send results
    go func() {
        results &lt;- operation1(ctx)
    }()
    go func() {
        results &lt;- operation2(ctx)
    }()
    go func() {
        results &lt;- operation3(ctx)
    }()
    for i := 0; i &lt; 3; i++ {
        result := &lt;-results
        if result {
            // One of the operations returned success, so we&#39;ll return that
            // and let the deferred call to cancel() tell any outstanding
            // functions to abort.
            return true
        }
    }
    // We&#39;ve looped through all return values, and they were all false
    return false
}

Of course this assumes that each of the operationN functions actually honors a canceled context. This answer discusses how to do that.

答案4

得分: 0

你不必在Wait上阻塞主 goroutine,你可以阻塞其他东西,例如:

doneCh := make(chan struct{})

go func() {
    wg.Wait()
    close(doneCh)
}()

然后你可以在select语句中等待doneCh,以查看所有的routine是否都已经完成。

英文:

You don't have to block the main goroutine on the Wait, you could block something else, for example:

doneCh := make(chan struct{}{})

go func() {
    wg.Wait()
    close(doneCh)
}()

Then you can wait on doneCh in your select to see if all the routines have finished.

huangapple
  • 本文由 发表于 2021年8月26日 21:22:55
  • 转载请务必保留本文链接:https://go.coder-hub.com/68939352.html
匿名

发表评论

匿名网友

:?: :razz: :sad: :evil: :!: :smile: :oops: :grin: :eek: :shock: :???: :cool: :lol: :mad: :twisted: :roll: :wink: :idea: :arrow: :neutral: :cry: :mrgreen:

确定