如何确保在goroutine内部启动的goroutine之间同步?

huangapple go评论78阅读模式
英文:

How to ensure goroutines launched within goroutines are synchronized with each other?

问题

这是我第一次使用Go的并发特性,我直接跳入了深水区。

我想要对一个API进行并发调用。请求是基于我想要接收的帖子的标签(可以有1 <= N个标签)。响应体的格式如下:

{
    "posts": [
        {
            "id": 1,
            "author": "Name",
            "authorId": 1,
            "likes": num_likes,
            "popularity": popularity_decimal,
            "reads": num_reads,
            "tags": [ "tag1", "tag2" ]
        },
        ...
    ]
}

我的计划是将一系列通道连接在一起,并生成一些读取和写入这些通道的goroutine:

- 对于每个标签,将其添加到一个tagsChannel中的goroutine中
- 在另一个goroutine中使用该tagsChannel来并发地进行GET请求到端点
- 对于该请求的每个响应,将帖子的底层切片传递给另一个goroutine
- 对于帖子切片中的每个单独的帖子,将帖子添加到一个postChannel中
- 在另一个goroutine中,遍历postChannel并将每个帖子插入数据结构中

以下是我目前的代码:

func (srv *server) Get() {
    // 使用红黑树可以防止重复项,插入和检索速度快,并且已经按ID排序。
    rbt := tree.NewWithIntComparator()
    // 并发方法
    tagChan := make(chan string)                       // tags -> tagChan
    postChan := make(chan models.Post)                 // tagChan -> GET -> post -> postChan
    errChan := make(chan error)                        // 用于同步goroutine之间的错误
    wg := &sync.WaitGroup{}                            // 用于同步goroutine
    wg.Add(4)
    // 创建一个go func来同步我们的等待组
    // 一旦所有goroutine都完成,我们就可以关闭errChan
    go func() {
        wg.Wait()
        close(errChan)
    }()
    go insertTags(tags, tagChan, wg)
    go fetch(postChan, tagChan, errChan, wg)
    go addPostToTree(rbt, postChan, wg)
    for err := range errChan {
        if err != nil {
            srv.HandleError(err, http.StatusInternalServerError).ServeHTTP(w, r)
        }
    }
}
// insertTags将用户传入的标签插入到tagChan中,以便tagChan可以在fetch中传递这些标签。
func insertTags(tags []string, tagChan chan<- string, group *sync.WaitGroup) {
    defer group.Done()
    for _, tag := range tags {
        tagChan <- tag
    }
    close(tagChan)
}
// fetch完成对端点的GET请求
func fetch(posts chan<- models.Post, tags <-chan string, errs chan<- error, group *sync.WaitGroup) {
    defer group.Done()
    for tag := range tags {
        ep, err := formURL(tag)
        if err != nil {
            errs <- err
        }
        group.Add(1) // 问题:我是否应该在这里使用一个单独的等待组?
        go func() {
            resp, err := http.Get(ep.String())
            if err != nil {
                errs <- err
            }
            container := models.PostContainer{}
            err = json.NewDecoder(resp.Body).Decode(&container)
            defer resp.Body.Close()
            group.Add(1) // 问题:我是否应该在这里添加一个单独的等待组,并将其传递给insertPosts?
            go insertPosts(posts, container.Posts, group)
            defer group.Done()
        }()
        // group.Done() -- 由于Burak的建议,我删除了这个调用,但现在我的程序出错了
    }
}
// insertPosts将每个单独的帖子插入到我们的posts通道中,以便可以并发地将它们添加到我们的RBT中。
func insertPosts(posts chan<- models.Post, container []models.Post, group *sync.WaitGroup) {
    defer group.Done()
    for _, post := range container {
        posts <- post
    }
}
// addPostToTree遍历通道并将每个单独的帖子插入到我们的RBT中,将帖子ID设置为节点的键。
func addPostToTree(tree *tree.RBT, collection <-chan models.Post, group *sync.WaitGroup) {
    defer group.Done()
    for post := range collection {
        // 忽略返回值和错误:
        // 我们不关心返回的键和错误只会在尝试添加重复项时出现 - 我们不关心
        tree.Insert(post.ID, post)
    }
}

