为什么这个程序卡住了?

huangapple go评论114阅读模式
英文:

Why is this program hanging?

问题

我有一段在Go语言中实现的代码,用于在通道之间进行通信。它似乎能够完成所需的操作,但在最后卡住了。我正在尝试诊断为什么会卡住。

这段代码使用httpbin.org来GET一个随机的UUID,然后将其POST出去,同时遵守我通过信号量通道和速率通道建立的并发和速率限制。

  1. package main
  2. import (
  3. "bytes"
  4. "encoding/json"
  5. "fmt"
  6. "io"
  7. "net/http"
  8. "sync"
  9. "time"
  10. )
  11. type HttpBinGetRequest struct {
  12. url string
  13. }
  14. type HttpBinGetResponse struct {
  15. Uuid string `json:"uuid"`
  16. StatusCode int
  17. }
  18. type HttpBinPostRequest struct {
  19. url string
  20. uuid string // 要POST到API的项目
  21. }
  22. type HttpBinPostResponse struct {
  23. Data string `json:"data"`
  24. StatusCode int
  25. }
  26. func main() {
  27. // 准备n个GET请求
  28. var requests []*HttpBinGetRequest
  29. for i := 0; i < 10; i++ {
  30. uri := "https://httpbin.org/uuid"
  31. request := &HttpBinGetRequest{
  32. url: uri,
  33. }
  34. requests = append(requests, request)
  35. }
  36. // 创建GET端点的信号量和速率限制
  37. getSemaphore := make(chan struct{}, 10)
  38. getRate := make(chan struct{}, 10)
  39. defer close(getRate)
  40. defer close(getSemaphore)
  41. for i := 0; i < cap(getRate); i++ {
  42. getRate <- struct{}{}
  43. }
  44. go func() {
  45. // 每秒发送1/n个请求的定时器
  46. // 其中n为速率限制
  47. // 基本上是 (1000 / rps) * time.Millisecond
  48. ticker := time.NewTicker(100 * time.Millisecond)
  49. defer ticker.Stop()
  50. for range ticker.C {
  51. _, ok := <-getRate
  52. if !ok {
  53. return
  54. }
  55. }
  56. }()
  57. // 发送GET请求以获取随机的UUID
  58. respChan := make(chan HttpBinGetResponse)
  59. var wg sync.WaitGroup
  60. for _, request := range requests {
  61. wg.Add(1)
  62. go func(r *HttpBinGetRequest) {
  63. defer wg.Done()
  64. // 检查速率限制器,如果为空则阻塞
  65. getRate <- struct{}{}
  66. resp, _ := get(r, getSemaphore)
  67. fmt.Printf("%+v\n", resp)
  68. // 将响应放入通道中
  69. respChan <- *resp
  70. }(request)
  71. }
  72. // 设置每秒发送10个POST请求
  73. postSemaphore := make(chan struct{}, 10)
  74. postRate := make(chan struct{}, 10)
  75. defer close(postRate)
  76. defer close(postSemaphore)
  77. for i := 0; i < cap(postRate); i++ {
  78. postRate <- struct{}{}
  79. }
  80. go func() {
  81. // 每秒发送1/n个请求的定时器
  82. // 其中n为速率限制
  83. // 基本上是 (1000 / rps) * time.Millisecond
  84. ticker := time.NewTicker(100 * time.Millisecond)
  85. defer ticker.Stop()
  86. for range ticker.C {
  87. _, ok := <-postRate
  88. if !ok {
  89. return
  90. }
  91. }
  92. }()
  93. // 读取可用的响应
  94. for ele := range respChan {
  95. postReq := &HttpBinPostRequest{
  96. url: "https://httpbin.org/post",
  97. uuid: ele.Uuid,
  98. }
  99. go func(r *HttpBinPostRequest) {
  100. postRate <- struct{}{}
  101. postResp, err := post(r, postSemaphore)
  102. if err != nil {
  103. fmt.Println(err)
  104. }
  105. fmt.Printf("%+v\n", postResp)
  106. }(postReq)
  107. }
  108. wg.Wait()
  109. close(respChan)
  110. }
  111. func get(hbgr *HttpBinGetRequest, sem chan struct{}) (*HttpBinGetResponse, error) {
  112. // 向信号量添加一个令牌
  113. sem <- struct{}{}
  114. // 函数完成后移除令牌
  115. defer func() { <-sem }()
  116. httpResp := &HttpBinGetResponse{}
  117. client := &http.Client{}
  118. req, err := http.NewRequest("GET", hbgr.url, nil)
  119. if err != nil {
  120. fmt.Println("error making request")
  121. return httpResp, err
  122. }
  123. req.Header = http.Header{
  124. "accept": {"application/json"},
  125. }
  126. resp, err := client.Do(req)
  127. if err != nil {
  128. fmt.Println(err)
  129. fmt.Println("error getting response")
  130. return httpResp, err
  131. }
  132. // 读取响应
  133. body, err := io.ReadAll(resp.Body)
  134. if err != nil {
  135. fmt.Println("error reading response body")
  136. return httpResp, err
  137. }
  138. json.Unmarshal(body, &httpResp)
  139. httpResp.StatusCode = resp.StatusCode
  140. return httpResp, nil
  141. }
  142. // POST数据到httpbin的方法
  143. func post(hbr *HttpBinPostRequest, sem chan struct{}) (*HttpBinPostResponse, error) {
  144. // 向信号量添加一个令牌
  145. sem <- struct{}{}
  146. defer func() { <-sem }()
  147. httpResp := &HttpBinPostResponse{}
  148. client := &http.Client{}
  149. req, err := http.NewRequest("POST", hbr.url, bytes.NewBuffer([]byte(hbr.uuid)))
  150. if err != nil {
  151. fmt.Println("error making request")
  152. return httpResp, err
  153. }
  154. req.Header = http.Header{
  155. "accept": {"application/json"},
  156. }
  157. resp, err := client.Do(req)
  158. if err != nil {
  159. fmt.Println("error getting response")
  160. return httpResp, err
  161. }
  162. // 读取响应
  163. body, err := io.ReadAll(resp.Body)
  164. if err != nil {
  165. fmt.Println("error reading response body")
  166. return httpResp, err
  167. }
  168. json.Unmarshal(body, &httpResp)
  169. httpResp.StatusCode = resp.StatusCode
  170. return httpResp, nil
  171. }
