Why is my function not waiting for the goroutines to complete?
Question
I have a function that makes a GET request and then stores both the response body and its base64-encoded form in a struct. It takes a pointer to a WaitGroup.
Here is that function:
type EncodedData string

type EncodedImage struct {
	Data        []byte
	EncodedData EncodedData
	Error       error
}

func GetPainting(url string, EI *EncodedImage, wg *sync.WaitGroup) {
	defer wg.Done()
	res, err := http.Get(url)
	if err != nil {
		EI.Error = errors.Wrapf(err, "unable to fetch from provided url %s", url)
	}
	body, err := ioutil.ReadAll(res.Body)
	if err != nil {
		EI.Error = err
	}
	encoded := b64.StdEncoding.EncodeToString(body)
	EI.Data, EI.EncodedData = body, EncodedData(encoded)
}
Here is the function that calls the previous function. It's a handler for a gin router.
func Search(db *gorm.DB) gin.HandlerFunc {
	return func(c *gin.Context) {
		// this is just receiving a search term, making a query, and then loading it into "results".
		term := c.Param("term")
		var results []models.Searches
		db.Table("searches").Where("to_tsvector(\"searches\".\"Title\" || '' || \"searches\".\"Artist_Name\") @@ plainto_tsquery(?)", term).Find(&results)
		var wg sync.WaitGroup
		// results is a slice of structs
		for i, re := range results {
			var ed EncodedImage
			wg.Add(1)
			// here is the function defined above
			go GetPainting(re.IMG, &ed, &wg)
			if ed.Error != nil {
				c.JSON(http.StatusInternalServerError, ed.Error.Error())
				panic(ed.Error)
			}
			results[i].IMG = fmt.Sprintf("data:image/jpeg;base64,%v", ed.EncodedData)
		}
		wg.Wait()
		c.JSON(http.StatusOK, results)
	}
}
The JSON response shows "data:image/jpeg;base64," with nothing after it, which means the handler is not waiting for the goroutines to complete.
This all works without additional goroutines. In other words, things stopped working when I introduced the go keyword. I wanted to try this to speed things up. Any insight or advice is greatly appreciated!
Answer 1
Score: 1
The issue is here:
go GetPainting(re.IMG, &ed, &wg) // goroutine alters ed
...
results[i].IMG = fmt.Sprintf("data:image/jpeg;base64,%v", ed.EncodedData)
"A go statement starts the execution of a function call as an independent concurrent thread of control..." (Go language specification); you should not make assumptions about when the goroutine will perform any action. So what might happen (I have not looked at exactly how goroutines are currently scheduled) is something like:
1. go GetPainting(re.IMG, &ed, &wg) - the runtime schedules GetPainting to run.
2. results[i].IMG = fmt.Sprintf("data:image/jpeg;base64,%v", ed.EncodedData) runs (ed.EncodedData is still the empty string, because EncodedData is a string type).
3. GetPainting runs.
You have created a data race; that is, you have one goroutine writing to ed.EncodedData and another reading from it without synchronisation. Generally it's difficult to predict what will happen when there is a race, but in this case your goroutine is performing IO (http.Get), so it's very probable that the write will occur after the read.
To help explain this (and potential solutions), let's simplify your example:
func routine(wg *sync.WaitGroup, val *int) {
	defer wg.Done()
	time.Sleep(time.Microsecond)
	*val = rand.Int()
}

func main() {
	const iterations = 5
	var wg sync.WaitGroup
	wg.Add(iterations)
	r := make([]int, iterations)
	results := make([]string, iterations)
	for i := 0; i < iterations; i++ {
		go routine(&wg, &r[i])
		// r[i] is read here, before the goroutine has had a chance to write it
		results[i] = fmt.Sprintf("data:image/jpeg;base64,%d", r[i])
	}
	wg.Wait()
	for i := 0; i < iterations; i++ {
		fmt.Println(r[i], results[i])
	}
}
As you will see, after the WaitGroup is done, r (similar to your ed) is populated, but results contains only strings built from the value 0. This points towards a simple solution:
for i := 0; i < iterations; i++ {
	go routine(&wg, &r[i])
}
wg.Wait()
// r is only read after every goroutine has finished
results := make([]string, iterations)
for i := 0; i < iterations; i++ {
	results[i] = fmt.Sprintf("data:image/jpeg;base64,%d", r[i])
}
for i := 0; i < iterations; i++ {
	fmt.Println(r[i], results[i])
}
This works because you are not accessing anything that the goroutines write to before you know that they have finished (via the WaitGroup). It's fairly simple to transfer this method into your code (create a slice of utils.EncodedImage and check for errors/results after the wg.Wait()).
While the above works, it will never return before all goroutines have completed. Often that is not desirable; for instance, if receiving one error is fatal, then you probably want to return a response to the user (and stop any ongoing work) as soon as the error is received.
There are a range of ways of dealing with this. Passing functions a Context is a very common means of enabling you to signal when they should stop (for your use-case see http.NewRequestWithContext). When it comes to handling the responses, you can code this yourself (but it is easy to leak goroutines) or use something like golang.org/x/sync/errgroup. Here is an example:
// Assumed toggle: the snippet references simulateErrors without showing its declaration.
const simulateErrors = true

