在两个独立的速率限制端点之间同步请求

huangapple go评论120阅读模式
英文:

Synchronizing requests between two Separate Rate Limited Endpoints

问题

我正在使用一些第三方API,每个API都有自己的速率限制。Endpoint 1 的速率限制是每秒10次,Endpoint 2 的速率限制是每秒20次。

我需要通过第一个端点处理我的数据,该端点将返回一个对象数组(2-3000个对象之间)。然后,我需要将每个对象中的一些数据发送到第二个端点,同时遵守第二个端点的速率限制。

我计划使用go routines以每次10个的批量发送第一个端点的请求,确保如果所有10个请求在<1秒内完成,我不会在该1秒窗口内发送更多请求。

最终,我希望能够限制每个端点同时发送的并发响应数量。特别是如果我必须为由于服务器返回500+响应而失败的请求构建重试机制等。

为了提出问题,我使用httpbin请求来模拟下面的场景:

package main

import (
	"bytes"
	"encoding/json"
	"fmt"
	"io"
	"net/http"
	"sync"
	"time"
)

type HttpBinGetRequest struct {
	url string
}

type HttpBinGetResponse struct {
	Uuid       string `json:"uuid"`
	StatusCode int
}

type HttpBinPostRequest struct {
	url  string
	uuid string // Item to post to API
}

type HttpBinPostResponse struct {
	Data       string `json:"data"`
	StatusCode int
}

func main() {

	// 准备500个GET请求
	var requests []*HttpBinGetRequest
	for i := 0; i < 500; i++ {
		uri := "https://httpbin.org/uuid"
		request := &HttpBinGetRequest{
			url: uri,
		}
		requests = append(requests, request)
	}

	// 创建GET端点的信号量和速率限制
	getSemaphore := make(chan struct{}, 10)
	getRate := make(chan struct{}, 10)
	for i := 0; i < cap(getRate); i++ {
		getRate <- struct{}{}
	}

	go func() {
		// 每1/10秒触发一次
		ticker := time.NewTicker(100 * time.Millisecond)
		defer ticker.Stop()
		for range ticker.C {
			_, ok := <-getRate
			if !ok {
				return
			}
		}
	}()

	// 发送GET请求以获取随机UUID
	var wg sync.WaitGroup
	for _, request := range requests {
		wg.Add(1)
		// Go函数用于发送请求并接收响应
		go func(r *HttpBinGetRequest) {
			defer wg.Done()

			// 检查速率限制器,如果为空则阻塞
			getRate <- struct{}{}

			// 在函数完成时添加一个令牌到信号量
			getSemaphore <- struct{}{}

			// 函数完成后移除令牌
			defer func() {
				<-getSemaphore
			}()
			resp, _ := get(r)
			fmt.Printf("%+v\n", resp)
		}(request)
	}
	wg.Wait()

	// 我需要添加代码,从上面的循环中获取响应数据
	// 然后将UUID发送到自己的go routines进行POST请求,遵守类似上面的模式
	// postSemaphore := make(chan struct{}, 20)
	// postRate := make(chan struct{}, 20)
	// for i := 0; i < cap(postRate); i++ {
	// 	postRate <- struct{}{}
	// }
}

func get(hbgr *HttpBinGetRequest) (*HttpBinGetResponse, error) {

	httpResp := &HttpBinGetResponse{}
	client := &http.Client{}
	req, err := http.NewRequest("GET", hbgr.url, nil)
	if err != nil {
		fmt.Println("error making request")
		return httpResp, err
	}

	req.Header = http.Header{
		"accept": {"application/json"},
	}

	resp, err := client.Do(req)
	if err != nil {
		fmt.Println(err)
		fmt.Println("error getting response")
		return httpResp, err
	}

	// 读取响应
	body, err := io.ReadAll(resp.Body)
	if err != nil {
		fmt.Println("error reading response body")
		return httpResp, err
	}
	json.Unmarshal(body, &httpResp)
	httpResp.StatusCode = resp.StatusCode
	return httpResp, nil
}

