为什么这个程序卡住了?

huangapple go评论87阅读模式
英文:

Why is this program hanging?

问题

我有一段在Go语言中实现的代码,用于在通道之间进行通信。它似乎能够完成所需的操作,但在最后卡住了。我正在尝试诊断为什么会卡住。

这段代码使用httpbin.org来GET一个随机的UUID,然后将其POST出去,同时遵守我通过信号量通道和速率通道建立的并发和速率限制。

package main

import (
	"bytes"
	"encoding/json"
	"fmt"
	"io"
	"net/http"
	"sync"
	"time"
)

type HttpBinGetRequest struct {
	url string
}

type HttpBinGetResponse struct {
	Uuid       string `json:"uuid"`
	StatusCode int
}

type HttpBinPostRequest struct {
	url  string
	uuid string // 要POST到API的项目
}

type HttpBinPostResponse struct {
	Data       string `json:"data"`
	StatusCode int
}

func main() {

	// 准备n个GET请求
	var requests []*HttpBinGetRequest
	for i := 0; i < 10; i++ {
		uri := "https://httpbin.org/uuid"
		request := &HttpBinGetRequest{
			url: uri,
		}
		requests = append(requests, request)
	}

	// 创建GET端点的信号量和速率限制
	getSemaphore := make(chan struct{}, 10)
	getRate := make(chan struct{}, 10)
	defer close(getRate)
	defer close(getSemaphore)
	for i := 0; i < cap(getRate); i++ {
		getRate <- struct{}{}
	}

	go func() {
		// 每秒发送1/n个请求的定时器
		// 其中n为速率限制
		// 基本上是 (1000 / rps) * time.Millisecond
		ticker := time.NewTicker(100 * time.Millisecond)
		defer ticker.Stop()
		for range ticker.C {
			_, ok := <-getRate
			if !ok {
				return
			}
		}
	}()

	// 发送GET请求以获取随机的UUID
	respChan := make(chan HttpBinGetResponse)
	var wg sync.WaitGroup
	for _, request := range requests {
		wg.Add(1)
		go func(r *HttpBinGetRequest) {
			defer wg.Done()

			// 检查速率限制器,如果为空则阻塞
			getRate <- struct{}{}
			resp, _ := get(r, getSemaphore)

			fmt.Printf("%+v\n", resp)
			// 将响应放入通道中
			respChan <- *resp
		}(request)
	}

	// 设置每秒发送10个POST请求
	postSemaphore := make(chan struct{}, 10)
	postRate := make(chan struct{}, 10)
	defer close(postRate)
	defer close(postSemaphore)
	for i := 0; i < cap(postRate); i++ {
		postRate <- struct{}{}
	}

	go func() {
		// 每秒发送1/n个请求的定时器
		// 其中n为速率限制
		// 基本上是 (1000 / rps) * time.Millisecond
		ticker := time.NewTicker(100 * time.Millisecond)
		defer ticker.Stop()
		for range ticker.C {
			_, ok := <-postRate
			if !ok {
				return
			}
		}
	}()

	// 读取可用的响应
	for ele := range respChan {
		postReq := &HttpBinPostRequest{
			url:  "https://httpbin.org/post",
			uuid: ele.Uuid,
		}
		go func(r *HttpBinPostRequest) {
			postRate <- struct{}{}
			postResp, err := post(r, postSemaphore)
			if err != nil {
				fmt.Println(err)
			}
			fmt.Printf("%+v\n", postResp)
		}(postReq)

	}
	wg.Wait()
	close(respChan)
}

func get(hbgr *HttpBinGetRequest, sem chan struct{}) (*HttpBinGetResponse, error) {

	// 向信号量添加一个令牌
	sem <- struct{}{}

	// 函数完成后移除令牌
	defer func() { <-sem }()
	httpResp := &HttpBinGetResponse{}
	client := &http.Client{}
	req, err := http.NewRequest("GET", hbgr.url, nil)
	if err != nil {
		fmt.Println("error making request")
		return httpResp, err
	}

	req.Header = http.Header{
		"accept": {"application/json"},
	}

	resp, err := client.Do(req)
	if err != nil {
		fmt.Println(err)
		fmt.Println("error getting response")
		return httpResp, err
	}

	// 读取响应
	body, err := io.ReadAll(resp.Body)
	if err != nil {
		fmt.Println("error reading response body")
		return httpResp, err
	}
	json.Unmarshal(body, &httpResp)
	httpResp.StatusCode = resp.StatusCode
	return httpResp, nil
}