我能够对端点进行一次请求,但是当我尝试提交第二次请求时,我的程序会出现panic: sync: negative WaitGroup counter错误。

我的问题是为什么我的WaitGroup计数器会变为负数?我确保在我的goroutine完成时添加到等待组并标记。

如果在第二次请求时等待组为负数,那么这必须意味着我第一次分配等待组并将其增加4的操作被跳过了...为什么?这可能与关闭通道有关吗?如果是这样,我在哪里关闭通道?

另外,有没有关于调试goroutine的提示?

谢谢你的帮助。

英文:

This is the first time I'm using the concurrency features of Go and I'm jumping right into the deep end.

I want to make concurrent calls to an API. The request is based off of the tags of the posts I want to receive back (there can be 1 <= N tags). The response body looks like this:

{
    &quot;posts&quot;: [
        {
            &quot;id&quot;: 1,
            &quot;author&quot;: &quot;Name&quot;,
            &quot;authorId&quot;: 1,
            &quot;likes&quot;: num_likes,
            &quot;popularity&quot;: popularity_decimal,
            &quot;reads&quot;: num_reads,
            &quot;tags&quot;: [ &quot;tag1&quot;, &quot;tag2&quot; ]
        },
        ...
    ]
}

My plan is to daisy-chain a bunch of channels together and spawn a number of goroutines that read and or write from those channels:

- for each tag, add it to a tagsChannel inside a goroutine
- use that tagsChannel inside another goroutine to make concurrent GET requests to the endpoint
- for each response of that request, pass the underlying slice of posts into another goroutine
- for each individual post inside the slice of posts, add the post to a postChannel
- inside another goroutine, iterate over postChannel and insert each post into a data structure

Here's what I have so far:

func (srv *server) Get() {
    // Using red-black tree prevents any duplicates, fast insertion
	// and retrieval times, and is sorted already on ID.
	rbt := tree.NewWithIntComparator()
	// concurrent approach
	tagChan := make(chan string)                       // tags -&gt; tagChan
	postChan := make(chan models.Post)                 // tagChan -&gt; GET -&gt; post -&gt; postChan
	errChan := make(chan error)                        // for synchronizing errors across goroutines
	wg := &amp;sync.WaitGroup{}                            // for synchronizing goroutines
	wg.Add(4)
	// create a go func to synchronize our wait groups
	// once all goroutines are finished, we can close our errChan
	go func() {
		wg.Wait()
		close(errChan)
	}()
	go insertTags(tags, tagChan, wg)
	go fetch(postChan, tagChan, errChan, wg)
	go addPostToTree(rbt, postChan, wg)
	for err := range errChan {
		if err != nil {
			srv.HandleError(err, http.StatusInternalServerError).ServeHTTP(w, r)
		}
	}
}
// insertTags inserts user&#39;s passed-in tags to tagChan
// so that tagChan may pass those along in fetch.
func insertTags(tags []string, tagChan chan&lt;- string, group *sync.WaitGroup) {
	defer group.Done()
	for _, tag := range tags {
		tagChan &lt;- tag
	}
	close(tagChan)
}
// fetch completes a GET request to the endpoint
func fetch(posts chan&lt;- models.Post, tags &lt;-chan string, errs chan&lt;- error, group *sync.WaitGroup) {
	defer group.Done()
	for tag := range tags {
		ep, err := formURL(tag)
		if err != nil {
			errs &lt;- err
		}
		group.Add(1) // QUESTION should I use a separate wait group here?
		go func() {
			resp, err := http.Get(ep.String())
			if err != nil {
				errs &lt;- err
			}
			container := models.PostContainer{}
			err = json.NewDecoder(resp.Body).Decode(&amp;container)
			defer resp.Body.Close()
			group.Add(1) // QUESTION should I add a separate wait group here and pass it to insertPosts?
			go insertPosts(posts, container.Posts, group)
			defer group.Done()
		}()
		// group.Done() -- removed this call due to Burak, but now my program hands
	}
}
// insertPosts inserts each individual post into our posts channel so that they may be
// concurrently added to our RBT.
func insertPosts(posts chan&lt;- models.Post, container []models.Post, group *sync.WaitGroup) {
	defer group.Done()
	for _, post := range container {
		posts &lt;- post
	}
}
// addPostToTree iterates over the channel and
// inserts each individual post into our RBT,
// setting the post ID as the node&#39;s key.
func addPostToTree(tree *tree.RBT, collection &lt;-chan models.Post, group *sync.WaitGroup) {
	defer group.Done()
	for post := range collection {
		// ignore return value &amp; error here:
		// we don&#39;t care about the returned key and
		// error is only ever if a duplicate is attempted to be added -- we don&#39;t care
		tree.Insert(post.ID, post)
	}
}

