Synchronize buffered channel and WaitGroup


# Question

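I am having a problem using a WaitGroup with a buffered channel. The WaitGroup completes before the channel has been fully read, so the channel ends up only half read and processing breaks off partway through.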

```go
func main() {
	var wg sync.WaitGroup
	var err error

	start := time.Now()
	students := make([]studentDetail, 0)
	studentCh := make(chan studentDetail, 10000)
	errorCh := make(chan error, 1)

	wg.Add(1)

	go s.getDetailStudents(rCtx, studentCh, errorCh, &wg, s.Link, false)
	go func(ch chan studentDetail, e chan error) {
	LOOP:
		for {
			select {
			case p, ok := <-ch:
				if ok {
					L.Printf("Links %s: [%s]\n", p.title, p.link)
					students = append(students, p)
				} else {
					L.Print("Closed channel")
					break LOOP
				}
			case err = <-e:
				if err != nil {
					break
				}
			}
		}
	}(studentCh, errorCh)
	wg.Wait()
	close(studentCh)
	close(errorCh)
	L.Warnln("closed: all wait-groups completed!")
	L.Warnf("total items fetched: %d", len(students))

	elapsed := time.Since(start)
	L.Warnf("operation took %s", elapsed)
}
```

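The complication is that this function is recursive: it makes an HTTP call to fetch students and then, depending on a condition, makes further calls.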

```go
func (s Student) getDetailStudents(rCtx context.Context, content chan<- studentDetail, errorCh chan<- error, wg *sync.WaitGroup, url string, subSection bool) {
	util.MustNotNil(rCtx)
	L := logger.GetLogger(rCtx)
	defer func() {
		L.Println("Closing all waitgroup!")
		wg.Done()
	}()

	wc := getWC()
	httpClient := wc.Registry.MustHTTPClient()
	res, err := httpClient.Get(url)
	if err != nil {
		L.Fatal(err)
	}
	defer res.Body.Close()
	if res.StatusCode != 200 {
		L.Errorf("status code error: %d %s", res.StatusCode, res.Status)
		errorCh <- errors.New("service_status_code")
		return
	}

	// Parse the response; if it contains an error, return it through errorCh as done above.
	// Decide from the response whether the page has more sub-sections.
	if !subSection {
		wg.Add(1)
		go s.getDetailStudents(rCtx, content, errorCh, wg, link, true)
		// L.Warnf("total pages found %d", pageSub.Length()+1)
	}

	// Find the students in the response list and parse each one.
	students := s.parseStudentItemList(rCtx, item)
	for _, student := range students {
		content <- student
	}

	L.Warnf("Calling HTTP Service for %q with total %d record", url, elementsSub.Length())
}
```

The variable names have been changed so as not to reveal the original code base.

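The problem is that students are read nondeterministically: as soon as the WaitGroup completes, execution moves on before all of them have been read. I want execution to wait until every student has been read, and to stop as soon as an error is encountered.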
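One common way to get both behaviours, sketched here under assumptions rather than as a drop-in fix: a helper goroutine closes the student channel after `wg.Wait()` returns, the calling goroutine simply `range`s over the channel (so it cannot finish before every student is read), and a shared `context.Context` is cancelled on the first error so the remaining producers stop sending. `fetch` and `fetchAll` are hypothetical names, and `fetch` fails for the URL `"bad"` purely for demonstration:

```go
package main

import (
	"context"
	"errors"
	"fmt"
	"sync"
)

// fetch is a hypothetical stand-in for the HTTP call; it fails for "bad".
func fetch(ctx context.Context, url string) ([]string, error) {
	if url == "bad" {
		return nil, errors.New("service_status_code")
	}
	return []string{url + "-1", url + "-2"}, nil
}

// fetchAll starts one producer per URL, collects every student sent,
// and returns the first error encountered (if any).
func fetchAll(urls []string) ([]string, error) {
	ctx, cancel := context.WithCancel(context.Background())
	defer cancel()

	studentCh := make(chan string)
	errCh := make(chan error, 1) // keeps only the first error

	var wg sync.WaitGroup
	for _, u := range urls {
		wg.Add(1)
		go func(u string) {
			defer wg.Done()
			students, err := fetch(ctx, u)
			if err != nil {
				select {
				case errCh <- err: // first error wins
				default: // an error is already recorded
				}
				cancel() // ask the other producers to stop
				return
			}
			for _, s := range students {
				select {
				case studentCh <- s:
				case <-ctx.Done():
					return
				}
			}
		}(u)
	}

	// Close studentCh exactly once, after every producer has returned.
	go func() {
		wg.Wait()
		close(studentCh)
	}()

	var all []string
	for s := range studentCh { // ends only when all producers are done
		all = append(all, s)
	}

	select {
	case err := <-errCh:
		return all, err
	default:
		return all, nil
	}
}

func main() {
	students, err := fetchAll([]string{"a", "b"})
	fmt.Println(len(students), err) // 4 <nil>
}
```

Because only the closer goroutine calls `close`, and only after `wg.Wait()`, no producer can ever send on a closed channel.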


# Answer 1
**Score**: 1

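You need to know when the receiving goroutine has finished. The existing WaitGroup does that only for the producing goroutines, so you can use a second WaitGroup for the reader: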

```go
var wgReader sync.WaitGroup // tracks the reading goroutine

wg.Add(1)
go s.getDetailStudents(rCtx, studentCh, errorCh, &wg, s.Link, false)
wgReader.Add(1)
go func(ch chan studentDetail, e chan error) {
	defer wgReader.Done()
	...
}(studentCh, errorCh)
wg.Wait()
close(studentCh)
close(errorCh)
wgReader.Wait() // wait for the reading goroutine to finish
```
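# Answer 2
**Score**: 1
Since you are using a buffered channel, you can still retrieve the remaining values after closing it. You will also need a mechanism that prevents `main` from exiting too early while the reader is still working, as @Burak Serdar advised.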
I restructured the code into a working example; it is not identical to yours, but it should get the point across.
```go
package main

import (
	"context"
	"log"
	"sync"
	"time"
)

type studentDetails struct {
	title string
	link  string
}

func main() {
	var wg sync.WaitGroup
	var err error
	students := make([]studentDetails, 0)
	studentCh := make(chan studentDetails, 10000)
	errorCh := make(chan error, 1)

	start := time.Now()
	wg.Add(1)
	go getDetailStudents(context.TODO(), studentCh, errorCh, &wg, "http://example.com", false)

	// Create the "all producers done" signal once, outside the loop, so that
	// only one waiter goroutine is started.
	done := wrapWait(&wg)

LOOP:
	for {
		select {
		case p, ok := <-studentCh:
			if ok {
				log.Printf("Links %s: [%s]\n", p.title, p.link)
				students = append(students, p)
			} else {
				// ok == false: studentCh is closed and its buffer is already empty,
				// because values buffered before close() are received by the branch above.
				log.Println("Draining student channel")
				for p := range studentCh {
					log.Printf("Links %s: [%s]\n", p.title, p.link)
					students = append(students, p)
				}
				break LOOP
			}
		case err = <-errorCh:
			if err != nil {
				break LOOP
			}
		case <-done:
			close(studentCh)
			// A nil channel is never ready in a select, so this case cannot
			// fire again and close studentCh a second time.
			done = nil
		}
	}
	close(errorCh)
	log.Printf("fetched %d students", len(students))
	elapsed := time.Since(start)
	log.Printf("operation took %s", elapsed)
}

func getDetailStudents(rCtx context.Context, content chan<- studentDetails, errorCh chan<- error, wg *sync.WaitGroup, url string, subSection bool) {
	defer func() {
		log.Println("Closing")
		wg.Done()
	}()
	if !subSection {
		wg.Add(1)
		go getDetailStudents(rCtx, content, errorCh, wg, url, true)
		// L.Warnf("total pages found %d", pageSub.Length()+1)
	}
	content <- studentDetails{
		title: "title",
		link:  "link",
	}
}

// helper function to allow using WaitGroup in a select
func wrapWait(wg *sync.WaitGroup) <-chan struct{} {
	out := make(chan struct{})
	go func() {
		wg.Wait()
		close(out) // closing never blocks, so this goroutine always exits
	}()
	return out
}
```

# Answer 3
**Score**: 0

```go
wg.Add(1)
go func() {
	defer wg.Done()
	// I do not think you need a recursive function.
	// This function is overcomplicated.
	s.getDetailStudents(rCtx, studentCh, errorCh, &wg, s.Link, false)
}(...)

wg.Add(1)
go func(ch chan studentDetail, e chan error) {
	defer wg.Done()
	...
}(...)

wg.Wait()
close(studentCh)
close(errorCh)
```

This should solve the problem. The `s.getDetailStudents` function needs to be simplified; making it recursive has no benefit.
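If the recursion is removed as suggested, a single producer can fetch the top-level page and its sub-section in a plain loop and close the channel itself when it returns, so no WaitGroup is needed on the producer side. A sketch under that assumption, with a hypothetical `fetchPage` standing in for the HTTP call plus parsing (it fails for an empty URL, purely for demonstration); the producer stops at the first error:

```go
package main

import (
	"errors"
	"fmt"
)

type studentDetail struct {
	title, link string
}

// fetchPage is a hypothetical stand-in for the HTTP call plus parsing.
func fetchPage(url string) ([]studentDetail, error) {
	if url == "" {
		return nil, errors.New("service_status_code")
	}
	return []studentDetail{{title: url, link: url + "/1"}}, nil
}

// produce fetches each URL in order, sends the students, and closes
// studentCh when it returns. The first error is reported on errCh.
func produce(urls []string, studentCh chan<- studentDetail, errCh chan<- error) {
	defer close(studentCh) // the only sender closes the channel
	for _, u := range urls {
		students, err := fetchPage(u)
		if err != nil {
			errCh <- err // errCh has capacity 1 and is sent to at most once
			return
		}
		for _, s := range students {
			studentCh <- s
		}
	}
}

// collect reads every student and then reports any error from produce.
func collect(urls []string) ([]studentDetail, error) {
	studentCh := make(chan studentDetail)
	errCh := make(chan error, 1)
	go produce(urls, studentCh, errCh)

	var students []studentDetail
	for s := range studentCh { // ends when produce returns
		students = append(students, s)
	}
	select {
	case err := <-errCh:
		return students, err
	default:
		return students, nil
	}
}

func main() {
	students, err := collect([]string{"page", "page/sub"})
	fmt.Println(len(students), err) // 2 <nil>
}
```

Because the error is sent before the deferred `close`, it is already in `errCh` by the time the `range` loop ends.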


huangapple
  • Posted on January 20, 2023 at 16:51:50
  • Original link (please keep it when reposting): https://go.coder-hub.com/75181668.html