Synchronizing requests between two separate rate-limited endpoints
Question
I am utilizing some third-party APIs that each have their own rate limits. Endpoint 1 has a rate limit of 10/s and Endpoint 2 has a rate limit of 20/s.
I need to process my data through endpoint 1, which will return an array of objects (between 2 and 3,000 of them). I then need to take some data from within each object and send it to the 2nd endpoint while respecting that endpoint's rate limit.
I plan on sending the first endpoint's requests in goroutines, in batches of 10 at a time, ensuring that if all 10 requests finish in under a second, I do not exceed the limit by sending more within that 1-second window.
Ultimately, I want to be able to limit how many concurrent requests are in flight at a time per endpoint, especially if I have to build in retries for requests that fail due to server 500+ responses, etc.
For the purpose of this question, I am using httpbin requests to simulate the scenario below:
package main

import (
	"bytes"
	"encoding/json"
	"fmt"
	"io"
	"net/http"
	"sync"
	"time"
)

type HttpBinGetRequest struct {
	url string
}

type HttpBinGetResponse struct {
	Uuid       string `json:"uuid"`
	StatusCode int
}

type HttpBinPostRequest struct {
	url  string
	uuid string // Item to post to API
}

type HttpBinPostResponse struct {
	Data       string `json:"data"`
	StatusCode int
}

func main() {
	// Prepare 500 GET requests
	var requests []*HttpBinGetRequest
	for i := 0; i < 500; i++ {
		uri := "https://httpbin.org/uuid"
		request := &HttpBinGetRequest{
			url: uri,
		}
		requests = append(requests, request)
	}

	// Create semaphore and rate limit for the GET endpoint
	getSemaphore := make(chan struct{}, 10)
	getRate := make(chan struct{}, 10)
	for i := 0; i < cap(getRate); i++ {
		getRate <- struct{}{}
	}

	go func() {
		// ticker corresponding to 1/10th of a second
		ticker := time.NewTicker(100 * time.Millisecond)
		defer ticker.Stop()
		for range ticker.C {
			_, ok := <-getRate
			if !ok {
				return
			}
		}
	}()

	// Send our GET requests to obtain a random UUID
	var wg sync.WaitGroup
	for _, request := range requests {
		wg.Add(1)
		// Go func to make request and receive the response
		go func(r *HttpBinGetRequest) {
			defer wg.Done()
			// Reserve a slot in the rate limiter (blocks while it is full)
			getRate <- struct{}{}
			// Add a token to the semaphore
			getSemaphore <- struct{}{}
			// Remove token when function is complete
			defer func() {
				<-getSemaphore
			}()
			resp, _ := get(r)
			fmt.Printf("%+v\n", resp)
		}(request)
	}
	wg.Wait()

	// I need to add code that obtains the response data from the above for loop,
	// then sends each UUID to its own goroutine for a POST request, following a similar pattern above,
	// to not violate the rate limit of the second endpoint, which is 20 calls per second
	// postSemaphore := make(chan struct{}, 20)
	// postRate := make(chan struct{}, 20)
	// for i := 0; i < cap(postRate); i++ {
	// 	postRate <- struct{}{}
	// }
}

func get(hbgr *HttpBinGetRequest) (*HttpBinGetResponse, error) {
	httpResp := &HttpBinGetResponse{}
	client := &http.Client{}
	req, err := http.NewRequest("GET", hbgr.url, nil)
	if err != nil {
		fmt.Println("error making request")
		return httpResp, err
	}
	req.Header = http.Header{
		"accept": {"application/json"},
	}
	resp, err := client.Do(req)
	if err != nil {
		fmt.Println(err)
		fmt.Println("error getting response")
		return httpResp, err
	}
	defer resp.Body.Close() // close the body to avoid leaking connections

	// Read Response
	body, err := io.ReadAll(resp.Body)
	if err != nil {
		fmt.Println("error reading response body")
		return httpResp, err
	}
	json.Unmarshal(body, httpResp)
	httpResp.StatusCode = resp.StatusCode
	return httpResp, nil
}

// Method to post data to httpbin
func post(hbr *HttpBinPostRequest) (*HttpBinPostResponse, error) {
	httpResp := &HttpBinPostResponse{}
	client := &http.Client{}
	req, err := http.NewRequest("POST", hbr.url, bytes.NewBuffer([]byte(hbr.uuid)))
	if err != nil {
		fmt.Println("error making request")
		return httpResp, err
	}
	req.Header = http.Header{
		"accept": {"application/json"},
	}
	resp, err := client.Do(req)
	if err != nil {
		fmt.Println("error getting response")
		return httpResp, err
	}
	defer resp.Body.Close() // close the body to avoid leaking connections

	if resp.StatusCode == 429 {
		fmt.Println(resp.Header.Get("Retry-After"))
	}
	// Read Response
	body, err := io.ReadAll(resp.Body)
	if err != nil {
		fmt.Println("error reading response body")
		return httpResp, err
	}
	json.Unmarshal(body, httpResp)
	httpResp.StatusCode = resp.StatusCode
	fmt.Printf("%+v", httpResp)
	return httpResp, nil
}
Answer 1
Score: 1
This is a producer/consumer pattern. You can use a chan to connect them.
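For illustration only, here is a minimal, self-contained sketch of that wiring, stripped of the rate limiting discussed below (the uuid-... values are placeholder data, not real API responses):

package main

import (
	"fmt"
	"sync"
)

func main() {
	// Producers (the GET side) push results into the channel;
	// the consumer (the POST side) ranges over it.
	results := make(chan string)

	var wg sync.WaitGroup
	for i := 0; i < 3; i++ {
		wg.Add(1)
		go func(i int) {
			defer wg.Done()
			results <- fmt.Sprintf("uuid-%d", i) // placeholder response data
		}(i)
	}
	// Close the channel once every producer has finished,
	// so the consumer's range loop terminates.
	go func() {
		wg.Wait()
		close(results)
	}()

	for uuid := range results {
		fmt.Println("would POST", uuid)
	}
}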
Regarding the rate limiter, I would use the golang.org/x/time/rate package.
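As a standalone sketch of that API (assuming the GET endpoint's 10/s budget), each Wait call blocks until the limiter has a token available:

package main

import (
	"context"
	"fmt"
	"time"

	"golang.org/x/time/rate"
)

func main() {
	// 10 events per second with a burst of 1, mirroring the GET limit.
	lim := rate.NewLimiter(rate.Limit(10), 1)
	start := time.Now()
	for i := 0; i < 5; i++ {
		// Wait blocks until the next event is allowed (or ctx is cancelled).
		if err := lim.Wait(context.Background()); err != nil {
			panic(err)
		}
		fmt.Printf("request %d at %v\n", i, time.Since(start).Round(time.Millisecond))
	}
}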
Since we have decided to use a chan to connect the producer and the consumer, it's natural to send failed tasks back into the same chan so that the consumer can try them again.
I have encapsulated the logic into the type Scheduler[T]. See the demo below. Please note that the demo was written in a hurry to illustrate the idea only; it has not been tested thoroughly.
package main

import (
	"context"
	"fmt"
	"io"
	"log"
	"math/rand"
	"net/http"
	"net/http/httptest"
	"sort"
	"sync"
	"time"

	"golang.org/x/time/rate"
)

type task[T any] struct {
	param       T
	failedCount int
}

type Scheduler[T any] struct {
	name     string
	limit    int
	maxTries int
	wg       sync.WaitGroup
	tasks    chan task[T]
	action   func(param T) error
}

// NewScheduler creates a scheduler that runs the action with the specified rate limit.
// It will retry the action if the action returns a non-nil error.
func NewScheduler[T any](name string, limit, maxTries, chanSize int, action func(param T) error) *Scheduler[T] {
	return &Scheduler[T]{
		name:     name,
		limit:    limit,
		maxTries: maxTries,
		tasks:    make(chan task[T], chanSize),
		action:   action,
	}
}

func (s *Scheduler[T]) AddTask(param T) {
	s.wg.Add(1)
	s.tasks <- task[T]{param: param}
}

func (s *Scheduler[T]) retryLater(t task[T]) {
	s.wg.Add(1)
	s.tasks <- t
}

func (s *Scheduler[T]) Run() {
	lim := rate.NewLimiter(rate.Limit(s.limit), 1)
	for t := range s.tasks {
		t := t // capture the loop variable for the goroutine below
		if err := lim.Wait(context.Background()); err != nil {
			log.Fatalf("wait: %s", err)
			return
		}
		go func() {
			defer s.wg.Done()
			err := s.action(t.param)
			if err != nil {
				log.Printf("task %s, param %v failed: %v", s.name, t.param, err)
				t.failedCount++
				if t.failedCount == s.maxTries {
					log.Printf("task %s, param %v failed with %d tries", s.name, t.param, s.maxTries)
					return
				}
				s.retryLater(t)
			}
		}()
	}
}

func (s *Scheduler[T]) Wait() {
	s.wg.Wait()
	close(s.tasks)
}

func main() {
	s := &server{}
	ts := httptest.NewServer(s)
	defer ts.Close()

	schedulerPost := NewScheduler("post", 20, 3, 1, func(param string) error {
		return post(fmt.Sprintf("%s/%s", ts.URL, param))
	})
	go schedulerPost.Run()

	schedulerGet := NewScheduler("get", 10, 3, 1, func(param int) error {
		id, err := get(fmt.Sprintf("%s/%d", ts.URL, param))
		if err != nil {
			return err
		}
		schedulerPost.AddTask(id)
		return nil
	})
	go schedulerGet.Run()

	for i := 0; i < 100; i++ {
		schedulerGet.AddTask(i)
	}

	schedulerGet.Wait()
	schedulerPost.Wait()

	s.printStats()
}

func get(url string) (string, error) {
	resp, err := http.Get(url)
	if err != nil {
		return "", err
	}
	defer resp.Body.Close()
	if resp.StatusCode != 200 {
		return "", fmt.Errorf("unexpected status code: %d", resp.StatusCode)
	}
	body, err := io.ReadAll(resp.Body)
	if err != nil {
		return "", err
	}
	return string(body), nil
}

func post(url string) error {
	resp, err := http.Post(url, "", nil)
	if err != nil {
		return err
	}
	defer resp.Body.Close()
	if resp.StatusCode != 200 {
		return fmt.Errorf("unexpected status code: %d", resp.StatusCode)
	}
	return nil
}

type server struct {
	gMu   sync.Mutex
	gets  []int64
	pMu   sync.Mutex
	posts []int64
}

func (s *server) ServeHTTP(w http.ResponseWriter, r *http.Request) {
	log.Printf("%s: %s", r.Method, r.URL.Path)
	// collect request stats.
	if r.Method == http.MethodGet {
		s.gMu.Lock()
		s.gets = append(s.gets, time.Now().UnixMilli())
		s.gMu.Unlock()
	} else {
		s.pMu.Lock()
		s.posts = append(s.posts, time.Now().UnixMilli())
		s.pMu.Unlock()
	}
	n := rand.Intn(1000)
	// simulate latency.
	time.Sleep(time.Duration(n) * time.Millisecond)
	// simulate errors.
	if n%10 == 0 {
		w.WriteHeader(http.StatusInternalServerError)
		return
	}
	if r.Method == http.MethodGet {
		fmt.Fprintf(w, "%s", r.URL.Path[1:])
		return
	}
}

func (s *server) printStats() {
	log.Printf("GETS (total: %d):\n", len(s.gets))
	printStats(s.gets)
	log.Printf("POSTS (total: %d):\n", len(s.posts))
	printStats(s.posts)
}

func printStats(ts []int64) {
	sort.Slice(ts, func(i, j int) bool {
		return ts[i] < ts[j]
	})
	count := 0
	to := ts[0] + 1000
	for i := 0; i < len(ts); i++ {
		if ts[i] < to {
			count++
		} else {
			fmt.Printf(" %d: %d\n", to, count)
			i-- // push back the current item
			count = 0
			to += 1000
		}
	}
	if count > 0 {
		fmt.Printf(" %d: %d\n", to, count)
	}
}
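Two properties of this design are worth calling out: a retried task re-enters the same tasks channel, so the retry consumes a fresh limiter token and cannot push the endpoint past its limit; and because the get scheduler's action calls schedulerPost.AddTask directly, results flow into the second stage as they arrive rather than being collected in full first.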
The output looks like this:
...
2023/03/25 21:03:30 GETS (total: 112):
1679749398998: 10
1679749399998: 10
1679749400998: 10
1679749401998: 10
1679749402998: 10
1679749403998: 10
1679749404998: 10
1679749405998: 10
1679749406998: 10
1679749407998: 10
1679749408998: 10
1679749409998: 2
2023/03/25 21:03:30 POSTS (total: 111):
1679749399079: 8
1679749400079: 8
1679749401079: 12
1679749402079: 8
1679749403079: 10
1679749404079: 9
1679749405079: 9
1679749406079: 8
1679749407079: 14
1679749408079: 12
1679749409079: 9
1679749410079: 4
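The GET buckets hold steady at 10 per second and the POST buckets stay under 20 per second, while the totals (112 GETs and 111 POSTs for 100 submitted tasks) reflect retries of the simulated 500 responses.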