English:
How to exit outer loop from within go routine?
Question
package main

import (
    "context"
    "encoding/json"
    "fmt"
    "log"
    "net/http"
    "sync"
)

type Task struct {
    ID        int    `json:"id"`
    UserID    int    `json:"user_id"`
    Title     string `json:"title"`
    Completed bool   `json:"completed"`
}

func main() {
    wg := &sync.WaitGroup{}
    sem := make(chan struct{}, 10)
    ctx, cancel := context.WithCancel(context.Background())
    var ts []Task
    results := make(chan Task, 1)
    worker := func(i int) {
        var t Task
        defer wg.Done()
        defer func() {
            <-sem
        }()
        res, err := http.Get(fmt.Sprintf("https://jsonplaceholder.typicode.com/todos/%d", i))
        if err != nil {
            log.Fatal(err)
        }
        defer res.Body.Close()
        if err := json.NewDecoder(res.Body).Decode(&t); err != nil {
            log.Fatal(err)
        }
        if i > 20 {
            cancel()
        }
        results <- t
    }
    i := 0
outer:
    for {
        select {
        case <-ctx.Done():
            break outer
        case v := <-results:
            ts = append(ts, v)
        default:
            wg.Add(1)
            sem <- struct{}{}
            go worker(i)
            i++
        }
    }
    wg.Wait()
    fmt.Println(ts)
}
This code is an improved version of the first one you provided. I made some changes to address the problems you ran into. The main changes are:
- The buffer size of the results channel is set to 1 to avoid blocking indefinitely.
- In the worker function, the ts slice is protected with a mutex to avoid the duplicate entries caused by concurrent access.
- In the worker function, the call to cancel is moved to after the send on the results channel, so that all results are processed before the context is cancelled.
These changes should solve the problems you ran into and avoid the duplicate entries. Hope this helps!
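For reference, the mutex-based protection mentioned in the second bullet is not shown in the code above (which collects results over the results channel instead). A minimal, self-contained sketch of guarding a shared slice with sync.Mutex could look like the following; the stripped-down Task type and the names used here are illustrative only, not part of the original program.

package main

import (
    "fmt"
    "sync"
)

// Task is a stripped-down stand-in for the Task type in the question.
type Task struct {
    ID int
}

func main() {
    var (
        mu sync.Mutex
        ts []Task
        wg sync.WaitGroup
    )
    for i := 0; i < 5; i++ {
        wg.Add(1)
        go func(i int) {
            defer wg.Done()
            t := Task{ID: i} // each goroutine works on its own value
            mu.Lock()        // serialize access to the shared slice
            ts = append(ts, t)
            mu.Unlock()
        }(i)
    }
    wg.Wait()
    fmt.Println(ts)
}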
English:
The idea is to exit the outer loop from within a goroutine. I have used a channel to signal that the loop should break, and I am using the semaphore pattern to limit the number of goroutines spawned, so that I do not spawn an enormously high number of goroutines while waiting for the loop to exit.
package main

import (
    "encoding/json"
    "fmt"
    "log"
    "net/http"
    "sync"
)

type Task struct {
    ID        int    `json:"id"`
    UserID    int    `json:"user_id"`
    Title     string `json:"title"`
    Completed bool   `json:"completed"`
}

func main() {
    var t Task
    wg := &sync.WaitGroup{}
    stop := make(chan struct{})
    sem := make(chan struct{}, 10)
    results := make(chan Task, 1)
    worker := func(i int) {
        defer wg.Done()
        defer func() { <-sem }()
        res, err := http.Get(fmt.Sprintf("https://jsonplaceholder.typicode.com/todos/%d", i))
        if err != nil {
            log.Fatal(err)
        }
        defer res.Body.Close()
        if err := json.NewDecoder(res.Body).Decode(&t); err != nil {
            log.Fatal(err)
        }
        if i == 20 {
            close(stop)
        }
        results <- t
    }
    i := 0
outer:
    for {
        select {
        case <-stop:
            fmt.Println("I came here")
            close(sem)
            break outer
        case v := <-results:
            fmt.Println(v)
        default:
            wg.Add(1)
            sem <- struct{}{}
            go worker(i)
            i++
        }
    }
    wg.Wait()
    fmt.Println("I am done")
}
The problem right now is that I can see it enters the case where I am trying to break the loop, but it never reaches "I am done".
The reason is probably that it is getting blocked indefinitely on the results channel.
I would like to know how I can handle this effectively.
package main

import (
    "context"
    "encoding/json"
    "fmt"
    "log"
    "net/http"
    "sync"
)

type Task struct {
    ID        int    `json:"id"`
    UserID    int    `json:"user_id"`
    Title     string `json:"title"`
    Completed bool   `json:"completed"`
}

func main() {
    wg := &sync.WaitGroup{}
    sem := make(chan struct{}, 10)
    ctx, cancel := context.WithCancel(context.Background())
    var ts []Task
    //results := make(chan Task, 1)
    worker := func(i int) {
        var t Task
        defer wg.Done()
        defer func() {
            <-sem
        }()
        res, err := http.Get(fmt.Sprintf("https://jsonplaceholder.typicode.com/todos/%d", i))
        if err != nil {
            log.Fatal(err)
        }
        defer res.Body.Close()
        if err := json.NewDecoder(res.Body).Decode(&t); err != nil {
            log.Fatal(err)
        }
        if i > 20 {
            cancel()
        }
        ts = append(ts, t)
    }
    i := 0
outer:
    for {
        select {
        case <-ctx.Done():
            break outer
        default:
            wg.Add(1)
            sem <- struct{}{}
            go worker(i)
            i++
        }
    }
    wg.Wait()
    fmt.Println(ts)
}
This works, but then I end up getting duplicate entries in the slice, which I want to avoid.
Edit:
@Davud's solution works; however, I am still interested in how to further optimize this and limit the number of goroutines spawned. Currently the number of extra goroutines spawned equals the buffer size of sem, which I would somehow like to reduce while still keeping it concurrent.
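Regarding the edit: one common way to put a hard cap on the number of goroutines is a fixed worker pool instead of the semaphore-plus-spawn-loop. The following is only a sketch of that pattern, not code from the question or from the answers; the HTTP fetch is replaced by a placeholder and the stop condition at i == 20 is illustrative. A fixed number of workers pull job indices from a jobs channel, and the producer stops handing out work once the context is cancelled, so no goroutines beyond the pool size are ever created.

package main

import (
    "context"
    "fmt"
    "sync"
)

func main() {
    const poolSize = 10 // hard cap: never more than poolSize worker goroutines

    ctx, cancel := context.WithCancel(context.Background())
    defer cancel()

    jobs := make(chan int)
    results := make(chan int)

    var wg sync.WaitGroup
    for w := 0; w < poolSize; w++ {
        wg.Add(1)
        go func() {
            defer wg.Done()
            for i := range jobs {
                // Placeholder for the HTTP fetch and JSON decode from the question.
                if i == 20 { // illustrative stop condition
                    cancel()
                }
                results <- i
            }
        }()
    }

    // Producer: hands out job indices until the context is cancelled.
    go func() {
        defer close(jobs)
        for i := 0; ; i++ {
            select {
            case <-ctx.Done():
                return
            case jobs <- i:
            }
        }
    }()

    // Close results once every worker has exited, so the range below terminates.
    go func() {
        wg.Wait()
        close(results)
    }()

    for v := range results {
        fmt.Println(v)
    }
    fmt.Println("I am done")
}

Jobs already handed out when cancel is called are still finished, which matches the original behaviour where workers past the stop index still send their result.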
Answer 1
Score: 3
This happens because once the stop signal is received and the for loop exits, you no longer listen on the results channel and print the results, which causes the results channel to block the workers from continuing to process.
The solution is to listen on the results channel in a separate goroutine.
Here I removed case v := <-results: fmt.Println(v) and added a goroutine. Give it a try:
package main

import (
    "encoding/json"
    "fmt"
    "log"
    "net/http"
    "sync"
)

type Task struct {
    ID        int    `json:"id"`
    UserID    int    `json:"user_id"`
    Title     string `json:"title"`
    Completed bool   `json:"completed"`
}

func main() {
    var t Task
    wg := &sync.WaitGroup{}
    stop := make(chan struct{})
    sem := make(chan struct{}, 10)
    results := make(chan Task, 1)
    worker := func(i int) {
        defer wg.Done()
        defer func() { <-sem }()
        res, err := http.Get(fmt.Sprintf("https://jsonplaceholder.typicode.com/todos/%d", i))
        if err != nil {
            log.Fatal(err)
        }
        defer res.Body.Close()
        if err := json.NewDecoder(res.Body).Decode(&t); err != nil {
            log.Fatal(err)
        }
        if i == 20 {
            close(stop)
        }
        results <- t
    }
    i := 0
    go func() {
        for v := range results {
            fmt.Println(v)
        }
    }()
outer:
    for {
        select {
        case <-stop:
            fmt.Println("I came here")
            close(sem)
            break outer
        default:
            wg.Add(1)
            sem <- struct{}{}
            go worker(i)
            i++
        }
    }
    wg.Wait()
    fmt.Println("I am done")
}
English:
It happens because once the stop signal is received and the for loop exits, you are no longer listening on and printing the results, and this causes the results channel to block the workers from continuing to process.
As a solution, you can listen on the results channel in a separate goroutine.
Here I removed case v := <-results: fmt.Println(v) and added a goroutine. Try it out:
package main

import (
    "encoding/json"
    "fmt"
    "log"
    "net/http"
    "sync"
)

type Task struct {
    ID        int    `json:"id"`
    UserID    int    `json:"user_id"`
    Title     string `json:"title"`
    Completed bool   `json:"completed"`
}

func main() {
    var t Task
    wg := &sync.WaitGroup{}
    stop := make(chan struct{})
    sem := make(chan struct{}, 10)
    results := make(chan Task, 1)
    worker := func(i int) {
        defer wg.Done()
        defer func() { <-sem }()
        res, err := http.Get(fmt.Sprintf("https://jsonplaceholder.typicode.com/todos/%d", i))
        if err != nil {
            log.Fatal(err)
        }
        defer res.Body.Close()
        if err := json.NewDecoder(res.Body).Decode(&t); err != nil {
            log.Fatal(err)
        }
        if i == 20 {
            close(stop)
        }
        results <- t
    }
    i := 0
    go func() {
        for v := range results {
            fmt.Println(v)
        }
    }()
outer:
    for {
        select {
        case <-stop:
            fmt.Println("I came here")
            close(sem)
            break outer
        default:
            wg.Add(1)
            sem <- struct{}{}
            go worker(i)
            i++
        }
    }
    wg.Wait()
    fmt.Println("I am done")
}
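To see why draining results in a separate goroutine matters, the blocking described above can be reproduced in isolation. This is a minimal sketch, independent of the programs above: a buffered channel whose only receiver has stopped receiving blocks the sender forever, so wg.Wait() never returns.

package main

import (
    "fmt"
    "sync"
)

func main() {
    results := make(chan int, 1)
    var wg sync.WaitGroup

    results <- 1 // fill the single buffer slot; nothing will ever receive it

    wg.Add(1)
    go func() {
        defer wg.Done()
        results <- 2 // blocks forever: buffer full and no receiver left
    }()

    wg.Wait() // never returns, mirroring the program never reaching "I am done"
    fmt.Println("I am done")
}

In this stripped-down form the Go runtime even reports "all goroutines are asleep - deadlock!" because every goroutine is blocked; in the original program the main goroutine appears to hang in wg.Wait() the same way.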
Answer 2
Score: 0
It seems that the problem in the second solution is that the workers share var t Task. That means multiple workers try to assign a value to it, but since it can only hold one value, the workers overwrite each other's values before append(ts, t) is called. When append is finally called by the different workers, the last value assigned to t ends up being appended to ts multiple times. The workers call append while t no longer holds their value, hence the duplicates. It is a data race / race condition.
Solution: move var t Task inside the worker so that it is no longer shared.
English:
It seems that the problem in the second solution is that the workers share var t Task. That means multiple workers try to assign a value to it, but since it can only hold one value, the workers overwrite each other's values before append(ts, t) is called. When append is finally called by the different workers, the last value assigned to t is appended multiple times to ts. The workers call append while t no longer holds their value, hence the duplicates. It's a data race / race condition.
Solution: move var t Task inside the worker so that it's not shared anymore.
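The overwriting described here can be reproduced with a much smaller sketch (illustrative only, not the code from the question): every goroutine assigns to the same shared variable and then appends it, so the slice may end up with repeated values. Running it with the race detector (go run -race) should flag the concurrent writes.

package main

import (
    "fmt"
    "sync"
)

func main() {
    var (
        t  int // shared by every goroutine, like the shared var t Task
        mu sync.Mutex
        ts []int
        wg sync.WaitGroup
    )
    for i := 0; i < 5; i++ {
        wg.Add(1)
        go func(i int) {
            defer wg.Done()
            t = i // every goroutine writes the same variable
            mu.Lock()
            ts = append(ts, t) // t may already hold another goroutine's value
            mu.Unlock()
        }(i)
    }
    wg.Wait()
    // May print repeated values; declaring t inside the goroutine (t := i)
    // gives each worker its own copy and fixes the duplicates.
    fmt.Println(ts)
}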