英文:
Go: channel many slow API queries into single SQL transaction
问题
我想知道以下操作的惯用方式是什么。我有N个慢速API查询和一个数据库连接,我想要一个带有缓冲通道,用于接收响应,并且有一个数据库事务,我将用它来写入数据。我只能想到以下信号量的解决方案,以下是一个示例:
func myFunc(){
//10个并发的API调用
sem := make(chan bool, 10)
//一个并发安全的映射作为缓冲区
var myMap MyConcurrentMap
for i:=0;i<N;i++{
sem<-true
go func(i int){
defer func(){<-sem}()
resp:=slowAPICall(fmt.Sprintf("http://slow-api.me?%d",i))
myMap.Put(resp)
}(i)
}
for j=0;j<cap(sem);j++{
sem<-true
}
tx,_ := db.Begin()
for data:=range myMap{
tx.Exec("Insert data into database")
}
tx.Commit()
}
我几乎确定有更简单、更清晰和更合适的解决方案,但对我来说似乎很复杂。
编辑:
好吧,我想出了以下解决方案,这样我就不需要缓冲映射了,因此一旦数据到达resp
通道,数据就会被打印出来或者可以用来插入数据库,它可以工作,但我仍然不确定是否一切都没问题,至少没有竞争条件。
package main
import (
"fmt"
"math/rand"
"sync"
"time"
)
//全局等待组
var wg sync.WaitGroup
func init() {
//只是为了好玩,使rand种子化
rand.Seed(time.Now().UnixNano())
}
//模拟一个慢速API调用
func verySlowAPI(id int) int {
n := rand.Intn(5)
time.Sleep(time.Duration(n) * time.Second)
return n
}
func main() {
//任务数量
N := 100
//并发级别
concur := 10
//任务通道
tasks := make(chan int, N)
//响应通道
resp := make(chan int, 10)
//10个并发的goroutine
wg.Add(concur)
for i := 1; i <= concur; i++ {
go worker(tasks, resp)
}
//添加任务
for i := 0; i < N; i++ {
tasks <- i
}
//从goroutine收集数据
for i := 0; i < N; i++ {
fmt.Printf("%d\n", <-resp)
}
//关闭任务通道
close(tasks)
//等待完成
wg.Wait()
}
func worker(task chan int, resp chan<- int) {
defer wg.Done()
for {
task, ok := <-task
if !ok {
return
}
n := verySlowAPI(task)
resp <- n
}
}
英文:
I wonder what would be idiomatic way to do as following.
I have N slow API queries, and one database connection, I want to have a buffered channel, where responses will come, and one database transaction which I will use to write data.
I could only come up with semaphore thing as following makeup example:
func myFunc(){
//10 concurrent API calls
sem := make(chan bool, 10)
//A concurrent safe map as buffer
var myMap MyConcurrentMap
for i:=0;i<N;i++{
sem<-true
go func(i int){
defer func(){<-sem}()
resp:=slowAPICall(fmt.Sprintf("http://slow-api.me?%d",i))
myMap.Put(resp)
}(i)
}
for j=0;j<cap(sem);j++{
sem<-true
}
tx,_ := db.Begin()
for data:=range myMap{
tx.Exec("Insert data into database")
}
tx.Commit()
}
I am nearly sure there is simpler, cleaner and more proper solution, but it is seems complicated to grasp for me.
EDIT:
Well, I come with following solution, this way I do not need the buffer map, so once data comes to resp
channel the data is printed or can be used to insert into a database, it works, I am still not sure if everything OK, at last there are no race.
package main
import (
"fmt"
"math/rand"
"sync"
"time"
)
//Gloab waitGroup
var wg sync.WaitGroup
func init() {
//just for fun sake, make rand seeded
rand.Seed(time.Now().UnixNano())
}
//Emulate a slow API call
func verySlowAPI(id int) int {
n := rand.Intn(5)
time.Sleep(time.Duration(n) * time.Second)
return n
}
func main() {
//Amount of tasks
N := 100
//Concurrency level
concur := 10
//Channel for tasks
tasks := make(chan int, N)
//Channel for responses
resp := make(chan int, 10)
//10 concurrent groutinezs
wg.Add(concur)
for i := 1; i <= concur; i++ {
go worker(tasks, resp)
}
//Add tasks
for i := 0; i < N; i++ {
tasks <- i
}
//Collect data from goroutiens
for i := 0; i < N; i++ {
fmt.Printf("%d\n", <-resp)
}
//close the tasks channel
close(tasks)
//wait till finish
wg.Wait()
}
func worker(task chan int, resp chan<- int) {
defer wg.Done()
for {
task, ok := <-task
if !ok {
return
}
n := verySlowAPI(task)
resp <- n
}
}
答案1
得分: 2
不需要使用通道来实现信号量,sync.WaitGroup 用于等待一组例程完成。
如果你想使用通道来限制吞吐量,最好使用工作池,并使用通道将任务传递给工作线程:
type job struct {
i int
}
func myFunc(N int) {
// 根据需要调整任务总数
work := make(chan job, 10)
// res 是 slowAPICall 返回的任意类型
results := make(chan res, 10)
resBuff := make([]res, 0, N)
wg := new(sync.WaitGroup)
// 10 个并发的 API 调用
for i = 0; i < 10; i++ {
wg.Add(1)
go func() {
for j := range work {
resp := slowAPICall(fmt.Sprintf("http://slow-api.me?%d", j.i))
results <- resp
}
wg.Done()
}()
}
go func() {
for r := range results {
resBuff = append(resBuff, r)
}
}
for i = 0; i < N; i++ {
work <- job{i}
}
close(work)
wg.Wait()
close(results)
}
英文:
There's no need to use channels for a semaphore, sync.WaitGroup was made for waiting for a set of routines to complete.
If you're using the channel to limit throughput, you're better off with a worker pool, and using the channel to pass jobs to the workers:
type job struct {
i int
}
func myFunc(N int) {
// Adjust as needed for total number of tasks
work := make(chan job, 10)
// res being whatever type slowAPICall returns
results := make(chan res, 10)
resBuff := make([]res, 0, N)
wg := new(sync.WaitGroup)
// 10 concurrent API calls
for i = 0; i < 10; i++ {
wg.Add(1)
go func() {
for j := range work {
resp := slowAPICall(fmt.Sprintf("http://slow-api.me?%d", j.i))
results <- resp
}
wg.Done()
}()
}
go func() {
for r := range results {
resBuff = append(resBuff, r)
}
}
for i = 0; i < N; i++ {
work <- job{i}
}
close(work)
wg.Wait()
close(results)
}
答案2
得分: 1
也许这对你有用。现在你可以摆脱并发映射。以下是一段代码片段:
func myFunc() {
//10个并发的API调用
sem := make(chan bool, 10)
respCh := make(chan YOUR_RESP_TYPE, 10)
var responses []YOUR_RESP_TYPE
for i := 0; i < N; i++ {
sem <- true
go func(i int) {
defer func() {
<-sem
}()
resp := slowAPICall(fmt.Sprintf("http://slow-api.me?%d",i))
respCh <- resp
}(i)
}
respCollected := make(chan struct{})
go func() {
for i := 0; i < N; i++ {
responses = append(responses, <-respCh)
}
close(respCollected)
}()
<-respCollected
tx,_ := db.Begin()
for _, data := range responses {
tx.Exec("将数据插入数据库")
}
tx.Commit()
}
然后我们需要使用另一个goroutine,从响应通道中收集所有响应并存储在某个切片或映射中。
英文:
Maybe this will work for you. Now you can get rid of your concurrent map. Here is a code snippet:
func myFunc() {
//10 concurrent API calls
sem := make(chan bool, 10)
respCh := make(chan YOUR_RESP_TYPE, 10)
var responses []YOUR_RESP_TYPE
for i := 0; i < N; i++ {
sem <- true
go func(i int) {
defer func() {
<-sem
}()
resp := slowAPICall(fmt.Sprintf("http://slow-api.me?%d",i))
respCh <- resp
}(i)
}
respCollected := make(chan struct{})
go func() {
for i := 0; i < N; i++ {
responses = append(responses, <-respCh)
}
close(respCollected)
}()
<-respCollected
tx,_ := db.Begin()
for _, data := range responses {
tx.Exec("Insert data into database")
}
tx.Commit()
}
Than we need to use one more goroutine that will collect all responses in some slice or map from a response channel.
通过集体智慧和协作来改善编程学习和解决问题的方式。致力于成为全球开发者共同参与的知识库,让每个人都能够通过互相帮助和分享经验来进步。
评论