英文:

I have code that communicates between channels in Go. It appears to complete the desired operations, but it hangs at the end. I'm trying to diagnose why it is hanging.

The code is using httpbin.org to GET a random UUID, then post it, while obeying concurrency and rate limits that I have established via a semaphore channel and a rate channel.

  1. package main
  2. import (
  3. &quot;bytes&quot;
  4. &quot;encoding/json&quot;
  5. &quot;fmt&quot;
  6. &quot;io&quot;
  7. &quot;net/http&quot;
  8. &quot;sync&quot;
  9. &quot;time&quot;
  10. )
  11. type HttpBinGetRequest struct {
  12. url string
  13. }
  14. type HttpBinGetResponse struct {
  15. Uuid string `json:&quot;uuid&quot;`
  16. StatusCode int
  17. }
  18. type HttpBinPostRequest struct {
  19. url string
  20. uuid string // Item to post to API
  21. }
  22. type HttpBinPostResponse struct {
  23. Data string `json:&quot;data&quot;`
  24. StatusCode int
  25. }
  26. func main() {
  27. // Prepare GET requests for n requests
  28. var requests []*HttpBinGetRequest
  29. for i := 0; i &lt; 10; i++ {
  30. uri := &quot;https://httpbin.org/uuid&quot;
  31. request := &amp;HttpBinGetRequest{
  32. url: uri,
  33. }
  34. requests = append(requests, request)
  35. }
  36. // Create semaphore and rate limit for the GET endpoint
  37. getSemaphore := make(chan struct{}, 10)
  38. getRate := make(chan struct{}, 10)
  39. defer close(getRate)
  40. defer close(getSemaphore)
  41. for i := 0; i &lt; cap(getRate); i++ {
  42. getRate &lt;- struct{}{}
  43. }
  44. go func() {
  45. // ticker corresponding to 1/nth of a second
  46. // where n = rate limit
  47. // basically (1000 / rps) * time.Millisecond
  48. ticker := time.NewTicker(100 * time.Millisecond)
  49. defer ticker.Stop()
  50. for range ticker.C {
  51. _, ok := &lt;-getRate
  52. if !ok {
  53. return
  54. }
  55. }
  56. }()
  57. // Send our GET requests to obtain a random UUID
  58. respChan := make(chan HttpBinGetResponse)
  59. var wg sync.WaitGroup
  60. for _, request := range requests {
  61. wg.Add(1)
  62. // cnt := c
  63. // Go func to make request and receive the response
  64. go func(r *HttpBinGetRequest) {
  65. defer wg.Done()
  66. // Check the rate limiter and block if it is empty
  67. getRate &lt;- struct{}{}
  68. // fmt.Printf(&quot;Request #%d at: %s\n&quot;, cnt, time.Now().UTC().Format(&quot;2006-01-02T15:04:05.000Z07:00&quot;))
  69. resp, _ := get(r, getSemaphore)
  70. fmt.Printf(&quot;%+v\n&quot;, resp)
  71. // Place our response into the channel
  72. respChan &lt;- *resp
  73. // fmt.Printf(&quot;%+v,%s\n&quot;, resp, time.Now().UTC().Format(&quot;2006-01-02T15:04:05.000Z07:00&quot;))
  74. }(request)
  75. }
  76. // Set up for POST requests 10/s
  77. postSemaphore := make(chan struct{}, 10)
  78. postRate := make(chan struct{}, 10)
  79. defer close(postRate)
  80. defer close(postSemaphore)
  81. for i := 0; i &lt; cap(postRate); i++ {
  82. postRate &lt;- struct{}{}
  83. }
  84. go func() {
  85. // ticker corresponding to 1/nth of a second
  86. // where n = rate limit
  87. // basically (1000 / rps) * time.Millisecond
  88. ticker := time.NewTicker(100 * time.Millisecond)
  89. defer ticker.Stop()
  90. for range ticker.C {
  91. _, ok := &lt;-postRate
  92. if !ok {
  93. return
  94. }
  95. }
  96. }()
  97. // Read responses as they become available
  98. for ele := range respChan {
  99. postReq := &amp;HttpBinPostRequest{
  100. url: &quot;https://httpbin.org/post&quot;,
  101. uuid: ele.Uuid,
  102. }
  103. go func(r *HttpBinPostRequest) {
  104. postRate &lt;- struct{}{}
  105. postResp, err := post(r, postSemaphore)
  106. if err != nil {
  107. fmt.Println(err)
  108. }
  109. fmt.Printf(&quot;%+v\n&quot;, postResp)
  110. }(postReq)
  111. }
  112. wg.Wait()
  113. close(respChan)
  114. }
  115. func get(hbgr *HttpBinGetRequest, sem chan struct{}) (*HttpBinGetResponse, error) {
  116. // Add a token to the semaphore
  117. sem &lt;- struct{}{}
  118. // Remove token when function is complete
  119. defer func() { &lt;-sem }()
  120. httpResp := &amp;HttpBinGetResponse{}
  121. client := &amp;http.Client{}
  122. req, err := http.NewRequest(&quot;GET&quot;, hbgr.url, nil)
  123. if err != nil {
  124. fmt.Println(&quot;error making request&quot;)
  125. return httpResp, err
  126. }
  127. req.Header = http.Header{
  128. &quot;accept&quot;: {&quot;application/json&quot;},
  129. }
  130. resp, err := client.Do(req)
  131. if err != nil {
  132. fmt.Println(err)
  133. fmt.Println(&quot;error getting response&quot;)
  134. return httpResp, err
  135. }
  136. // Read Response
  137. body, err := io.ReadAll(resp.Body)
  138. if err != nil {
  139. fmt.Println(&quot;error reading response body&quot;)
  140. return httpResp, err
  141. }
  142. json.Unmarshal(body, &amp;httpResp)
  143. httpResp.StatusCode = resp.StatusCode
  144. return httpResp, nil
  145. }
  146. // Method to post data to httpbin
  147. func post(hbr *HttpBinPostRequest, sem chan struct{}) (*HttpBinPostResponse, error) {
  148. // Add a token to the semaphore
  149. sem &lt;- struct{}{}
  150. defer func() { &lt;-sem }()
  151. httpResp := &amp;HttpBinPostResponse{}
  152. client := &amp;http.Client{}
  153. req, err := http.NewRequest(&quot;POST&quot;, hbr.url, bytes.NewBuffer([]byte(hbr.uuid)))
  154. if err != nil {
  155. fmt.Println(&quot;error making request&quot;)
  156. return httpResp, err
  157. }
  158. req.Header = http.Header{
  159. &quot;accept&quot;: {&quot;application/json&quot;},
  160. }
  161. resp, err := client.Do(req)
  162. if err != nil {
  163. fmt.Println(&quot;error getting response&quot;)
  164. return httpResp, err
  165. }
  166. // Read Response
  167. body, err := io.ReadAll(resp.Body)
  168. if err != nil {
  169. fmt.Println(&quot;error reading response body&quot;)
  170. return httpResp, err
  171. }
  172. json.Unmarshal(body, &amp;httpResp)
  173. httpResp.StatusCode = resp.StatusCode
  174. return httpResp, nil
  175. }