// 发送数据到httpbin的方法
func post(hbr *HttpBinPostRequest) (*HttpBinPostResponse, error) {

	httpResp := &HttpBinPostResponse{}
	client := &http.Client{}
	req, err := http.NewRequest("POST", hbr.url, bytes.NewBuffer([]byte(hbr.uuid)))
	if err != nil {
		fmt.Println("error making request")
		return httpResp, err
	}

	req.Header = http.Header{
		"accept": {"application/json"},
	}

	resp, err := client.Do(req)
	if err != nil {
		fmt.Println("error getting response")
		return httpResp, err
	}

	if resp.StatusCode == 429 {
		fmt.Println(resp.Header.Get("Retry-After"))
	}

	// 读取响应
	body, err := io.ReadAll(resp.Body)
	if err != nil {
		fmt.Println("error reading response body")
		return httpResp, err
	}
	json.Unmarshal(body, &httpResp)
	httpResp.StatusCode = resp.StatusCode
	fmt.Printf("%+v", httpResp)
	return httpResp, nil
}

以上是您提供的代码的翻译。

英文:

I am utilizing some third party APIs that each have their own rate limits. Endpoint 1 has a rate limit of 10/s and Endpoint2 has a rate limit of 20/s.

I need to process my data through endpoint 1 which will return an array of objects (between 2-3000 objects). I then need to take each object and send some data within to the 2nd endpoint while respecting that 2nd endpoint's rate limit.

I plan on sending the first endpoint's request in go routines via a batch of 10 at a time, ensuring that if all 10 requests finish in <1s, I do not exceed by sending more within that 1 second window.

Ultimately, I want to be able to limit how many concurrent responses are going out at a time per endpoint. Especially if I have to build in retries for failed requests due to server 500+ responses, etc.

For purpose of question I am using httpbin requests to simulate the scenario below:

