How to exit an outer loop from within a goroutine?

Question

The idea is to exit the outer loop from within a goroutine. I use a channel to signal that the loop should break, and a semaphore pattern to limit the number of goroutines spawned, so that I do not start an enormous number of goroutines while waiting for the loop to exit.

package main

import (
	"encoding/json"
	"fmt"
	"log"
	"net/http"
	"sync"
)

type Task struct {
	ID        int    `json:"id"`
	UserID    int    `json:"user_id"`
	Title     string `json:"title"`
	Completed bool   `json:"completed"`
}

func main() {
	var t Task
	wg := &sync.WaitGroup{}
	stop := make(chan struct{})
	sem := make(chan struct{}, 10)

	results := make(chan Task, 1)

	worker := func(i int) {
		defer wg.Done()
		defer func() { <-sem }()
		res, err := http.Get(fmt.Sprintf("https://jsonplaceholder.typicode.com/todos/%d", i))
		if err != nil {
			log.Fatal(err)
		}
		defer res.Body.Close()
		if err := json.NewDecoder(res.Body).Decode(&t); err != nil {
			log.Fatal(err)
		}

		if i == 20 {
			close(stop)
		}
		results <- t
	}

	i := 0

outer:
	for {
		select {
		case <-stop:
			fmt.Println("I came here")
			close(sem)
			break outer
		case v := <-results:
			fmt.Println(v)
		default:
			wg.Add(1)
			sem <- struct{}{}
			go worker(i)
			i++
		}
	}
	wg.Wait()

	fmt.Println("I am done")
}

The problem right now is that execution enters the case where I try to break out of the loop, but it never reaches "I am done". The reason is probably that it gets blocked forever on the results channel.
I would like to know how to handle this effectively.

package main

import (
	"context"
	"encoding/json"
	"fmt"
	"log"
	"net/http"
	"sync"
)

type Task struct {
	ID        int    `json:"id"`
	UserID    int    `json:"user_id"`
	Title     string `json:"title"`
	Completed bool   `json:"completed"`
}

func main() {
	wg := &sync.WaitGroup{}

	sem := make(chan struct{}, 10)
	ctx, cancel := context.WithCancel(context.Background())
	var ts []Task
	//results := make(chan Task, 1)

	worker := func(i int) {
		var t Task
		defer wg.Done()
		defer func() {
			<-sem
		}()
		res, err := http.Get(fmt.Sprintf("https://jsonplaceholder.typicode.com/todos/%d", i))
		if err != nil {
			log.Fatal(err)
		}
		defer res.Body.Close()
		if err := json.NewDecoder(res.Body).Decode(&t); err != nil {
			log.Fatal(err)
		}

		if i > 20 {
			cancel()
		}
		ts = append(ts, t)
	}

	i := 0

outer:
	for {
		select {
		case <-ctx.Done():
			break outer
		default:
			wg.Add(1)
			sem <- struct{}{}
			go worker(i)
			i++
		}
	}
	wg.Wait()

	fmt.Println(ts)
}

This works, but I end up with duplicate entries in the slice, which I want to avoid.

Edit:
@Davud's solution works; however, I am still interested in optimizing further and limiting the number of goroutines spawned. Currently, the number of extra goroutines spawned equals the buffer size of sem. I would like to reduce that somehow while still keeping the code concurrent.

Answer 1

Score: 3

This happens because once the stop signal is received and the for loop exits, nothing is listening on the results channel anymore. The workers that are still running then block on their send to results, so wg.Wait() never returns.

As a solution, you can listen on the results channel in a separate goroutine.

Here I removed the case v := <-results: fmt.Println(v) branch and added a goroutine. Try it out:

package main

import (
	"encoding/json"
	"fmt"
	"log"
	"net/http"
	"sync"
)

type Task struct {
	ID        int    `json:"id"`
	UserID    int    `json:"user_id"`
	Title     string `json:"title"`
	Completed bool   `json:"completed"`
}

func main() {
	var t Task
	wg := &sync.WaitGroup{}
	stop := make(chan struct{})
	sem := make(chan struct{}, 10)

	results := make(chan Task, 1)

	worker := func(i int) {
		defer wg.Done()
		defer func() { <-sem }()
		res, err := http.Get(fmt.Sprintf("https://jsonplaceholder.typicode.com/todos/%d", i))
		if err != nil {
			log.Fatal(err)
		}
		defer res.Body.Close()
		if err := json.NewDecoder(res.Body).Decode(&t); err != nil {
			log.Fatal(err)
		}

		if i == 20 {
			close(stop)
		}
		results <- t
	}

	i := 0

	go func() {
		for v := range results {
			fmt.Println(v)
		}
	}()
outer:
	for {
		select {
		case <-stop:
			fmt.Println("I came here")
			close(sem)
			break outer
		default:
			wg.Add(1)
			sem <- struct{}{}
			go worker(i)
			i++
		}
	}
	wg.Wait()

	fmt.Println("I am done")
}
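
The listener goroutine above is never told when to stop, so main may return before the last results are printed. A minimal sketch of one way to close that gap, assuming the HTTP call is replaced by a hypothetical fakeFetch helper so the example runs offline: close results after wg.Wait() and wait for the listener to finish. The names run, fakeFetch, and done are illustrative and not part of the answer, and acquiring the semaphore inside the select is a substitution for the default branch used in the answer.

```go
package main

import (
	"fmt"
	"sync"
)

// fakeFetch is a hypothetical stand-in for the HTTP request and JSON decode.
func fakeFetch(i int) string {
	return fmt.Sprintf("task-%d", i)
}

// run starts workers until worker number stopAt closes stop, then returns
// every result the workers produced.
func run(stopAt, limit int) []string {
	var wg sync.WaitGroup
	stop := make(chan struct{})
	sem := make(chan struct{}, limit)
	results := make(chan string)

	// Listener: the only goroutine that touches collected.
	var collected []string
	done := make(chan struct{})
	go func() {
		defer close(done)
		for v := range results {
			collected = append(collected, v)
		}
	}()

	worker := func(i int) {
		defer wg.Done()
		defer func() { <-sem }()
		v := fakeFetch(i) // local to this goroutine, nothing is shared
		if i == stopAt {
			close(stop) // exactly one worker has this index
		}
		results <- v
	}

outer:
	for i := 0; ; i++ {
		select {
		case <-stop:
			break outer
		case sem <- struct{}{}: // acquire a slot without a default branch
			wg.Add(1)
			go worker(i)
		}
	}

	wg.Wait()      // every worker has sent its result
	close(results) // lets the listener's range loop end
	<-done         // wait until the listener has drained everything
	return collected
}

func main() {
	got := run(20, 10)
	fmt.Println(len(got) > 20) // workers 0..20 always run, so this prints true
}
```

Because select picks randomly among ready cases, a few workers beyond stopAt may still start after stop is closed; the sketch only guarantees that all of their results are collected before run returns.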

Answer 2

Score: 0

It seems that the problem in the second solution is that the workers share var t Task. Multiple workers assign to it, but since it can hold only one value, they overwrite each other's values before append(ts, t) is called. By the time a worker calls append, t may no longer hold that worker's value, so the most recently assigned value gets appended several times. Hence the duplicates. It is a data race.

Solution: move var t Task inside the worker so that it is no longer shared.
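
As a rough illustration of that fix, here is a minimal sketch in which each goroutine declares its own t. It also guards the shared slice with a sync.Mutex, which is an addition not mentioned in the answer: concurrent append calls on the same slice are themselves a data race in Go. The collect function and the trimmed-down Task type are hypothetical, and assigning t.ID stands in for decoding the HTTP response.

```go
package main

import (
	"fmt"
	"sync"
)

// Task keeps only the ID field of the struct from the question.
type Task struct {
	ID int
}

// collect runs n workers and gathers exactly one Task per worker.
func collect(n int) []Task {
	var (
		wg sync.WaitGroup
		mu sync.Mutex // protects ts
		ts []Task
	)
	for i := 0; i < n; i++ {
		wg.Add(1)
		go func(i int) {
			defer wg.Done()
			var t Task // local: every goroutine has its own t
			t.ID = i   // stands in for decoding the response body
			mu.Lock()
			ts = append(ts, t)
			mu.Unlock()
		}(i)
	}
	wg.Wait()
	return ts
}

func main() {
	ts := collect(50)
	seen := make(map[int]bool)
	for _, t := range ts {
		seen[t.ID] = true
	}
	fmt.Println(len(ts), len(seen)) // prints "50 50": no duplicates, nothing lost
}
```

Running a program like this with go run -race is a quick way to check whether any shared access is still unprotected.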

huangapple
  • Posted on 2022-07-29 14:20:37
  • When reposting, please keep the link to this article: https://go.coder-hub.com/73162374.html