// POST数据到httpbin的方法
func post(hbr *HttpBinPostRequest, sem chan struct{}) (*HttpBinPostResponse, error) {

	// 向信号量添加一个令牌
	sem <- struct{}{}
	defer func() { <-sem }()
	httpResp := &HttpBinPostResponse{}
	client := &http.Client{}
	req, err := http.NewRequest("POST", hbr.url, bytes.NewBuffer([]byte(hbr.uuid)))
	if err != nil {
		fmt.Println("error making request")
		return httpResp, err
	}

	req.Header = http.Header{
		"accept": {"application/json"},
	}

	resp, err := client.Do(req)
	if err != nil {
		fmt.Println("error getting response")
		return httpResp, err
	}

	// 读取响应
	body, err := io.ReadAll(resp.Body)
	if err != nil {
		fmt.Println("error reading response body")
		return httpResp, err
	}
	json.Unmarshal(body, &httpResp)
	httpResp.StatusCode = resp.StatusCode
	return httpResp, nil
}

英文:

I have code that communicates between channels in Go. It appears to complete the desired operations, but it hangs at the end. I'm trying to diagnose why it is hanging.

The code is using httpbin.org to GET a random UUID, then post it, while obeying concurrency and rate limits that I have established via a semaphore channel and a rate channel.

package main
import (
&quot;bytes&quot;
&quot;encoding/json&quot;
&quot;fmt&quot;
&quot;io&quot;
&quot;net/http&quot;
&quot;sync&quot;
&quot;time&quot;
)
type HttpBinGetRequest struct {
url string
}
type HttpBinGetResponse struct {
Uuid       string `json:&quot;uuid&quot;`
StatusCode int
}
type HttpBinPostRequest struct {
url  string
uuid string // Item to post to API
}
type HttpBinPostResponse struct {
Data       string `json:&quot;data&quot;`
StatusCode int
}
func main() {
// Prepare GET requests for n requests
var requests []*HttpBinGetRequest
for i := 0; i &lt; 10; i++ {
uri := &quot;https://httpbin.org/uuid&quot;
request := &amp;HttpBinGetRequest{
url: uri,
}
requests = append(requests, request)
}
// Create semaphore and rate limit for the GET endpoint
getSemaphore := make(chan struct{}, 10)
getRate := make(chan struct{}, 10)
defer close(getRate)
defer close(getSemaphore)
for i := 0; i &lt; cap(getRate); i++ {
getRate &lt;- struct{}{}
}
go func() {
// ticker corresponding to 1/nth of a second
// where n = rate limit
// basically (1000 / rps) * time.Millisecond
ticker := time.NewTicker(100 * time.Millisecond)
defer ticker.Stop()
for range ticker.C {
_, ok := &lt;-getRate
if !ok {
return
}
}
}()
// Send our GET requests to obtain a random UUID
respChan := make(chan HttpBinGetResponse)
var wg sync.WaitGroup
for _, request := range requests {
wg.Add(1)
// cnt := c
// Go func to make request and receive the response
go func(r *HttpBinGetRequest) {
defer wg.Done()
// Check the rate limiter and block if it is empty
getRate &lt;- struct{}{}
// fmt.Printf(&quot;Request #%d at: %s\n&quot;, cnt, time.Now().UTC().Format(&quot;2006-01-02T15:04:05.000Z07:00&quot;))
resp, _ := get(r, getSemaphore)
fmt.Printf(&quot;%+v\n&quot;, resp)
// Place our response into the channel
respChan &lt;- *resp
// fmt.Printf(&quot;%+v,%s\n&quot;, resp, time.Now().UTC().Format(&quot;2006-01-02T15:04:05.000Z07:00&quot;))
}(request)
}
// Set up for POST requests 10/s
postSemaphore := make(chan struct{}, 10)
postRate := make(chan struct{}, 10)
defer close(postRate)
defer close(postSemaphore)
for i := 0; i &lt; cap(postRate); i++ {
postRate &lt;- struct{}{}
}
go func() {
// ticker corresponding to 1/nth of a second
// where n = rate limit
// basically (1000 / rps) * time.Millisecond
ticker := time.NewTicker(100 * time.Millisecond)
defer ticker.Stop()
for range ticker.C {
_, ok := &lt;-postRate
if !ok {
return
}
}
}()
// Read responses as they become available
for ele := range respChan {
postReq := &amp;HttpBinPostRequest{
url:  &quot;https://httpbin.org/post&quot;,
uuid: ele.Uuid,
}
go func(r *HttpBinPostRequest) {
postRate &lt;- struct{}{}
postResp, err := post(r, postSemaphore)
if err != nil {
fmt.Println(err)
}
fmt.Printf(&quot;%+v\n&quot;, postResp)
}(postReq)
}
wg.Wait()
close(respChan)
}
func get(hbgr *HttpBinGetRequest, sem chan struct{}) (*HttpBinGetResponse, error) {
// Add a token to the semaphore
sem &lt;- struct{}{}
// Remove token when function is complete
defer func() { &lt;-sem }()
httpResp := &amp;HttpBinGetResponse{}
client := &amp;http.Client{}
req, err := http.NewRequest(&quot;GET&quot;, hbgr.url, nil)
if err != nil {
fmt.Println(&quot;error making request&quot;)
return httpResp, err
}
req.Header = http.Header{
&quot;accept&quot;: {&quot;application/json&quot;},
}
resp, err := client.Do(req)
if err != nil {
fmt.Println(err)
fmt.Println(&quot;error getting response&quot;)
return httpResp, err
}
// Read Response
body, err := io.ReadAll(resp.Body)
if err != nil {
fmt.Println(&quot;error reading response body&quot;)
return httpResp, err
}
json.Unmarshal(body, &amp;httpResp)
httpResp.StatusCode = resp.StatusCode
return httpResp, nil
}
// Method to post data to httpbin
func post(hbr *HttpBinPostRequest, sem chan struct{}) (*HttpBinPostResponse, error) {
// Add a token to the semaphore
sem &lt;- struct{}{}
defer func() { &lt;-sem }()
httpResp := &amp;HttpBinPostResponse{}
client := &amp;http.Client{}
req, err := http.NewRequest(&quot;POST&quot;, hbr.url, bytes.NewBuffer([]byte(hbr.uuid)))
if err != nil {
fmt.Println(&quot;error making request&quot;)
return httpResp, err
}
req.Header = http.Header{
&quot;accept&quot;: {&quot;application/json&quot;},
}
resp, err := client.Do(req)
if err != nil {
fmt.Println(&quot;error getting response&quot;)
return httpResp, err
}
// Read Response
body, err := io.ReadAll(resp.Body)
if err != nil {
fmt.Println(&quot;error reading response body&quot;)
return httpResp, err
}
json.Unmarshal(body, &amp;httpResp)
httpResp.StatusCode = resp.StatusCode
return httpResp, nil
}

