Using Channels in Go to Receive Responses and Write to SQL Concurrently

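Question

I am using Go to build a pipeline that fetches JSON data from an external API, processes each message, and then writes it to a SQL database.

I am trying to run the API requests concurrently and, as each response comes back, hand it to another goroutine that inserts it into the DB via load().

With the code below, I sometimes see the log.Printf() output from load() and sometimes I don't, which suggests that I am either closing the channel too early or not setting up the communication correctly.

The pattern I am attempting looks like this: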

package main

import (
	"encoding/json"
	"io/ioutil"
	"log"
	"net/http"
	"time"
)

type Request struct {
	url string
}

type Response struct {
	status  int
	args    Args    `json:"args"`
	headers Headers `json:"headers"`
	origin  string  `json:"origin"`
	url     string  `json:"url"`
}

type Args struct {
}

type Headers struct {
	accept string `json:"Accept"`
}

func main() {
	start := time.Now()

	numRequests := 5
	responses := make(chan Response, 5)
	defer close(responses)
	for i := 0; i < numRequests; i++ {
		req := Request{url: "https://httpbin.org/get"}
		go func(req *Request) {
			resp, err := extract(req)
			if err != nil {
				log.Fatal("Error extracting data from API")
				return
			}
			// Send response to channel
			responses <- resp
		}(&req)

		// Perform go routine to load data
		go load(responses)
	}

	log.Println("Execution time: ", time.Since(start))
}

func extract(req *Request) (r Response, err error) {
	var resp Response
	request, err := http.NewRequest("GET", req.url, nil)
	if err != nil {
		return resp, err
	}
	request.Header = http.Header{
		"accept": {"application/json"},
	}

	response, err := http.DefaultClient.Do(request)
	defer response.Body.Close()

	if err != nil {
		log.Fatal("Error")
		return resp, err
	}
	// Read response data
	body, err := ioutil.ReadAll(response.Body)
	if err != nil {
		log.Fatal("Error")
		return resp, err
	}
	json.Unmarshal(body, &resp)
	resp.status = response.StatusCode

	return resp, nil
}

type Record struct {
	origin string
	url    string
}

func load(ch chan Response) {

	// Read response from channel
	resp := <-ch

	// Process the response data
	records := process(resp)
	log.Printf("%+v\n", records)

	// Load data to db stuff here

}

func process(resp Response) (record Record) {
	// Process the response struct as needed to get a record of data to insert to DB
	return record
}

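Answer 1

Score: 1

Nothing in the program waits for the work to finish before main returns, so the program sometimes exits before the goroutines have completed.

To prevent that, use a WaitGroup: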

wg := sync.WaitGroup{}
for i := 0; i < numRequests; i++ {
	...
	wg.Add(1)
	go func() {
		defer wg.Done()
		load(responses)
	}()
}
wg.Wait()
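
For context, here is a minimal, self-contained sketch of how the WaitGroup approach might look in a complete program. It is one possible arrangement, not the only one. Several things here are assumptions made for illustration: extract() is a stub that performs no network I/O, the Response and Record types are trimmed down, and runPipeline() is a hypothetical helper that does not appear in the question. The sketch also closes the channel only after every sender has finished, rather than with defer close(responses) in main, so that a single loader can range over the channel until it is drained.

```go
package main

import (
	"fmt"
	"log"
	"sync"
)

// Response and Record are trimmed-down stand-ins for the question's types.
type Response struct {
	Status int
	URL    string
}

type Record struct {
	URL string
}

// extract is a stub standing in for the HTTP call (assumption: no network I/O).
func extract(url string) (Response, error) {
	return Response{Status: 200, URL: url}, nil
}

// process mirrors the question's process(): it turns a Response into a Record.
func process(resp Response) Record {
	return Record{URL: resp.URL}
}

// runPipeline (hypothetical helper) starts numRequests extractors and one
// loader, and returns only after every response has been processed.
func runPipeline(numRequests int) []Record {
	responses := make(chan Response, numRequests)

	// Track the extractor goroutines.
	var producers sync.WaitGroup
	for i := 0; i < numRequests; i++ {
		producers.Add(1)
		go func() {
			defer producers.Done()
			resp, err := extract("https://httpbin.org/get")
			if err != nil {
				// Log and skip instead of log.Fatal, which would exit the program.
				log.Printf("extract failed: %v", err)
				return
			}
			responses <- resp
		}()
	}

	// Close the channel once all senders are done so the loader's range loop ends.
	go func() {
		producers.Wait()
		close(responses)
	}()

	// A single loader drains the channel; a real one would insert into the DB.
	var records []Record
	var loader sync.WaitGroup
	loader.Add(1)
	go func() {
		defer loader.Done()
		for resp := range responses {
			records = append(records, process(resp))
		}
	}()

	// Block until the loader has handled every response.
	loader.Wait()
	return records
}

func main() {
	records := runPipeline(5)
	fmt.Println("loaded", len(records), "records") // prints: loaded 5 records
}
```

Because the loader reads with range, it keeps receiving until the channel is closed, so no response is dropped even if some extractors fail and fewer than numRequests values are sent.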

huangapple
  • Posted on November 19, 2022 at 02:37:17
  • When reposting, please keep the link to this article: https://go.coder-hub.com/74494031.html