答案1

得分: 2

你正在通过range语句从respChan中读取数据。这段代码在通道关闭之前不会退出,而通道的关闭发生在该代码块之后。

  1. for ele := range respChan {
  2. // ...
  3. }
  4. wg.Wait()
  5. close(respChan)

因此,程序永远不会退出,因为所有这些逻辑都在同一个goroutine中。

为了修复这个问题,并确保在程序退出之前处理所有记录,将通道读取代码保留在主goroutine中,并将等待/关闭逻辑放在它自己的goroutine中:

  1. go func() {
  2. wg.Wait() // 等待工作协程完成...
  3. close(respChan) // ...现在向主goroutine发出完成信号
  4. }()
  5. for ele := range respChan {
  6. // ...
  7. }

编辑:为了等待最终的range循环中的任何子协程,可能有一种更简洁的方法只使用一个WaitGroup,但是一个快速的修复方法可以是:

  1. var swg sync.WaitGroup
  2. go func() {
  3. wg.Wait() // 等待工作协程完成...
  4. swg.Wait() // ...和子任务
  5. close(respChan) // ...现在向主goroutine发出完成信号
  6. }()
  7. for ele := range respChan {
  8. // ...
  9. swg.Add(1)
  10. go func() {
  11. defer swg.Done()
  12. // ...
  13. }()
  14. }
