Using Channels in Go to receive Responses and Write to SQL Concurrently

Question

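I am using Go to implement a pipeline that fetches JSON data from an external API, processes each message, and then sends it to a SQL database.

I want to run the API requests concurrently and, as each response comes back, hand it to another goroutine, load(), which inserts it into the DB.

With the code below, the log.Printf() in load() sometimes produces output and sometimes doesn't, which suggests I'm closing a channel too early or not setting up the communication correctly.

The pattern I am attempting looks like this: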

```go
package main

import (
	"encoding/json"
	"io/ioutil"
	"log"
	"net/http"
	"time"
)

type Request struct {
	url string
}

type Response struct {
	status  int
	args    Args    `json:"args"`
	headers Headers `json:"headers"`
	origin  string  `json:"origin"`
	url     string  `json:"url"`
}

type Args struct {
}

type Headers struct {
	accept string `json:"Accept"`
}

func main() {
	start := time.Now()
	numRequests := 5
	responses := make(chan Response, 5)
	defer close(responses)
	for i := 0; i < numRequests; i++ {
		req := Request{url: "https://httpbin.org/get"}
		go func(req *Request) {
			resp, err := extract(req)
			if err != nil {
				log.Fatal("Error extracting data from API")
				return
			}
			// Send response to channel
			responses <- resp
		}(&req)
		// Perform go routine to load data
		go load(responses)
	}
	log.Println("Execution time: ", time.Since(start))
}

func extract(req *Request) (r Response, err error) {
	var resp Response
	request, err := http.NewRequest("GET", req.url, nil)
	if err != nil {
		return resp, err
	}
	request.Header = http.Header{
		"accept": {"application/json"},
	}
	response, err := http.DefaultClient.Do(request)
	defer response.Body.Close()
	if err != nil {
		log.Fatal("Error")
		return resp, err
	}
	// Read response data
	body, err := ioutil.ReadAll(response.Body)
	if err != nil {
		log.Fatal("Error")
		return resp, err
	}
	json.Unmarshal(body, &resp)
	resp.status = response.StatusCode
	return resp, nil
}

type Record struct {
	origin string
	url    string
}

func load(ch chan Response) {
	// Read response from channel
	resp := <-ch
	// Process the response data
	records := process(resp)
	log.Printf("%+v\n", records)
	// Load data to db stuff here
}

func process(resp Response) (record Record) {
	// Process the response struct as needed to get a record of data to insert to DB
	return record
}
```

Answer 1

Score: 1

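Nothing in the program waits for the work to finish. When main returns, the program exits without waiting for other goroutines, so sometimes it terminates before the load goroutines have received and logged a response.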

To prevent that, use a sync.WaitGroup so that main blocks until every load goroutine has finished:

```go
wg := sync.WaitGroup{}
for i := 0; i < numRequests; i++ {
	...
	wg.Add(1)
	go func() {
		defer wg.Done()
		load(responses)
	}()
}
wg.Wait()
```
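The snippet above stops main from exiting early. For a fuller picture, here is a minimal sketch of the whole pipeline under a few assumptions: fetch is a hypothetical offline stand-in for extract, the types are trimmed down, and a single loader goroutine ranges over the channel instead of starting one load() per request. A second WaitGroup tracks the request goroutines so the channel is closed only after the last send; in the question, defer close(responses) closes it when main returns, regardless of whether requests are still in flight.

```go
package main

import (
	"fmt"
	"log"
	"sync"
)

// Response and Record are trimmed-down, hypothetical versions of the
// question's types.
type Response struct {
	Status int
	URL    string
}

type Record struct {
	URL string
}

// fetch is a hypothetical stand-in for extract(); it fabricates a response
// instead of calling the real API so the sketch runs offline.
func fetch(id int) (Response, error) {
	return Response{Status: 200, URL: fmt.Sprintf("https://httpbin.org/get?id=%d", id)}, nil
}

// process turns a Response into the Record that would be inserted into the DB.
func process(resp Response) Record {
	return Record{URL: resp.URL}
}

// run fans out numRequests fetches and funnels every response through a
// single loader goroutine. It returns only after all records are handled.
func run(numRequests int) []Record {
	responses := make(chan Response, numRequests)

	// Producers: one goroutine per request.
	var producers sync.WaitGroup
	for i := 0; i < numRequests; i++ {
		producers.Add(1)
		go func(id int) {
			defer producers.Done()
			resp, err := fetch(id)
			if err != nil {
				log.Printf("request %d failed: %v", id, err) // skip, don't exit
				return
			}
			responses <- resp
		}(i)
	}

	// Close the channel only after every producer is done, so the loader's
	// range loop can end. Closing earlier could make a send panic.
	go func() {
		producers.Wait()
		close(responses)
	}()

	// Loader: drains the channel; the DB insert would go where append is.
	var records []Record
	var loader sync.WaitGroup
	loader.Add(1)
	go func() {
		defer loader.Done()
		for resp := range responses {
			records = append(records, process(resp))
		}
	}()

	loader.Wait() // block until every response has been loaded
	return records
}

func main() {
	records := run(5)
	log.Printf("loaded %d records", len(records))
}
```

Ranging over the channel means the loader does not need to know how many responses to expect, and a failed request is logged and skipped rather than terminating the program through log.Fatal.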

huangapple
  • Posted on 2022-11-19 02:37:17
  • When reposting, please keep the link to this article: https://go.coder-hub.com/74494031.html