How to combine the work of channels and wait group?

huangapple go评论140阅读模式
英文:

How to combine the work of channels and wait group?

问题

有一个代码:

func main() {
    rand.Seed(time.Now().UnixNano())
    res, err := MockFunc()
    fmt.Printf("res - %v, err - %v\n", res, err)
}

func MockFunc() ([]int, error) {
    args := []int{1, 2, 3, 4, 5}
    result := make([]int, 0)
    errCh := make(chan error)

    wg := &sync.WaitGroup{}
    wg.Add(len(args))
    for _, a := range args {
        go func(aLit int) {
            value, err := RandomError(aLit)
            if err != nil {
                errCh <- err
                return
            }
            result = append(result, value)
        }(a)
    }
    errValue := <-errCh // 如果没有错误,我会发生死锁 - fatal error: all goroutines are asleep - deadlock!
    if errValue != nil {
        fmt.Println("returning because err")
        return nil, errValue // 如果RandomError()返回错误,在这里返回预期的错误
    }
    wg.Wait()
    return result, nil
}

func RandomError(arg int) (int, error) {
    time.Sleep(time.Millisecond * 100 * time.Duration(arg))

    errChance := rand.Intn(100)
    if errChance > 40 {
        fmt.Printf("error on arg - %d\n", arg)
        return 0, errors.New("mock err")
    }

    return errChance, nil
}

我希望如果RandomError()函数返回错误,那么MockFunc()函数在等待所有waitgroup之前就完成并返回错误。但是,如果没有错误,我会遇到死锁问题;如果有错误,则一切都按预期工作。

我理解这是因为我没有关闭通道。但是,如果我在wg.Wait()之后关闭它,那么这样做就没有意义,因为如果第一次调用函数返回错误,我将等待所有其他调用的结果。

我希望的是,如果RandomeErr()的任何一次调用返回错误,我将把这个错误从MockFunc()返回到main(),而不必等待所有调用的结束。

英文:

There is a code:

func main() {
	rand.Seed(time.Now().UnixNano())
	res, err := MockFunc()
	fmt.Printf(&quot;res - %v, err - %v\n&quot;, res, err)
}

func MockFunc() ([]int, error) {
	args := []int{1, 2, 3, 4, 5}
	result := make([]int, 0)
	errCh := make(chan error)

	wg := &amp;sync.WaitGroup{}
	wg.Add(len(args))
	for _, a := range args {
		go func(aLit int) {
			value, err := RandomError(aLit)
			if err != nil {
				errCh &lt;- err
				return
			}
			result = append(result, value)
		}(a)
	}
	errValue := &lt;-errCh // if no errors, i have panic - fatal error: all goroutines are asleep - deadlock!
	if errValue != nil {
		fmt.Println(&quot;returning because err&quot;)
		return nil, errValue // if RandomError() returns error, returned here as expected
	}
	wg.Wait()
	return result, nil
}

func RandomError(arg int) (int, error) {
	time.Sleep(time.Millisecond * 100 * time.Duration(arg))

	errChance := rand.Intn(100)
	if errChance &gt; 40 {
		fmt.Printf(&quot;error on arg - %d\n&quot;, arg)
		return 0, errors.New(&quot;mock err&quot;)
	}

	return errChance, nil
}

I need that if the RandomError() function returns an error, then the MockFunc() function completed before waiting for all waitgroups and returned an error. But if there are no errors, then I get a deadlock, if there is, then everything works as expected.

I understand that this is because I am not closing the channel. But if I close it after wg.Wait(), then the meaning of this will be lost, since if the first call to the function returned an error, then I will wait for the results of all the other calls.

I need it so that if one of the calls to RandomeErr() returned an error, I returned this error from MockFunc() to main() without waiting for the end of all calls.

答案1

得分: 3

可能更容易使用errgroup.Group

我还必须创建另一个 goroutine 和 channel 将结果读入数组中。如果在多个 goroutine 中使用 result = append(result, value),你会在某个时刻遇到错误。

使用 errgroup.Group 的一个好处是,在函数返回之前,所有的 goroutine 都必须被清理和停止。这有助于防止内存泄漏,特别是当这些函数是较大系统的一部分时。

package main

import (
	"context"
	"errors"
	"fmt"
	"math/rand"
	"time"

	"golang.org/x/sync/errgroup"
)

func main() {
	rand.Seed(time.Now().UnixNano())
	res, err := MockFunc()
	fmt.Printf("res - %v, err - %v\n", res, err)
}

func MockFunc() ([]int, error) {
	args := []int{1, 2, 3, 4, 5}
	resultChan := make(chan int)
	result := make([]int, 0)
	eg, ctx := errgroup.WithContext(context.Background())

	// 将所有结果读入数组
	eg.Go(func() error {
		for {
			select {
			case <-ctx.Done():
				// 总是检查上下文是否已取消,
				// 如果有错误,errgroup 将取消上下文,
				// 所有的 goroutine 都需要退出
				// 在 `eg.Wait` 返回之前。
				return context.Canceled
			case val := <-resultChan:
				result = append(result, val)
				if len(result) == len(args) {
					return nil
				}
			}
		}
	})

	for _, a := range args {
		aLit := a // 复制值,以防止重用内存地址
		eg.Go(func() error {
			value, err := RandomError(aLit)
			if err != nil {
				return err
			}
			select {
			case resultChan <- value:
			case <-ctx.Done():
				return context.Canceled
			default:
			}
			return nil
		})
	}
	return result, eg.Wait()
}