英文:

You are reading from respChan at the end of your code via a range statement. This code will not exit until the channel is closed - which happens after that code block.

  1. for ele := range respChan {
  2. // ...
  3. }
  4. wg.Wait()
  5. close(respChan)

So the program will never exit - since all this logic is within the same goroutine.

To fix - and ensure all records are processed before program exit - keep the channel reading code in the main goroutine and put the wait/close logic in it's own goroutine:

  1. go func() {
  2. wg.Wait() // wait for workers to finish ...
  3. close(respChan) // ... now signal the main goroutine we&#39;re done
  4. }()
  5. for ele := range respChan {
  6. // ...
  7. }

EDIT to wait for any sub-goroutines within the final range loop - there's probably a cleaner way to use just one waitgroup, but one quick fix could be:

  1. var swg sync.WaitGroup
  2. go func() {
  3. wg.Wait() // wait for workers to finish ...
  4. swg.Wait() // ... and sub-tasks
  5. close(respChan) // ... now signal the main goroutine we&#39;re done
  6. }()
  7. for ele := range respChan {
  8. // ...
  9. swg.Add(1)
  10. go func() {
  11. defer swg.Done()
  12. // ...
  13. }()
  14. }

huangapple
  • 本文由 发表于 2023年3月24日 21:26:28
  • 转载请务必保留本文链接:https://go.coder-hub.com/75834225.html
匿名

发表评论

匿名网友

:?: :razz: :sad: :evil: :!: :smile: :oops: :grin: :eek: :shock: :???: :cool: :lol: :mad: :twisted: :roll: :wink: :idea: :arrow: :neutral: :cry: :mrgreen:

确定