答案1

得分: 2

你正在通过range语句从respChan中读取数据。这段代码在通道关闭之前不会退出,而通道的关闭发生在该代码块之后。

for ele := range respChan {
    // ...
}

wg.Wait()
close(respChan)

因此,程序永远不会退出,因为所有这些逻辑都在同一个goroutine中。

为了修复这个问题,并确保在程序退出之前处理所有记录,将通道读取代码保留在主goroutine中,并将等待/关闭逻辑放在它自己的goroutine中:

go func() {
    wg.Wait()        // 等待工作协程完成...
    close(respChan)  // ...现在向主goroutine发出完成信号
}()

for ele := range respChan {
    // ...
}

编辑:为了等待最终的range循环中的任何子协程,可能有一种更简洁的方法只使用一个WaitGroup,但是一个快速的修复方法可以是:

var swg sync.WaitGroup

go func() {
    wg.Wait()        // 等待工作协程完成...
    swg.Wait()       // ...和子任务
    close(respChan)  // ...现在向主goroutine发出完成信号
}()

for ele := range respChan {
    // ...
    swg.Add(1)
    go func() {
        defer swg.Done()
        // ...
    }()
}
英文:

You are reading from respChan at the end of your code via a range statement. This code will not exit until the channel is closed - which happens after that code block.

for ele := range respChan {
// ...
}
wg.Wait()        
close(respChan)

So the program will never exit - since all this logic is within the same goroutine.

To fix - and ensure all records are processed before program exit - keep the channel reading code in the main goroutine and put the wait/close logic in it's own goroutine:

go func() {
wg.Wait()        // wait for workers to finish ...
close(respChan)  // ... now signal the main goroutine we&#39;re done
}()
for ele := range respChan {
// ...
}

EDIT to wait for any sub-goroutines within the final range loop - there's probably a cleaner way to use just one waitgroup, but one quick fix could be:

var swg sync.WaitGroup
go func() {
wg.Wait()        // wait for workers to finish ...
swg.Wait()       // ... and sub-tasks
close(respChan)  // ... now signal the main goroutine we&#39;re done
}()
for ele := range respChan {
// ...
swg.Add(1)
go func() {
defer swg.Done()
// ...
}()
}

huangapple
  • 本文由 发表于 2023年3月24日 21:26:28
  • 转载请务必保留本文链接:https://go.coder-hub.com/75834225.html
匿名

发表评论

匿名网友

:?: :razz: :sad: :evil: :!: :smile: :oops: :grin: :eek: :shock: :???: :cool: :lol: :mad: :twisted: :roll: :wink: :idea: :arrow: :neutral: :cry: :mrgreen:

确定