I'm able to make one request to the endpoint, but as soon as try to submit a second request, my program fails with panic: sync: negative WaitGroup counter.

My question is why exactly is my WaitGroup counter going negative? I make sure to add to the waitgroup and mark when my goroutines are done.

If the waitgroup is negative on the second request, then that must mean that the first time I allocate a waitgroup and add 4 to it is being skipped... why? Does this have something to do with closing channels, maybe? And if so, where do I close a channel?

Also -- does anyone have tips for debugging goroutines?

Thanks for your help.

答案1

得分: 0

首先,整个设计相当复杂。我在最后提到了我的想法。

你的代码中有两个问题:

  1. posts 通道从未关闭,因此 addPostToTree 可能永远不会退出循环,导致一个 waitGroup 永远不会减少(在你的情况下程序挂起)。程序有可能陷入死锁,无限期等待其他 goroutine 释放它,但所有 goroutine 都被阻塞住了。
    解决方案: 你可以关闭 postChan 通道。但是如何关闭呢?通常建议生产者关闭通道,但你有多个生产者。所以最好的选择是,等待所有生产者完成,然后关闭通道。为了等待所有生产者完成,你需要创建另一个 waitGroup 并使用它来跟踪子例程。

代码:

// fetch 完成对端点的 GET 请求
func fetch(posts chan<- models.Post, tags <-chan string, errs chan<- error, group *sync.WaitGroup) {
    postsWG := &sync.WaitGroup{}
    for tag := range tags {
        ep, err := formURL(tag)
        if err != nil {
            errs <- err
        }
        postsWG.Add(1) // 问题:我应该在这里使用一个单独的 waitGroup 吗?
        go func() {
            resp, err := http.Get(ep.String())
            if err != nil {
                errs <- err
            }
            container := models.PostContainer{}
            err = json.NewDecoder(resp.Body).Decode(&container)
            defer resp.Body.Close()
            go insertPosts(posts, container.Posts, postsWG)
        }()
    }

    defer func() {
        postsWG.Wait()
        close(posts)
        group.Done()
    }()
}
  1. 现在,我们还有另一个问题,主 waitGroup 应该用 3 来初始化,而不是 4。这是因为主例程只启动了另外 3 个例程 wg.Add(3),所以它只需要跟踪这 3 个例程。对于子例程,我们使用了一个不同的 waitGroup,所以这不再是父例程的负担。

代码:

errChan := make(chan error)                        // 用于同步错误的通道
wg := &sync.WaitGroup{}                            // 用于同步例程的 waitGroup
wg.Add(3)
// 创建一个 go 函数来同步我们的 waitGroup
// 一旦所有例程都完成,我们就可以关闭 errChan

简而言之--

复杂的设计 - 主 waitGroup 在一个地方启动,但每个 goroutine 都可以根据需要修改这个 waitGroup。因此,它没有单一的所有者,这使得调试和维护变得非常复杂(而且不能保证没有错误)。我建议将其拆分,并为每个子例程单独创建跟踪器。这样,启动更多例程的调用者只需专注于跟踪其子 goroutine。这个例程将在完成后通知其父 waitGroup(以及其子例程完成后),而不是让子例程直接通知祖父例程。