func RandomError(arg int) (int, error) {
	time.Sleep(time.Millisecond * 100 * time.Duration(arg))

	errChance := rand.Intn(100)
	if errChance > 40 {
		fmt.Printf("error on arg - %d\n", arg)
		return 0, errors.New("mock err")
	}

	return errChance, nil
}

如果将所有结果缓冲到一个 channel 中,也可以更简单:

func MockFunc() ([]int, error) {
	args := []int{1, 2, 3, 4, 5}
	resultChan := make(chan int, len(args))
	result := make([]int, 0)
	eg, ctx := errgroup.WithContext(context.Background())

	for _, a := range args {
		aLit := a // 复制值,以防止重用内存地址
		eg.Go(func() error {
			value, err := RandomError(aLit)
			if err != nil {
				return err
			}
			select {
			case resultChan <- value:
			case <-ctx.Done():
				return context.Canceled
			default:
			}
			return nil
		})
	}
	if err := eg.Wait(); err != nil {
		return nil, err
	}
	close(resultChan)
	for val := range resultChan {
		result = append(result, val)
	}
	return result, nil
}

这是我能想到的最简单的实现方式:


func MockFunc() (result []int, err error) {
	args := []int{1, 2, 3, 4, 5}
	resultChan := make(chan int)
	errChan := make(chan error)
	for _, a := range args {
		go func(aLit int) {
			value, err := RandomError(aLit)
			if err != nil {
				errChan <- err
			}
			resultChan <- value
		}(a)
	}
	for i := 0; i < len(args); i++ {
		select {
		case err := <-errChan:
			return nil, err
		case val := <-resultChan:
			result = append(result, val)
		}
	}
	return result, nil
}
英文:

Might be easier to use errgroup.Group.

I also had to create another goroutine and channel to read the results into the array. If you use result = append(result, value) within many goroutines you'll get an error at some point.

One of the benefits of using errgroup.Group is that all goroutines must be cleaned up and stopped before the function will return. This helps prevent memory leaks when these kinds of functions are part of larger system.

package main

import (
	&quot;context&quot;
	&quot;errors&quot;
	&quot;fmt&quot;
	&quot;math/rand&quot;
	&quot;time&quot;

	&quot;golang.org/x/sync/errgroup&quot;
)

func main() {
	rand.Seed(time.Now().UnixNano())
	res, err := MockFunc()
	fmt.Printf(&quot;res - %v, err - %v\n&quot;, res, err)
}

func MockFunc() ([]int, error) {
	args := []int{1, 2, 3, 4, 5}
	resultChan := make(chan int)
	result := make([]int, 0)
	eg, ctx := errgroup.WithContext(context.Background())

	// Read all results into an array
	eg.Go(func() error {
		for {
			select {
			case &lt;-ctx.Done():
				// Always check to see if the context has cancelled,
				// if there is an error errgroup will cancel the
				// context and all goroutines will need to exit
				// before `eg.Wait` returns.
				return context.Canceled
			case val := &lt;-resultChan:
				result = append(result, val)
				if len(result) == len(args) {
					return nil
				}
			}
		}
	})

	for _, a := range args {
		aLit := a // Copy the value so that we don&#39;t re-use the memory address
		eg.Go(func() error {
			value, err := RandomError(aLit)
			if err != nil {
				return err
			}
			select {
			case resultChan &lt;- value:
			case &lt;-ctx.Done():
				return context.Canceled
			default:
			}
			return nil
		})
	}
	return result, eg.Wait()
}

func RandomError(arg int) (int, error) {
	time.Sleep(time.Millisecond * 100 * time.Duration(arg))

	errChance := rand.Intn(100)
	if errChance &gt; 40 {
		fmt.Printf(&quot;error on arg - %d\n&quot;, arg)
		return 0, errors.New(&quot;mock err&quot;)
	}

	return errChance, nil
}

Can also be simpler if you buffer all the results into a channel:

func MockFunc() ([]int, error) {
	args := []int{1, 2, 3, 4, 5}
	resultChan := make(chan int, len(args))
	result := make([]int, 0)
	eg, ctx := errgroup.WithContext(context.Background())

	for _, a := range args {
		aLit := a // Copy the value so that we don&#39;t re-use the memory address
		eg.Go(func() error {
			value, err := RandomError(aLit)
			if err != nil {
				return err
			}
			select {
			case resultChan &lt;- value:
			case &lt;-ctx.Done():
				return context.Canceled
			default:
			}
			return nil
		})
	}
	if err := eg.Wait(); err != nil {
		return nil, err
	}
	close(resultChan)
	for val := range resultChan {
		result = append(result, val)
	}
	return result, nil
}