package main
import (
&quot;bytes&quot;
&quot;encoding/json&quot;
&quot;fmt&quot;
&quot;io&quot;
&quot;net/http&quot;
&quot;sync&quot;
&quot;time&quot;
)
type HttpBinGetRequest struct {
url string
}
type HttpBinGetResponse struct {
Uuid       string `json:&quot;uuid&quot;`
StatusCode int
}
type HttpBinPostRequest struct {
url  string
uuid string // Item to post to API
}
type HttpBinPostResponse struct {
Data       string `json:&quot;data&quot;`
StatusCode int
}
func main() {
// Prepare GET requests for 500 requests
var requests []*HttpBinGetRequest
for i := 0; i &lt; 500; i++ {
uri := &quot;https://httpbin.org/uuid&quot;
request := &amp;HttpBinGetRequest{
url: uri,
}
requests = append(requests, request)
}
// Create semaphore and rate limit for the GET endpoint
getSemaphore := make(chan struct{}, 10)
getRate := make(chan struct{}, 10)
for i := 0; i &lt; cap(getRate); i++ {
getRate &lt;- struct{}{}
}
go func() {
// ticker corresponding to 1/10th of a second
ticker := time.NewTicker(100 * time.Millisecond)
defer ticker.Stop()
for range ticker.C {
_, ok := &lt;-getRate
if !ok {
return
}
}
}()
// Send our GET requests to obtain a random UUID
var wg sync.WaitGroup
for _, request := range requests {
wg.Add(1)
// Go func to make request and receive the response
go func(r *HttpBinGetRequest) {
defer wg.Done()
// Check the rate limiter and block if it is empty
getRate &lt;- struct{}{}
// Add a token to the semaphore
getSemaphore &lt;- struct{}{}
// Remove token when function is complete
defer func() {
&lt;-getSemaphore
}()
resp, _ := get(r)
fmt.Printf(&quot;%+v\n&quot;, resp)
}(request)
}
wg.Wait()
// I need to add code that obtains the response data from the above for loop
// then sends the UUID it to its own go routines for a POST request, following a similar pattern above
// To not violate the rate limit of the second endpoint which is 20 calls per second
// postSemaphore := make(chan struct{}, 20)
// postRate := make(chan struct{}, 20)
// for i := 0; i &lt; cap(postRate); i++ {
// 	postRate &lt;- struct{}{}
// }
}
func get(hbgr *HttpBinGetRequest) (*HttpBinGetResponse, error) {
httpResp := &amp;HttpBinGetResponse{}
client := &amp;http.Client{}
req, err := http.NewRequest(&quot;GET&quot;, hbgr.url, nil)
if err != nil {
fmt.Println(&quot;error making request&quot;)
return httpResp, err
}
req.Header = http.Header{
&quot;accept&quot;: {&quot;application/json&quot;},
}
resp, err := client.Do(req)
if err != nil {
fmt.Println(err)
fmt.Println(&quot;error getting response&quot;)
return httpResp, err
}
// Read Response
body, err := io.ReadAll(resp.Body)
if err != nil {
fmt.Println(&quot;error reading response body&quot;)
return httpResp, err
}
json.Unmarshal(body, &amp;httpResp)
httpResp.StatusCode = resp.StatusCode
return httpResp, nil
}
// Method to post data to httpbin
func post(hbr *HttpBinPostRequest) (*HttpBinPostResponse, error) {
httpResp := &amp;HttpBinPostResponse{}
client := &amp;http.Client{}
req, err := http.NewRequest(&quot;POST&quot;, hbr.url, bytes.NewBuffer([]byte(hbr.uuid)))
if err != nil {
fmt.Println(&quot;error making request&quot;)
return httpResp, err
}
req.Header = http.Header{
&quot;accept&quot;: {&quot;application/json&quot;},
}
resp, err := client.Do(req)
if err != nil {
fmt.Println(&quot;error getting response&quot;)
return httpResp, err
}
if resp.StatusCode == 429 {
fmt.Println(resp.Header.Get(&quot;Retry-After&quot;))
}
// Read Response
body, err := io.ReadAll(resp.Body)
if err != nil {
fmt.Println(&quot;error reading response body&quot;)
return httpResp, err
}
json.Unmarshal(body, &amp;httpResp)
httpResp.StatusCode = resp.StatusCode
fmt.Printf(&quot;%+v&quot;, httpResp)
return httpResp, nil
}

答案1

得分: 1

这是一个生产者/消费者模式。你可以使用chan来连接它们。

关于速率限制器,我建议使用golang.org/x/time/rate包。

由于我们决定使用chan来连接生产者和消费者,所以自然而然地将失败的任务发送到同一个chan中,以便消费者可以再次尝试。

我已经将逻辑封装到了Scheduler[T]类型中。请参考下面的示例。请注意,该示例只是为了快速说明思路而编写的,并没有经过全面测试。

package main

import (
	"context"
	"fmt"
	"io"
	"log"
	"math/rand"
	"net/http"
	"net/http/httptest"
	"sort"
	"sync"
	"time"

	"golang.org/x/time/rate"
)

type task[T any] struct {
	param       T
	failedCount int
}

type Scheduler[T any] struct {
	name     string
	limit    int
	maxTries int
	wg       sync.WaitGroup
	tasks    chan task[T]
	action   func(param T) error
}

// NewScheduler creates a scheduler that runs the action with the specified rate limit.
// It will retry the action if the action returns a non-nil error.
func NewScheduler[T any](name string, limit, maxTries, chanSize int, action func(param T) error) *Scheduler[T] {
	return &Scheduler[T]{
		name:     name,
		limit:    limit,
		maxTries: maxTries,
		tasks:    make(chan task[T], chanSize),
		action:   action,
	}
}

func (s *Scheduler[T]) AddTask(param T) {
	s.wg.Add(1)
	s.tasks <- task[T]{param: param}
}

func (s *Scheduler[T]) retryLater(t task[T]) {
	s.wg.Add(1)
	s.tasks <- t
}

