How do I terminate an infinite loop from inside of a goroutine?
Question
I'm writing an app using Go that is interacting with Spotify's API and I find myself needing to use an infinite for loop to call an endpoint until the length of the returned slice is less than the limit, signalling that I've reached the end of the available entries.
For my user account, there are 1644 saved albums (I determined this by looping through without using goroutines). However, when I add goroutines in, I'm getting back 2544 saved albums with duplicates. I'm also using the semaphore pattern to limit the number of goroutines so that I don't exceed the rate limit.
I assume that the issue is with using the `active` variable rather than channels, but my attempt at that just resulted in an infinite loop.
```go
wg := &sync.WaitGroup{}
sem := make(chan bool, 20)
active := true
offset := 0
for {
	sem <- true
	if active {
		// add each new goroutine to waitgroup
		wg.Add(1)
		go func() error {
			// remove from waitgroup when goroutine is complete
			defer wg.Done()
			// release the worker
			defer func() { <-sem }()
			savedAlbums, err := client.CurrentUsersAlbums(ctx, spotify.Limit(50), spotify.Offset(offset))
			if err != nil {
				return err
			}
			userAlbums = append(userAlbums, savedAlbums.Albums...)
			if len(savedAlbums.Albums) < 50 {
				// since the limit is set to 50, we know that if the number of returned albums
				// is less than 50 that we're done retrieving data
				active = false
				return nil
			} else {
				offset += 50
				return nil
			}
		}()
	} else {
		wg.Wait()
		break
	}
}
```
Thanks in advance!
Answer 1
Score: 0
I suspect that your main issue may be a misunderstanding of what the `go` keyword does; from [the docs](https://go.dev/ref/spec#Go_statements):

> A "go" statement starts the execution of a function call as an independent concurrent thread of control, or goroutine, within the same address space.

So `go func() error {` **starts** the execution of the closure; it does not mean that any of the code runs immediately. In fact, because `client.CurrentUsersAlbums` will take a while, it is likely that you will be requesting the first 50 items 20 times. This can be demonstrated with a simplified version of your application ([playground](https://go.dev/play/p/D1hL3k2tvxO)):
```go
package main

import (
	"fmt"
	"sync"
	"time"
)

func main() {
	wg := &sync.WaitGroup{}
	sem := make(chan bool, 20)
	active := true
	offset := 0
	for {
		sem <- true
		if active {
			// add each new goroutine to waitgroup
			wg.Add(1)
			go func() error {
				// remove from waitgroup when goroutine is complete
				defer wg.Done()
				// release the worker
				defer func() { <-sem }()
				fmt.Println("Getting from:", offset)
				time.Sleep(time.Millisecond) // Simulate the query
				// Pretend that we got back 50 albums
				offset += 50
				if offset > 2000 {
					active = false
				}
				return nil
			}()
		} else {
			wg.Wait()
			break
		}
	}
}
```
Running this will produce somewhat unpredictable results (note that the playground caches results, so try it on your machine), but you will probably see `Getting from: 0` 20 times.
A further issue is [data races](https://go.dev/doc/articles/race_detector). Updating a variable from multiple goroutines without protection (e.g. [`sync.Mutex`](https://go.dev/tour/concurrency/9)) results in undefined behaviour.
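For illustration, here is a minimal sketch (my own, based on the simplified example above, not your real code) of what guarding the shared variables with a `sync.Mutex` could look like. It removes the data race, but not the duplicate requests: several goroutines can still read the same `offset` before any of them gets around to incrementing it.

```go
package main

import (
	"fmt"
	"sync"
	"time"
)

func main() {
	wg := &sync.WaitGroup{}
	sem := make(chan bool, 20)
	var mu sync.Mutex // guards active and offset
	active := true
	offset := 0
	for {
		sem <- true
		mu.Lock()
		keepGoing := active
		mu.Unlock()
		if keepGoing {
			wg.Add(1)
			go func() {
				defer wg.Done()
				defer func() { <-sem }()
				mu.Lock()
				from := offset // copy the value this goroutine will use
				mu.Unlock()
				fmt.Println("Getting from:", from)
				time.Sleep(time.Millisecond) // simulate the query
				mu.Lock()
				offset += 50
				if offset > 2000 {
					active = false
				}
				mu.Unlock()
			}()
		} else {
			wg.Wait()
			break
		}
	}
}
```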
You will want to know how to fix this, but unfortunately you will need to rethink your algorithm. Currently the process you are following is:
1. Set `pos` to 0
2. Get `50` records starting from `pos`
3. If we got `50` records then `pos = pos + 50` and loop back to step 2
This is a sequential algorithm; you don't know whether you have all of the data until you have requested the previous section. I guess you could make speculative queries (and handle failures), but a better solution would be to find some way to determine the number of results expected and then split the queries to get that number of records across multiple goroutines.
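As a hedged aside, since the details depend on the client library you are using: the Spotify Web API's paging object carries a `total` count, so if the page returned by `client.CurrentUsersAlbums` exposes it (assumed below as `first.Total`), one initial request is enough to work out every offset you need. A rough sketch, reusing `client`, `ctx` and `userAlbums` from the question:

```go
// Sketch only: assumes the returned page exposes the paging object's
// "total" field as first.Total; check your client library.
first, err := client.CurrentUsersAlbums(ctx, spotify.Limit(50), spotify.Offset(0))
if err != nil {
	// handle the error
	return err
}
userAlbums = append(userAlbums, first.Albums...)

// Every remaining page starts at one of these offsets; they can be fed
// to a fixed pool of goroutines like the one shown below.
var offsets []int
for off := 50; off < first.Total; off += 50 {
	offsets = append(offsets, off)
}
```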
Note that if you do know the number of responses then you can do something like the following ([playground](https://go.dev/play/p/A9oYB6YUG2Q)):
```go
noOfResultsToGet := 1644       // In the below we are getting 0-1643
noOfResultsPerRequest := 50
noOfSimultaneousRequests := 20 // You may not need this but many services will limit the number of simultaneous requests you can make (or, at least, rate limit them)

requestChan := make(chan int)       // Will be passed the starting #
responseChan := make(chan []string) // Response from whatever request we are making (can be any type really)

// Start goroutines to make the requests
var wg sync.WaitGroup
wg.Add(noOfSimultaneousRequests)
for i := 0; i < noOfSimultaneousRequests; i++ {
	go func(routineNo int) {
		defer wg.Done()
		for startPos := range requestChan {
			// Simulate making the request
			maxResult := startPos + noOfResultsPerRequest
			if maxResult > noOfResultsToGet {
				maxResult = noOfResultsToGet
			}
			rsp := make([]string, 0, noOfResultsPerRequest)
			for x := startPos; x < maxResult; x++ {
				rsp = append(rsp, strconv.Itoa(x))
			}
			responseChan <- rsp
			fmt.Printf("Goroutine %d handling data from %d to %d\n", routineNo, startPos, startPos+noOfResultsPerRequest)
		}
	}(i)
}

// Close the response channel when all goroutines have shut down
go func() {
	wg.Wait()
	close(responseChan)
}()

// Send the requests
go func() {
	for reqFrom := 0; reqFrom < noOfResultsToGet; reqFrom += noOfResultsPerRequest {
		requestChan <- reqFrom
	}
	close(requestChan) // Allow goroutines to exit
}()

// Receive responses (note that these may be out of order)
result := make([]string, 0, noOfResultsToGet)
for x := range responseChan {
	result = append(result, x...)
}

// Order the results and output (results from goroutines may come back in any order)
sort.Slice(result, func(i, j int) bool {
	a, _ := strconv.Atoi(result[i])
	b, _ := strconv.Atoi(result[j])
	return a < b
})
fmt.Printf("Result: %v", result)
```
Relying on channels to pass messages often makes this kind of thing easier to think about and reduces the chance that you will make a mistake.
# Answer 2
**Score**: 0
- Set `offset` as an argument -> `go func(offset int) error {` (see the short sketch after this list).
- Increment `offset` by 50 after calling the go func.
- Change the type of `active` to `chan bool`.
- To avoid a data race on `userAlbums = append(userAlbums, res...)`, create a channel of the same type as `userAlbums`, run the for loop inside a goroutine, and send the results to that channel (collecting them by ranging over the channel afterwards).
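As a minimal illustration of the first point (a simplified sketch of my own, not the original code): a closure that captures `offset` shares one variable with the loop that starts the goroutines, which is both a data race and a source of duplicates, whereas an argument copies the value at the moment each goroutine is launched:

```go
package main

import (
	"fmt"
	"sync"
)

func main() {
	var wg sync.WaitGroup

	// Captured by the closure: all goroutines read the same shared
	// variable, so they see whatever it holds when they happen to run
	// (and this read/write pattern is also a data race).
	offset := 0
	for i := 0; i < 3; i++ {
		wg.Add(1)
		go func() {
			defer wg.Done()
			fmt.Println("captured:", offset) // often prints 150 three times
		}()
		offset += 50
	}
	wg.Wait()

	// Passed as an argument: the value is copied when the goroutine is
	// started, so each goroutine works with its own offset.
	offset = 0
	for i := 0; i < 3; i++ {
		wg.Add(1)
		go func(off int) {
			defer wg.Done()
			fmt.Println("argument:", off) // prints 0, 50 and 100, in some order
		}(offset)
		offset += 50
	}
	wg.Wait()
}
```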
Here is a runnable example: https://go.dev/play/p/yzk8qCURZFC

Applied to your code:
```go
wg := &sync.WaitGroup{}
worker := 20
active := make(chan bool, worker)
for i := 0; i < worker; i++ {
	active <- true
}

// I assume the type of userAlbums is []string
resultsChan := make(chan []string, worker)

go func() {
	offset := 0
	for {
		if <-active {
			// add each new goroutine to waitgroup
			wg.Add(1)
			go func(offset int) error {
				// remove from waitgroup when goroutine is complete
				defer wg.Done()
				savedAlbums, err := client.CurrentUsersAlbums(ctx, spotify.Limit(50), spotify.Offset(offset))
				if err != nil {
					// active <- false // maybe you need this
					return err
				}
				resultsChan <- savedAlbums.Albums
				if len(savedAlbums.Albums) < 50 {
					// since the limit is set to 50, we know that if the number of returned albums
					// is less than 50 that we're done retrieving data
					active <- false
					return nil
				} else {
					active <- true
					return nil
				}
			}(offset)
			offset += 50
		} else {
			wg.Wait()
			close(resultsChan)
			break
		}
	}
}()

for res := range resultsChan {
	userAlbums = append(userAlbums, res...)
}
```