Using Channels in Go to receive Responses and Write to SQL Concurrently
Question
I am using Go to build a pipeline that fetches JSON data from an external API, processes each message, and writes the result to a SQL database.
I am trying to run the API requests concurrently and, as each response comes back, hand it to another goroutine that inserts it into the database via load().
With the code below, the log.Printf() call in load() sometimes prints and sometimes does not, which suggests that I am closing a channel too early or have not set up the communication correctly.
The pattern I am attempting looks like this:
package main

import (
    "encoding/json"
    "io/ioutil"
    "log"
    "net/http"
    "time"
)

type Request struct {
    url string
}

type Response struct {
    status  int
    args    Args    `json:"args"`
    headers Headers `json:"headers"`
    origin  string  `json:"origin"`
    url     string  `json:"url"`
}

type Args struct {
}

type Headers struct {
    accept string `json:"Accept"`
}

func main() {
    start := time.Now()
    numRequests := 5
    responses := make(chan Response, 5)
    defer close(responses)
    for i := 0; i < numRequests; i++ {
        req := Request{url: "https://httpbin.org/get"}
        go func(req *Request) {
            resp, err := extract(req)
            if err != nil {
                log.Fatal("Error extracting data from API")
                return
            }
            // Send response to channel
            responses <- resp
        }(&req)
        // Perform go routine to load data
        go load(responses)
    }
    log.Println("Execution time: ", time.Since(start))
}

func extract(req *Request) (r Response, err error) {
    var resp Response
    request, err := http.NewRequest("GET", req.url, nil)
    if err != nil {
        return resp, err
    }
    request.Header = http.Header{
        "accept": {"application/json"},
    }
    response, err := http.DefaultClient.Do(request)
    defer response.Body.Close()
    if err != nil {
        log.Fatal("Error")
        return resp, err
    }
    // Read response data
    body, err := ioutil.ReadAll(response.Body)
    if err != nil {
        log.Fatal("Error")
        return resp, err
    }
    json.Unmarshal(body, &resp)
    resp.status = response.StatusCode
    return resp, nil
}

type Record struct {
    origin string
    url    string
}

func load(ch chan Response) {
    // Read response from channel
    resp := <-ch
    // Process the response data
    records := process(resp)
    log.Printf("%+v\n", records)
    // Load data to db stuff here
}

func process(resp Response) (record Record) {
    // Process the response struct as needed to get a record of data to insert to DB
    return record
}
Answer 1
Score: 1
Nothing in the program waits for the goroutines to finish, so main can return, and the program exit, before load() has done its work. That is why the log.Printf() output appears only some of the time.
To prevent that, use a sync.WaitGroup:
wg := sync.WaitGroup{}
for i := 0; i < numRequests; i++ {
    ...
    wg.Add(1)
    go func() {
        defer wg.Done()
        load(responses)
    }()
}
wg.Wait()
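For reference, here is a minimal, self-contained sketch of the same WaitGroup pattern that can be run without network access. It is an illustration under stated assumptions, not the asker's actual pipeline: fakeExtract is a hypothetical stand-in for extract() that makes no HTTP call, runPipeline is a hypothetical helper name, the Response and Record types are simplified, and load() returns the record instead of writing to a database.

```go
package main

import (
	"fmt"
	"sync"
)

// Response and Record are simplified stand-ins for the types in the question.
type Response struct {
	Status int
	URL    string
}

type Record struct {
	URL string
}

// fakeExtract is a hypothetical replacement for extract(); it makes no HTTP call.
func fakeExtract(id int) Response {
	return Response{Status: 200, URL: fmt.Sprintf("https://httpbin.org/get?id=%d", id)}
}

// load receives exactly one response from the channel, as in the question,
// and turns it into a record (the database insert is omitted here).
func load(ch <-chan Response) Record {
	resp := <-ch
	return Record{URL: resp.URL}
}

// runPipeline starts one producer and one loader per request and blocks
// until every loader has finished.
func runPipeline(numRequests int) []Record {
	responses := make(chan Response, numRequests)
	records := make([]Record, numRequests)

	var wg sync.WaitGroup
	for i := 0; i < numRequests; i++ {
		// Producer: sends one response to the channel.
		go func(id int) {
			responses <- fakeExtract(id)
		}(i)

		// Loader: tracked by the WaitGroup so the caller cannot return early.
		wg.Add(1)
		go func(slot int) {
			defer wg.Done()
			records[slot] = load(responses) // each goroutine writes only its own slot
		}(i)
	}

	wg.Wait() // returns only after every loader has called Done
	return records
}

func main() {
	records := runPipeline(5)
	fmt.Println("loaded", len(records), "records")
}
```

Because each loader receives exactly one value and there is one producer per loader, wg.Wait() returning also implies that every producer has sent its response. If a producer could fail and skip its send, the matching loader would block forever, so a real pipeline would need to handle that case separately.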