func (s *Scheduler[T]) Run() {
	lim := rate.NewLimiter(rate.Limit(s.limit), 1)
	for t := range s.tasks {
		t := t
		if err := lim.Wait(context.Background()); err != nil {
			log.Fatalf("wait: %s", err)
			return
		}
		go func() {
			defer s.wg.Done()
			err := s.action(t.param)
			if err != nil {
				log.Printf("task %s, param %v failed: %v", s.name, t.param, err)
				t.failedCount++

				if t.failedCount == s.maxTries {
					log.Printf("task %s, param %v failed with %d tries", s.name, t.param, s.maxTries)
					return
				}

				s.retryLater(t)
			}
		}()
	}
}

func (s *Scheduler[T]) Wait() {
	s.wg.Wait()
	close(s.tasks)
}

func main() {
	s := &server{}
	ts := httptest.NewServer(s)
	defer ts.Close()

	schedulerPost := NewScheduler("post", 20, 3, 1, func(param string) error {
		return post(fmt.Sprintf("%s/%s", ts.URL, param))
	})

	go schedulerPost.Run()

	schedulerGet := NewScheduler("get", 10, 3, 1, func(param int) error {
		id, err := get(fmt.Sprintf("%s/%d", ts.URL, param))
		if err != nil {
			return err
		}

		schedulerPost.AddTask(id)
		return nil
	})

	go schedulerGet.Run()

	for i := 0; i < 100; i++ {
		schedulerGet.AddTask(i)
	}

	schedulerGet.Wait()
	schedulerPost.Wait()

	s.printStats()
}

func get(url string) (string, error) {
	resp, err := http.Get(url)
	if err != nil {
		return "", err
	}
	defer resp.Body.Close()

	if resp.StatusCode != 200 {
		return "", fmt.Errorf("unexpected status code: %d", resp.StatusCode)
	}

	body, err := io.ReadAll(resp.Body)
	if err != nil {
		return "", err
	}

	return string(body), nil
}

func post(url string) error {
	resp, err := http.Post(url, "", nil)
	if err != nil {
		return err
	}
	defer resp.Body.Close()

	if resp.StatusCode != 200 {
		return fmt.Errorf("unexpected status code: %d", resp.StatusCode)
	}

	return nil
}

type server struct {
	gMu  sync.Mutex
	gets []int64

	pMu   sync.Mutex
	posts []int64
}

func (s *server) ServeHTTP(w http.ResponseWriter, r *http.Request) {
	log.Printf("%s: %s", r.Method, r.URL.Path)

	// collect request stats.
	if r.Method == http.MethodGet {
		s.gMu.Lock()
		s.gets = append(s.gets, time.Now().UnixMilli())
		s.gMu.Unlock()
	} else {
		s.pMu.Lock()
		s.posts = append(s.posts, time.Now().UnixMilli())
		s.pMu.Unlock()
	}

	n := rand.Intn(1000)
	// simulate latency.
	time.Sleep(time.Duration(n) * time.Millisecond)

	// simulate errors.
	if n%10 == 0 {
		w.WriteHeader(http.StatusInternalServerError)
		return
	}

	if r.Method == http.MethodGet {
		fmt.Fprintf(w, "%s", r.URL.Path[1:])
		return
	}
}

func (s *server) printStats() {
	log.Printf("GETS (total: %d):\n", len(s.gets))
	printStats(s.gets)
	log.Printf("POSTS (total: %d):\n", len(s.posts))
	printStats(s.posts)
}

func printStats(ts []int64) {
	sort.Slice(ts, func(i, j int) bool {
		return ts[i] < ts[j]
	})

	count := 0
	to := ts[0] + 1000
	for i := 0; i < len(ts); i++ {
		if ts[i] < to {
			count++
		} else {
			fmt.Printf("  %d: %d\n", to, count)
			i-- // push back the current item
			count = 0
			to += 1000
		}
	}
	if count > 0 {
		fmt.Printf("  %d: %d\n", to, count)
	}
}

输出结果如下:

