将多个慢速API查询通道合并为单个SQL事务

huangapple go评论90阅读模式
英文:

Go: channel many slow API queries into single SQL transaction

问题

我想知道以下操作的惯用方式是什么。我有N个慢速API查询和一个数据库连接,我想要一个带有缓冲通道,用于接收响应,并且有一个数据库事务,我将用它来写入数据。我只能想到以下信号量的解决方案,以下是一个示例:

func myFunc(){
  //10个并发的API调用
  sem := make(chan bool, 10) 
  //一个并发安全的映射作为缓冲区
  var myMap  MyConcurrentMap 

  for i:=0;i<N;i++{
    sem<-true
    go func(i int){
      defer func(){<-sem}()
      resp:=slowAPICall(fmt.Sprintf("http://slow-api.me?%d",i))
      myMap.Put(resp)
    }(i)
  }

  for j=0;j<cap(sem);j++{
    sem<-true
  }
  tx,_ := db.Begin()    
  for data:=range myMap{
   tx.Exec("Insert data into database")
  }
  tx.Commit()
}

我几乎确定有更简单、更清晰和更合适的解决方案,但对我来说似乎很复杂。

编辑:
好吧,我想出了以下解决方案,这样我就不需要缓冲映射了,因此一旦数据到达resp通道,数据就会被打印出来或者可以用来插入数据库,它可以工作,但我仍然不确定是否一切都没问题,至少没有竞争条件。

package main

import (
	"fmt"
	"math/rand"
	"sync"
	"time"
)

//全局等待组
var wg sync.WaitGroup

func init() {
	//只是为了好玩,使rand种子化
	rand.Seed(time.Now().UnixNano())
}

//模拟一个慢速API调用
func verySlowAPI(id int) int {
	n := rand.Intn(5)
	time.Sleep(time.Duration(n) * time.Second)
	return n
}

func main() {
	//任务数量
	N := 100

	//并发级别
	concur := 10

	//任务通道
	tasks := make(chan int, N)

	//响应通道
	resp := make(chan int, 10)

	//10个并发的goroutine
	wg.Add(concur)
	for i := 1; i <= concur; i++ {
		go worker(tasks, resp)
	}

	//添加任务
	for i := 0; i < N; i++ {
		tasks <- i
	}

	//从goroutine收集数据
	for i := 0; i < N; i++ {
		fmt.Printf("%d\n", <-resp)
	}

	//关闭任务通道
	close(tasks)

	//等待完成
	wg.Wait()

}

func worker(task chan int, resp chan<- int) {
	defer wg.Done()
	for {
		task, ok := <-task
		if !ok {
			return
		}
		n := verySlowAPI(task)
		resp <- n
	}
}
英文:

I wonder what would be idiomatic way to do as following.
I have N slow API queries, and one database connection, I want to have a buffered channel, where responses will come, and one database transaction which I will use to write data.
I could only come up with semaphore thing as following makeup example:

    func myFunc(){
//10 concurrent API calls
sem := make(chan bool, 10) 
//A concurrent safe map as buffer
var myMap  MyConcurrentMap 
for i:=0;i&lt;N;i++{
sem&lt;-true
go func(i int){
defer func(){&lt;-sem}()
resp:=slowAPICall(fmt.Sprintf(&quot;http://slow-api.me?%d&quot;,i))
myMap.Put(resp)
}(i)
}
for j=0;j&lt;cap(sem);j++{
sem&lt;-true
}
tx,_ := db.Begin()    
for data:=range myMap{
tx.Exec(&quot;Insert data into database&quot;)
}
tx.Commit()
}

I am nearly sure there is simpler, cleaner and more proper solution, but it is seems complicated to grasp for me.

EDIT:
Well, I come with following solution, this way I do not need the buffer map, so once data comes to resp channel the data is printed or can be used to insert into a database, it works, I am still not sure if everything OK, at last there are no race.

package main
import (
&quot;fmt&quot;
&quot;math/rand&quot;
&quot;sync&quot;
&quot;time&quot;
)
//Gloab waitGroup
var wg sync.WaitGroup
func init() {
//just for fun sake, make rand seeded
rand.Seed(time.Now().UnixNano())
}
//Emulate a slow API call
func verySlowAPI(id int) int {
n := rand.Intn(5)
time.Sleep(time.Duration(n) * time.Second)
return n
}
func main() {
//Amount of tasks
N := 100
//Concurrency level
concur := 10
//Channel for tasks
tasks := make(chan int, N)
//Channel for responses
resp := make(chan int, 10)
//10 concurrent groutinezs
wg.Add(concur) 
for i := 1; i &lt;= concur; i++ {
go worker(tasks, resp)
}
//Add tasks
for i := 0; i &lt; N; i++ {
tasks &lt;- i
}
//Collect data from goroutiens
for i := 0; i &lt; N; i++ {
fmt.Printf(&quot;%d\n&quot;, &lt;-resp)
}
//close the tasks channel
close(tasks)
//wait till finish
wg.Wait()
}
func worker(task chan int, resp chan&lt;- int) {
defer wg.Done()
for {
task, ok := &lt;-task
if !ok {
return
}
n := verySlowAPI(task)
resp &lt;- n
}
}

答案1

得分: 2

不需要使用通道来实现信号量,sync.WaitGroup 用于等待一组例程完成。

如果你想使用通道来限制吞吐量,最好使用工作池,并使用通道将任务传递给工作线程:

type job struct {
    i int
}

func myFunc(N int) {
    // 根据需要调整任务总数
    work := make(chan job, 10)
    // res 是 slowAPICall 返回的任意类型
    results := make(chan res, 10)
    resBuff := make([]res, 0, N)

    wg := new(sync.WaitGroup)

    // 10 个并发的 API 调用
    for i = 0; i < 10; i++ {
        wg.Add(1)
        go func() {
            for j := range work {
                resp := slowAPICall(fmt.Sprintf("http://slow-api.me?%d", j.i))
                results <- resp
            }
            wg.Done()
        }()
    }

    go func() {
        for r := range results {
            resBuff = append(resBuff, r)
        }
    }

    for i = 0; i < N; i++ {
        work <- job{i}
    }
    close(work)

    wg.Wait()
    close(results)
}
英文:

There's no need to use channels for a semaphore, sync.WaitGroup was made for waiting for a set of routines to complete.

If you're using the channel to limit throughput, you're better off with a worker pool, and using the channel to pass jobs to the workers:

type job struct {
i int
}
func myFunc(N int) {
// Adjust as needed for total number of tasks
work := make(chan job, 10)
// res being whatever type slowAPICall returns
results := make(chan res, 10)
resBuff := make([]res, 0, N)
wg := new(sync.WaitGroup)
// 10 concurrent API calls
for i = 0; i &lt; 10; i++ {
wg.Add(1)
go func() {
for j := range work {
resp := slowAPICall(fmt.Sprintf(&quot;http://slow-api.me?%d&quot;, j.i))
results &lt;- resp
}
wg.Done()
}()
}
go func() {
for r := range results {
resBuff = append(resBuff, r)
}
}
for i = 0; i &lt; N; i++ {
work &lt;- job{i}
}
close(work)
wg.Wait()
close(results)
}

答案2

得分: 1

也许这对你有用。现在你可以摆脱并发映射。以下是一段代码片段:

func myFunc() {
    //10个并发的API调用
    sem := make(chan bool, 10)
    respCh := make(chan YOUR_RESP_TYPE, 10)
    var responses []YOUR_RESP_TYPE

    for i := 0; i < N; i++ {
        sem <- true
        go func(i int) {
            defer func() {
                <-sem
            }()
            resp := slowAPICall(fmt.Sprintf("http://slow-api.me?%d",i))
            respCh <- resp
        }(i)
    }

    respCollected := make(chan struct{})
    go func() {
        for i := 0; i < N; i++ {
            responses = append(responses, <-respCh)
        }
        close(respCollected)
    }()

    <-respCollected
    tx,_ := db.Begin()
    for _, data := range responses {
        tx.Exec("将数据插入数据库")
    }
    tx.Commit()
}

然后我们需要使用另一个goroutine,从响应通道中收集所有响应并存储在某个切片或映射中。

英文:

Maybe this will work for you. Now you can get rid of your concurrent map. Here is a code snippet:

func myFunc() {
//10 concurrent API calls
sem := make(chan bool, 10)
respCh := make(chan YOUR_RESP_TYPE, 10)
var responses []YOUR_RESP_TYPE
for i := 0; i &lt; N; i++ {
sem &lt;- true
go func(i int) {
defer func() {
&lt;-sem
}()
resp := slowAPICall(fmt.Sprintf(&quot;http://slow-api.me?%d&quot;,i))
respCh &lt;- resp
}(i)
}
respCollected := make(chan struct{})
go func() {
for i := 0; i &lt; N; i++ {
responses = append(responses, &lt;-respCh)
}
close(respCollected)
}()
&lt;-respCollected
tx,_ := db.Begin()
for _, data := range responses {
tx.Exec(&quot;Insert data into database&quot;)
}
tx.Commit()
}

Than we need to use one more goroutine that will collect all responses in some slice or map from a response channel.

huangapple
  • 本文由 发表于 2017年7月25日 01:57:50
  • 转载请务必保留本文链接:https://go.coder-hub.com/45287088.html
匿名

发表评论

匿名网友

:?: :razz: :sad: :evil: :!: :smile: :oops: :grin: :eek: :shock: :???: :cool: :lol: :mad: :twisted: :roll: :wink: :idea: :arrow: :neutral: :cry: :mrgreen:

确定