func routine(ctx context.Context, val *int) error {
	select {
	case <-time.After(time.Microsecond * time.Duration(rand.Intn(20))): // select exits after 0 to 19 microseconds
	case <-ctx.Done(): // unless this is met (operation cancelled)
		fmt.Println("GoRoutine ending due to context")
		return ctx.Err()
	}
	*val = rand.Int()
	fmt.Println("generated ", *val)
	if simulateErrors && *val > (math.MaxInt/2) {
		return errors.New("number too big")
	}
	return nil
}

func main() {
	const iterations = 5
	// In your case source context should probably come from gin.Context so the operation is cancelled if the connection drops
	g, ctx := errgroup.WithContext(context.Background())
	r := make([]int, iterations)
	for i := 0; i < iterations; i++ {
		x := &r[i]
		g.Go(func() error {
			return routine(ctx, x)
		})
	}
	if err := g.Wait(); err != nil {
		fmt.Println("Got an error!", err)
		return // Here you send error as response (you might want to send something generic to avoid leaking system detail)
	}
	// Everything has processed OK
	results := make([]string, iterations)
	for i := 0; i < iterations; i++ {
		results[i] = fmt.Sprintf("data:image/jpeg;base64,%d", r[i])
		fmt.Println(r[i], results[i])
	}
}
Note: Be careful using panic in production code. In your example you panic when an HTTP GET fails; this is something that is likely to happen at some point, and you don't really want your application to shut down if it does (return a sensible error to the end user and perhaps log the failure). It is possible to catch panics, but it's generally best to deal with errors as they are detected.
Answer 2
Score: 0
@Brits had the right answer in the comments above. Setting results[i].IMG within the goroutine was the right solution. I also added the error handling he suggested. Here is the updated code below for anyone who needs it:
Note: I made GetPainting a method of EncodedImage for readability when it's called. It returns an error for errgroup.Group.Go().
func (EI *EncodedImage) GetPainting(url string, result *models.Searches) error {
	res, err := http.Get(url)
	if err != nil {
		return err
	}
	defer res.Body.Close()
	body, err := ioutil.ReadAll(res.Body)
	if err != nil {
		return err
	}
	encoded := b64.StdEncoding.EncodeToString(body)
	EI.Data, EI.EncodedData = body, EncodedData(encoded)
	result.IMG = fmt.Sprintf("data:image/jpeg;base64,%v", EI.EncodedData)
	return nil
}

func Search(db *gorm.DB) gin.HandlerFunc {
	return func(c *gin.Context) {
		term := c.Param("term")
		var results []models.Searches
		db.Table("searches").Where("to_tsvector(\"searches\".\"Title\" || '' || \"searches\".\"Artist_Name\") @@ plainto_tsquery(?)", term).Find(&results)
		// errgroup.Group does its own waiting, so no separate sync.WaitGroup is needed.
		var g errgroup.Group
		for i, re := range results {
			i, re := i, re // per-iteration copies; needed before Go 1.22
			// g.Go takes a func() error, so the call is wrapped in a closure.
			g.Go(func() error {
				var ed utils.EncodedImage
				return ed.GetPainting(re.IMG, &results[i])
			})
		}
		// Wait once, after every goroutine has been started.
		if err := g.Wait(); err != nil {
			c.JSON(http.StatusInternalServerError, err.Error())
			return
		}
		c.JSON(http.StatusOK, results)
	}
}