...
2023/03/25 21:03:30 GETS (total: 112):
1679749398998: 10
1679749399998: 10
1679749400998: 10
1679749401998: 10
1679749402998: 10
1679749403998: 10
1679749404998: 10
1679749405998: 10
1679749406998: 10
1679749407998: 10
1679749408998: 10
1679749409998: 2
2023/03/25 21:03:30 POSTS (total: 111):
1679749399079: 8
1679749400079: 8
1679749401079: 12
1679749402079: 8
1679749403079: 10
1679749404079: 9
1679749405079: 9
1679749406079: 8
1679749407079: 14
1679749408079: 12
1679749409079: 9
1679749410079: 4
英文:

This is a producer/consumer pattern. You can use a chan to connect them.

Regarding the rate limiter, I would use the package golang.org/x/time/rate.

Since we have decided to use a chan to connect the producer and the consumer, it's natural to send the failed tasks to the same chan so that the consumer can try it again.

I have encapsulated the logic into the type Scheduler[T]. See the demo below. Please note that the demo is written in a hurry to illustrate the idea only. It's not tested thoroughly.

package main

import (
	&quot;context&quot;
	&quot;fmt&quot;
	&quot;io&quot;
	&quot;log&quot;
	&quot;math/rand&quot;
	&quot;net/http&quot;
	&quot;net/http/httptest&quot;
	&quot;sort&quot;
	&quot;sync&quot;
	&quot;time&quot;

	&quot;golang.org/x/time/rate&quot;
)

type task[T any] struct {
	param       T
	failedCount int
}

type Scheduler[T any] struct {
	name     string
	limit    int
	maxTries int
	wg       sync.WaitGroup
	tasks    chan task[T]
	action   func(param T) error
}

// NewScheduler creates a scheduler that runs the action with the specified rate limit.
// It will retry the action if the action returns a non-nil error.
func NewScheduler[T any](name string, limit, maxTries, chanSize int, action func(param T) error) *Scheduler[T] {
	return &amp;Scheduler[T]{
		name:     name,
		limit:    limit,
		maxTries: maxTries,
		tasks:    make(chan task[T], chanSize),
		action:   action,
	}
}

func (s *Scheduler[T]) AddTask(param T) {
	s.wg.Add(1)
	s.tasks &lt;- task[T]{param: param}
}

func (s *Scheduler[T]) retryLater(t task[T]) {
	s.wg.Add(1)
	s.tasks &lt;- t
}

func (s *Scheduler[T]) Run() {
	lim := rate.NewLimiter(rate.Limit(s.limit), 1)
	for t := range s.tasks {
		t := t
		if err := lim.Wait(context.Background()); err != nil {
			log.Fatalf(&quot;wait: %s&quot;, err)
			return
		}
		go func() {
			defer s.wg.Done()
			err := s.action(t.param)
			if err != nil {
				log.Printf(&quot;task %s, param %v failed: %v&quot;, s.name, t.param, err)
				t.failedCount++

				if t.failedCount == s.maxTries {
					log.Printf(&quot;task %s, param %v failed with %d tries&quot;, s.name, t.param, s.maxTries)
					return
				}

				s.retryLater(t)
			}
		}()
	}
}

func (s *Scheduler[T]) Wait() {
	s.wg.Wait()
	close(s.tasks)
}

func main() {
	s := &amp;server{}
	ts := httptest.NewServer(s)
	defer ts.Close()

	schedulerPost := NewScheduler(&quot;post&quot;, 20, 3, 1, func(param string) error {
		return post(fmt.Sprintf(&quot;%s/%s&quot;, ts.URL, param))
	})

	go schedulerPost.Run()

	schedulerGet := NewScheduler(&quot;get&quot;, 10, 3, 1, func(param int) error {
		id, err := get(fmt.Sprintf(&quot;%s/%d&quot;, ts.URL, param))
		if err != nil {
			return err
		}

		schedulerPost.AddTask(id)
		return nil
	})

	go schedulerGet.Run()

	for i := 0; i &lt; 100; i++ {
		schedulerGet.AddTask(i)
	}

	schedulerGet.Wait()
	schedulerPost.Wait()

	s.printStats()
}

