Why is my function not waiting for the goroutines to complete?

Question

I have a function that makes a GET request and then stores both the response and the encoded response in a struct. It takes a pointer to a wait group.

Here is that function:

type EncodedData string

type EncodedImage struct {
	Data        []byte
	EncodedData EncodedData
	Error       error
}

func GetPainting(url string, EI *EncodedImage, wg *sync.WaitGroup) {
	defer wg.Done()

	res, err := http.Get(url)
	if err != nil {
		EI.Error = errors.Wrapf(err, "unable to fetch from provided url %s", url)
	}

	body, err := ioutil.ReadAll(res.Body)
	if err != nil {
		EI.Error = err
	}

	encoded := b64.StdEncoding.EncodeToString(body)
	EI.Data, EI.EncodedData = body, EncodedData(encoded)
}

Here is the function that calls the previous function. It's a handler for a gin router.

func Search(db *gorm.DB) gin.HandlerFunc {
	return func(c *gin.Context) {
		
		// this is just receiving a search term, making a query, and then loading it into "results". 
		term := c.Param("term")
		var results []models.Searches
		db.Table("searches").Where("to_tsvector(\"searches\".\"Title\" || '' || \"searches\".\"Artist_Name\") @@ plainto_tsquery(?)", term).Find(&results)

		
		var wg sync.WaitGroup

		// results is a slice of structs
		for i, re := range results {

			var ed EncodedImage
			wg.Add(1)

			// here is the function defined above
			go GetPainting(re.IMG, &ed, &wg)
			if ed.Error != nil {
				c.JSON(http.StatusInternalServerError, ed.Error.Error())
				panic(ed.Error)
			}

			results[i].IMG = fmt.Sprintf("data:image/jpeg;base64,%v", ed.EncodedData)
		}

		wg.Wait()
		c.JSON(http.StatusOK, results)
	}
}

The JSON response shows "data:image/jpeg;base64,", which means the goroutines aren't being waited on to completion.

This all works without using additional goroutines. In other words, things stopped working when I introduced the go keyword. I wanted to try this to speed things up. Any insight or advice is greatly appreciated!

Answer 1

Score: 1

The issue is here:

go GetPainting(re.IMG, &ed, &wg) // goroutine alters ed
...
results[i].IMG = fmt.Sprintf("data:image/jpeg;base64,%v", ed.EncodedData)

"A go statement starts the execution of a function call as an independent concurrent thread of control..." (source); you should not make assumptions about when the goroutine will perform any action. So what might happen (I have not looked at exactly how goroutines are currently managed) is something like:

  1. go GetPainting(re.IMG, &ed, &wg) - runtime schedules GetPainting to run.
  2. results[i].IMG = fmt.Sprintf("data:image/jpeg;base64,%v", ed.EncodedData) runs (ed.EncodedData is still the empty string).
  3. GetPainting runs.

You have created a data race; that is, you have one goroutine writing to ed.EncodedData and another reading from it without synchronisation. Generally, it's difficult to predict what will happen when there is a race, but in this case your goroutine is performing IO (http.Get), so it's very probable that the write will occur after the read.

To help explain this (and potential solutions), let's simplify your example (playground):

func routine(wg *sync.WaitGroup, val *int) {
	defer wg.Done()
	time.Sleep(time.Microsecond)
	*val = rand.Int()
}

func main() {
	const iterations = 5
	var wg sync.WaitGroup
	wg.Add(iterations)
	r := make([]int, iterations)
	results := make([]string, iterations)
	for i := 0; i < 5; i++ {
		go routine(&wg, &r[i])
		results[i] = fmt.Sprintf("data:image/jpeg;base64,%d", r[i])
	}
	wg.Wait()
	for i := 0; i < 5; i++ {
		fmt.Println(r[i], results[i])
	}
}

As you will see, after the WaitGroup is done, r (similar to your ed) is populated, but results contains all 0 values. This points towards a simple solution (playground):

for i := 0; i < 5; i++ {
	go routine(&wg, &r[i])
}
wg.Wait()
results := make([]string, iterations)
for i := 0; i < 5; i++ {
	results[i] = fmt.Sprintf("data:image/jpeg;base64,%d", r[i])
}
for i := 0; i < 5; i++ {
	fmt.Println(r[i], results[i])
}

This works because you are not accessing anything that the goroutines write to before you know that they are finished (via the WaitGroup). It's fairly simple to transfer this method into your code (create a slice of utils.EncodedImage and check for errors/results after the wg.Wait()).

While the above works, it will never return before all goroutines have completed. Often that is not desirable; for instance, if receiving one error is fatal, then you probably want to return a response to the user (and stop any ongoing work) as soon as the error is received.

There are a range of ways of dealing with this. Passing functions a Context is a very common means of enabling you to signal when they should stop (for your use-case see NewRequestWithContext). When it comes to handling the responses you can code this yourself (but it is easy to leak goroutines) or use something like golang.org/x/sync/errgroup. Here is an example (playground):

func routine(ctx context.Context, val *int) error {
	select {
	case <-time.After(time.Microsecond * time.Duration(rand.Intn(20))): // select will exit after a random number of microseconds
	case <-ctx.Done(): // unless this is met (operation cancelled)
		fmt.Println("GoRoutine ending due to context")
		return ctx.Err()
	}
	*val = rand.Int()
	fmt.Println("generated ", *val)
	if simulateErrors && *val > (math.MaxInt/2) {
		return errors.New("Number too big")
	}
	return nil
}

func main() {
	const iterations = 5
	// In your case source context should probably come from gin.Context so the operation is cancelled if the connection drops
	g, ctx := errgroup.WithContext(context.Background())
	r := make([]int, iterations)
	for i := 0; i < iterations; i++ {
		x := &r[i]
		g.Go(func() error {
			return routine(ctx, x)
		})
	}
	if err := g.Wait(); err != nil {
		fmt.Println("Got an error!", err)
		return // Here you send error as response (you might want to send something generic to avoid leaking system detail)
	}
	// Everything has processed OK
	results := make([]string, iterations)
	for i := 0; i < iterations; i++ {
		results[i] = fmt.Sprintf("data:image/jpeg;base64,%d", r[i])
		fmt.Println(r[i], results[i])
	}
}

Note: Be careful using panic in production code. In your example you panic when an HTTP GET fails; this is something that is likely to happen at some point, and you don't really want your application to shut down if it does (return a sensible error to the end user and perhaps log the failure). It is possible to catch panics, but it's generally best to deal with errors as they are detected.

Answer 2

Score: 0

@Brits had the right answer in the comments above. Setting results[i].IMG within the goroutine was the right solution. I also added the error handling he suggested. Here is the updated code below for anyone who needs it:

Note: I made GetPainting a method of EncodedImage so that the call site reads better. It returns an error so that it can be used with errgroup.Group.Go().

func (EI *EncodedImage) GetPainting(url string, wg *sync.WaitGroup, result *models.Searches) error {
	defer wg.Done()

	res, err := http.Get(url)
	if err != nil {
		return err
	}
	defer res.Body.Close() // release the connection once the body has been read

	body, err := ioutil.ReadAll(res.Body)
	if err != nil {
		return err
	}

	encoded := b64.StdEncoding.EncodeToString(body)
	EI.Data, EI.EncodedData = body, EncodedData(encoded)

	result.IMG = fmt.Sprintf("data:image/jpeg;base64,%v", EI.EncodedData)
	
	return nil
}
func Search(db *gorm.DB) gin.HandlerFunc {
	return func(c *gin.Context) {
		term := c.Param("term")
		var results []models.Searches
		db.Table("searches").Where("to_tsvector(\"searches\".\"Title\" || '' || \"searches\".\"Artist_Name\") @@ plainto_tsquery(?)", term).Find(&results)

		var wg sync.WaitGroup
		var g errgroup.Group

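		for i, re := range results {
			// Copy per-iteration values so the closure does not capture the loop variables.
			url, result := re.IMG, &results[i]
			var ed utils.EncodedImage
			wg.Add(1)

			// g.Go expects a func() error, so wrap the call in a closure.
			g.Go(func() error {
				return ed.GetPainting(url, &wg, result)
			})
		}

		// Wait once, after every goroutine has been started.
		if err := g.Wait(); err != nil {
			c.JSON(http.StatusInternalServerError, err.Error())
			return
		}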
		c.JSON(http.StatusOK, results)
	}
}

huangapple
  • Published on September 25, 2022, 09:21:57
  • When reposting, please keep the link to this article: https://go.coder-hub.com/73841532.html