And here is the simplest possible implementation I could think of:


func MockFunc() (result []int, err error) {
	args := []int{1, 2, 3, 4, 5}
	resultChan := make(chan int)
	errChan := make(chan error)
	for _, a := range args {
		go func(aLit int) {
			value, err := RandomError(aLit)
			if err != nil {
				errChan &lt;- err
			}
			resultChan &lt;- value
		}(a)
	}
	for i := 0; i &lt; len(args); i++ {
		select {
		case err := &lt;-errChan:
			return nil, err
		case val := &lt;-resultChan:
			result = append(result, val)
		}
	}
	return result, nil
}

答案2

得分: 2

你有三个死锁:

  1. errValue := <-errCh // 如果没有错误,我会发生panic - fatal error: all goroutines are asleep - deadlock!
  2. wg.Wait() # 第44行
  3. errCh <- err # 第29行

第二个死锁从未发生,因为有返回语句或者第一个死锁。
为了修复死锁1,我们必须以非阻塞模式读取通道。
为了修复死锁2,我们必须在每个goroutine中使用wg.Done。
为了修复死锁3,我们必须使用带缓冲的通道,或者以某种方式消耗通道。这里为了简单起见,我选择了带缓冲的通道,但我们也可以在for循环中读取errCh,并在遇到错误时返回错误。

package main
import (
"errors"
"fmt"
"math/rand"
"sync"
"time"
)
func main() {
rand.Seed(time.Now().UnixNano())
res, err := MockFunc()
fmt.Printf("res - %v, err - %v\n", res, err)
}
func MockFunc() ([]int, error) {
args := []int{1, 2, 3, 4, 5}
result := make([]int, 0)
errCh := make(chan error, len(args))
wg := &sync.WaitGroup{}
wg.Add(len(args))
for _, a := range args {
go func(aLit int) {
defer wg.Done()
value, err := RandomError(aLit)
if err != nil {
errCh <- err
return
}
result = append(result, value)
}(a)
}
var errValue error
select {
case errValue = <-errCh: // 如果没有错误,我会发生panic - fatal error: all goroutines are asleep - deadlock!
default:
}
if errValue != nil {
fmt.Println("returning because err")
return nil, errValue // 如果RandomError()返回错误,在这里返回预期的错误
}
wg.Wait()
return result, nil
}
func RandomError(arg int) (int, error) {
time.Sleep(time.Millisecond * 100 * time.Duration(arg))
errChance := rand.Intn(100)
if errChance > 40 {
fmt.Printf("error on arg - %d\n", arg)
return 0, errors.New("mock err")
}
return errChance, nil
}
英文:

You have three deadlocks:

  1. errValue := <-errCh // if no errors, i have panic - fatal error: all goroutines are asleep - deadlock!
  2. wg.Wait() # line 44
  3. errCh <- err # line 29

the second one never happend because of return statement or first deadlock.
for fixing deadlock 1, we must read channel with non-blocking mode.
for fixing deadlock 2, we must use wg.Done in each goroutine.
for fixing deadlock 3, we must use buffered channel or somehow we must consume the channel. here for simplicity I choose buffered channel, but we can read errCh inside for loop and return error if we saw error.

package main
import (
&quot;errors&quot;
&quot;fmt&quot;
&quot;math/rand&quot;
&quot;sync&quot;
&quot;time&quot;
)
func main() {
rand.Seed(time.Now().UnixNano())
res, err := MockFunc()
fmt.Printf(&quot;res - %v, err - %v\n&quot;, res, err)
}
func MockFunc() ([]int, error) {
args := []int{1, 2, 3, 4, 5}
result := make([]int, 0)
errCh := make(chan error, len(args))
wg := &amp;sync.WaitGroup{}
wg.Add(len(args))
for _, a := range args {
go func(aLit int) {
defer wg.Done()
value, err := RandomError(aLit)
if err != nil {
errCh &lt;- err
return
}
result = append(result, value)
}(a)
}
var errValue error
select {
case errValue = &lt;-errCh: // if no errors, i have panic - fatal error: all goroutines are asleep - deadlock!
default:
}
if errValue != nil {
fmt.Println(&quot;returning because err&quot;)
return nil, errValue // if RandomError() returns error, returned here as expected
}
wg.Wait()
return result, nil
}
func RandomError(arg int) (int, error) {
time.Sleep(time.Millisecond * 100 * time.Duration(arg))
errChance := rand.Intn(100)
if errChance &gt; 40 {
fmt.Printf(&quot;error on arg - %d\n&quot;, arg)
return 0, errors.New(&quot;mock err&quot;)
}
return errChance, nil
}

huangapple
  • 本文由 发表于 2022年1月5日 01:41:11
  • 转载请务必保留本文链接:https://go.coder-hub.com/70582931.html
匿名

发表评论

匿名网友

:?: :razz: :sad: :evil: :!: :smile: :oops: :grin: :eek: :shock: :???: :cool: :lol: :mad: :twisted: :roll: :wink: :idea: :arrow: :neutral: :cry: :mrgreen:

确定