另外,在 fetch 方法中,发出 HTTP 请求并获取响应后,为什么要创建另一个 goroutine 来处理这些数据?无论如何,这个 goroutine 在数据插入之前都无法退出,也没有执行其他数据处理操作。据我所了解,第二个 goroutine 是多余的。

group.Add(1) // 问题:我应该在这里添加一个单独的 waitGroup 并将其传递给 insertPosts 吗?
go insertPosts(posts, container.Posts, group)
defer group.Done()
英文:

Firstly, the whole design is quite complicated. Mentioned my thoughts towards the end.

There are 2 problems in your code:

  1. posts channel is never closed, due to which addPostToTree might never be existing the loop, resulting in one waitGroup never decreasing (In your case the program hangs). There is a chance the program waits indefinitely with a deadlock (Thinking other goroutine will release it, but all goroutines are stuck).<br/>
    Solution: You can close the postChan channel. But how? Its always recommended for the producer to always close the channel, but you've multiple producers. So the best option is, wait for all producers to finish and then close the channel. In order to wait for all producers to finish, you'll need to create another waitGroup and use that to track the child routines.

Code:

// fetch completes a GET request to the endpoint
func fetch(posts chan&lt;- models.Post, tags &lt;-chan string, errs chan&lt;- error, group *sync.WaitGroup) {
    postsWG := &amp;sync.WaitGroup{}
    for tag := range tags {
        ep, err := formURL(tag)
        if err != nil {
            errs &lt;- err
        }
        postsWG.Add(1) // QUESTION should I use a separate wait group here?
        go func() {
            resp, err := http.Get(ep.String())
            if err != nil {
                errs &lt;- err
            }
            container := models.PostContainer{}
            err = json.NewDecoder(resp.Body).Decode(&amp;container)
            defer resp.Body.Close()
            go insertPosts(posts, container.Posts, postsWG)
        }()
    }

	defer func() {
    	postsWG.Wait()
		close(posts)
    	group.Done()
    }()
}
  1. Now, we've another issue, the main waitGroup should be initialized with 3 instead of 4. This is because the main routine is only spinning up 3 more routines wg.Add(3), so it has to keep track of only those. For child routines, we're using a different waitGroup, so that is not the headache of the parent anymore.

Code:

errChan := make(chan error)                        // for synchronizing errors across goroutines
    wg := &amp;sync.WaitGroup{}                            // for synchronizing goroutines
    wg.Add(3)
    // create a go func to synchronize our wait groups
    // once all goroutines are finished, we can close our errChan

TLDR --

Complex Design - As the main wait group is started at one place, but each goroutine is modifying this waitGroup as it feels necessary. So there is no single owner for this, which makes debugging and maintaining super complex (+ can't ensure it'll be bug free). <br/>
I would recommend breaking this down and have separate trackers for each child routines. That way, the caller who is spinning up more routines can only concentrate on tracking its child goroutines. This routine will then inform its parent waitGroup only after its done (& its child is done, rather than letting the child routinue informing the grandparent directly).

Also, in fetch method after making the HTTP call and getting the response, why create another goroutine to process this data? Either way this goroutine cannot exit until the data insertion happens, nor is it doing someother action which data processing happens. From what I understand, The second goroutine is redundant.

group.Add(1) // QUESTION should I add a separate wait group here and pass it to insertPosts?
            go insertPosts(posts, container.Posts, group)
            defer group.Done()

huangapple
  • 本文由 发表于 2021年11月6日 07:20:19
  • 转载请务必保留本文链接:https://go.coder-hub.com/69860229.html
匿名

发表评论

匿名网友

:?: :razz: :sad: :evil: :!: :smile: :oops: :grin: :eek: :shock: :???: :cool: :lol: :mad: :twisted: :roll: :wink: :idea: :arrow: :neutral: :cry: :mrgreen:

确定