func get(url string) (string, error) {
	resp, err := http.Get(url)
	if err != nil {
		return &quot;&quot;, err
	}
	defer resp.Body.Close()

	if resp.StatusCode != 200 {
		return &quot;&quot;, fmt.Errorf(&quot;unexpected status code: %d&quot;, resp.StatusCode)
	}

	body, err := io.ReadAll(resp.Body)
	if err != nil {
		return &quot;&quot;, err
	}

	return string(body), nil
}

func post(url string) error {
	resp, err := http.Post(url, &quot;&quot;, nil)
	if err != nil {
		return err
	}
	defer resp.Body.Close()

	if resp.StatusCode != 200 {
		return fmt.Errorf(&quot;unexpected status code: %d&quot;, resp.StatusCode)
	}

	return nil
}

type server struct {
	gMu  sync.Mutex
	gets []int64

	pMu   sync.Mutex
	posts []int64
}

func (s *server) ServeHTTP(w http.ResponseWriter, r *http.Request) {
	log.Printf(&quot;%s: %s&quot;, r.Method, r.URL.Path)

	// collect request stats.
	if r.Method == http.MethodGet {
		s.gMu.Lock()
		s.gets = append(s.gets, time.Now().UnixMilli())
		s.gMu.Unlock()
	} else {
		s.pMu.Lock()
		s.posts = append(s.posts, time.Now().UnixMilli())
		s.pMu.Unlock()
	}

	n := rand.Intn(1000)
	// simulate latency.
	time.Sleep(time.Duration(n) * time.Millisecond)

	// simulate errors.
	if n%10 == 0 {
		w.WriteHeader(http.StatusInternalServerError)
		return
	}

	if r.Method == http.MethodGet {
		fmt.Fprintf(w, &quot;%s&quot;, r.URL.Path[1:])
		return
	}
}

func (s *server) printStats() {
	log.Printf(&quot;GETS (total: %d):\n&quot;, len(s.gets))
	printStats(s.gets)
	log.Printf(&quot;POSTS (total: %d):\n&quot;, len(s.posts))
	printStats(s.posts)
}

func printStats(ts []int64) {
	sort.Slice(ts, func(i, j int) bool {
		return ts[i] &lt; ts[j]
	})

	count := 0
	to := ts[0] + 1000
	for i := 0; i &lt; len(ts); i++ {
		if ts[i] &lt; to {
			count++
		} else {
			fmt.Printf(&quot;  %d: %d\n&quot;, to, count)
			i-- // push back the current item
			count = 0
			to += 1000
		}
	}
	if count &gt; 0 {
		fmt.Printf(&quot;  %d: %d\n&quot;, to, count)
	}
}

The output looks like this:

...
2023/03/25 21:03:30 GETS (total: 112):
1679749398998: 10
1679749399998: 10
1679749400998: 10
1679749401998: 10
1679749402998: 10
1679749403998: 10
1679749404998: 10
1679749405998: 10
1679749406998: 10
1679749407998: 10
1679749408998: 10
1679749409998: 2
2023/03/25 21:03:30 POSTS (total: 111):
1679749399079: 8
1679749400079: 8
1679749401079: 12
1679749402079: 8
1679749403079: 10
1679749404079: 9
1679749405079: 9
1679749406079: 8
1679749407079: 14
1679749408079: 12
1679749409079: 9
1679749410079: 4

huangapple
  • 本文由 发表于 2023年3月23日 23:47:45
  • 转载请务必保留本文链接:https://go.coder-hub.com/75825167.html
匿名

发表评论

匿名网友

:?: :razz: :sad: :evil: :!: :smile: :oops: :grin: :eek: :shock: :???: :cool: :lol: :mad: :twisted: :roll: :wink: :idea: :arrow: :neutral: :cry: :mrgreen:

确定