在两个独立的速率限制端点之间同步请求

huangapple go评论147阅读模式
英文:

Synchronizing requests between two Separate Rate Limited Endpoints

问题

我正在使用一些第三方API,每个API都有自己的速率限制。Endpoint 1 的速率限制是每秒10次,Endpoint 2 的速率限制是每秒20次。

我需要通过第一个端点处理我的数据,该端点将返回一个对象数组(2-3000个对象之间)。然后,我需要将每个对象中的一些数据发送到第二个端点,同时遵守第二个端点的速率限制。

我计划使用go routines以每次10个的批量发送第一个端点的请求,确保如果所有10个请求在<1秒内完成,我不会在该1秒窗口内发送更多请求。

最终,我希望能够限制每个端点同时发送的并发响应数量。特别是如果我必须为由于服务器返回500+响应而失败的请求构建重试机制等。

为了提出问题,我使用httpbin请求来模拟下面的场景:

  1. package main
  2. import (
  3. "bytes"
  4. "encoding/json"
  5. "fmt"
  6. "io"
  7. "net/http"
  8. "sync"
  9. "time"
  10. )
  11. type HttpBinGetRequest struct {
  12. url string
  13. }
  14. type HttpBinGetResponse struct {
  15. Uuid string `json:"uuid"`
  16. StatusCode int
  17. }
  18. type HttpBinPostRequest struct {
  19. url string
  20. uuid string // Item to post to API
  21. }
  22. type HttpBinPostResponse struct {
  23. Data string `json:"data"`
  24. StatusCode int
  25. }
  26. func main() {
  27. // 准备500个GET请求
  28. var requests []*HttpBinGetRequest
  29. for i := 0; i < 500; i++ {
  30. uri := "https://httpbin.org/uuid"
  31. request := &HttpBinGetRequest{
  32. url: uri,
  33. }
  34. requests = append(requests, request)
  35. }
  36. // 创建GET端点的信号量和速率限制
  37. getSemaphore := make(chan struct{}, 10)
  38. getRate := make(chan struct{}, 10)
  39. for i := 0; i < cap(getRate); i++ {
  40. getRate <- struct{}{}
  41. }
  42. go func() {
  43. // 每1/10秒触发一次
  44. ticker := time.NewTicker(100 * time.Millisecond)
  45. defer ticker.Stop()
  46. for range ticker.C {
  47. _, ok := <-getRate
  48. if !ok {
  49. return
  50. }
  51. }
  52. }()
  53. // 发送GET请求以获取随机UUID
  54. var wg sync.WaitGroup
  55. for _, request := range requests {
  56. wg.Add(1)
  57. // Go函数用于发送请求并接收响应
  58. go func(r *HttpBinGetRequest) {
  59. defer wg.Done()
  60. // 检查速率限制器,如果为空则阻塞
  61. getRate <- struct{}{}
  62. // 在函数完成时添加一个令牌到信号量
  63. getSemaphore <- struct{}{}
  64. // 函数完成后移除令牌
  65. defer func() {
  66. <-getSemaphore
  67. }()
  68. resp, _ := get(r)
  69. fmt.Printf("%+v\n", resp)
  70. }(request)
  71. }
  72. wg.Wait()
  73. // 我需要添加代码,从上面的循环中获取响应数据
  74. // 然后将UUID发送到自己的go routines进行POST请求,遵守类似上面的模式
  75. // postSemaphore := make(chan struct{}, 20)
  76. // postRate := make(chan struct{}, 20)
  77. // for i := 0; i < cap(postRate); i++ {
  78. // postRate <- struct{}{}
  79. // }
  80. }
  81. func get(hbgr *HttpBinGetRequest) (*HttpBinGetResponse, error) {
  82. httpResp := &HttpBinGetResponse{}
  83. client := &http.Client{}
  84. req, err := http.NewRequest("GET", hbgr.url, nil)
  85. if err != nil {
  86. fmt.Println("error making request")
  87. return httpResp, err
  88. }
  89. req.Header = http.Header{
  90. "accept": {"application/json"},
  91. }
  92. resp, err := client.Do(req)
  93. if err != nil {
  94. fmt.Println(err)
  95. fmt.Println("error getting response")
  96. return httpResp, err
  97. }
  98. // 读取响应
  99. body, err := io.ReadAll(resp.Body)
  100. if err != nil {
  101. fmt.Println("error reading response body")
  102. return httpResp, err
  103. }
  104. json.Unmarshal(body, &httpResp)
  105. httpResp.StatusCode = resp.StatusCode
  106. return httpResp, nil
  107. }
  108. // 发送数据到httpbin的方法
  109. func post(hbr *HttpBinPostRequest) (*HttpBinPostResponse, error) {
  110. httpResp := &HttpBinPostResponse{}
  111. client := &http.Client{}
  112. req, err := http.NewRequest("POST", hbr.url, bytes.NewBuffer([]byte(hbr.uuid)))
  113. if err != nil {
  114. fmt.Println("error making request")
  115. return httpResp, err
  116. }
  117. req.Header = http.Header{
  118. "accept": {"application/json"},
  119. }
  120. resp, err := client.Do(req)
  121. if err != nil {
  122. fmt.Println("error getting response")
  123. return httpResp, err
  124. }
  125. if resp.StatusCode == 429 {
  126. fmt.Println(resp.Header.Get("Retry-After"))
  127. }
  128. // 读取响应
  129. body, err := io.ReadAll(resp.Body)
  130. if err != nil {
  131. fmt.Println("error reading response body")
  132. return httpResp, err
  133. }
  134. json.Unmarshal(body, &httpResp)
  135. httpResp.StatusCode = resp.StatusCode
  136. fmt.Printf("%+v", httpResp)
  137. return httpResp, nil
  138. }

以上为该场景的完整示例代码。

英文:

I am utilizing some third party APIs that each have their own rate limits. Endpoint 1 has a rate limit of 10/s and Endpoint2 has a rate limit of 20/s.

I need to process my data through endpoint 1 which will return an array of objects (between 2-3000 objects). I then need to take each object and send some data within to the 2nd endpoint while respecting that 2nd endpoint's rate limit.

I plan on sending the first endpoint's request in go routines via a batch of 10 at a time, ensuring that if all 10 requests finish in <1s, I do not exceed by sending more within that 1 second window.

Ultimately, I want to be able to limit how many concurrent responses are going out at a time per endpoint. Especially if I have to build in retries for failed requests due to server 500+ responses, etc.

For purpose of question I am using httpbin requests to simulate the scenario below:

  1. package main
  2. import (
  3. &quot;bytes&quot;
  4. &quot;encoding/json&quot;
  5. &quot;fmt&quot;
  6. &quot;io&quot;
  7. &quot;net/http&quot;
  8. &quot;sync&quot;
  9. &quot;time&quot;
  10. )
  11. type HttpBinGetRequest struct {
  12. url string
  13. }
  14. type HttpBinGetResponse struct {
  15. Uuid string `json:&quot;uuid&quot;`
  16. StatusCode int
  17. }
  18. type HttpBinPostRequest struct {
  19. url string
  20. uuid string // Item to post to API
  21. }
  22. type HttpBinPostResponse struct {
  23. Data string `json:&quot;data&quot;`
  24. StatusCode int
  25. }
  26. func main() {
  27. // Prepare GET requests for 500 requests
  28. var requests []*HttpBinGetRequest
  29. for i := 0; i &lt; 500; i++ {
  30. uri := &quot;https://httpbin.org/uuid&quot;
  31. request := &amp;HttpBinGetRequest{
  32. url: uri,
  33. }
  34. requests = append(requests, request)
  35. }
  36. // Create semaphore and rate limit for the GET endpoint
  37. getSemaphore := make(chan struct{}, 10)
  38. getRate := make(chan struct{}, 10)
  39. for i := 0; i &lt; cap(getRate); i++ {
  40. getRate &lt;- struct{}{}
  41. }
  42. go func() {
  43. // ticker corresponding to 1/10th of a second
  44. ticker := time.NewTicker(100 * time.Millisecond)
  45. defer ticker.Stop()
  46. for range ticker.C {
  47. _, ok := &lt;-getRate
  48. if !ok {
  49. return
  50. }
  51. }
  52. }()
  53. // Send our GET requests to obtain a random UUID
  54. var wg sync.WaitGroup
  55. for _, request := range requests {
  56. wg.Add(1)
  57. // Go func to make request and receive the response
  58. go func(r *HttpBinGetRequest) {
  59. defer wg.Done()
  60. // Check the rate limiter and block if it is empty
  61. getRate &lt;- struct{}{}
  62. // Add a token to the semaphore
  63. getSemaphore &lt;- struct{}{}
  64. // Remove token when function is complete
  65. defer func() {
  66. &lt;-getSemaphore
  67. }()
  68. resp, _ := get(r)
  69. fmt.Printf(&quot;%+v\n&quot;, resp)
  70. }(request)
  71. }
  72. wg.Wait()
  73. // I need to add code that obtains the response data from the above for loop
  74. // then sends the UUID it to its own go routines for a POST request, following a similar pattern above
  75. // To not violate the rate limit of the second endpoint which is 20 calls per second
  76. // postSemaphore := make(chan struct{}, 20)
  77. // postRate := make(chan struct{}, 20)
  78. // for i := 0; i &lt; cap(postRate); i++ {
  79. // postRate &lt;- struct{}{}
  80. // }
  81. }
  82. func get(hbgr *HttpBinGetRequest) (*HttpBinGetResponse, error) {
  83. httpResp := &amp;HttpBinGetResponse{}
  84. client := &amp;http.Client{}
  85. req, err := http.NewRequest(&quot;GET&quot;, hbgr.url, nil)
  86. if err != nil {
  87. fmt.Println(&quot;error making request&quot;)
  88. return httpResp, err
  89. }
  90. req.Header = http.Header{
  91. &quot;accept&quot;: {&quot;application/json&quot;},
  92. }
  93. resp, err := client.Do(req)
  94. if err != nil {
  95. fmt.Println(err)
  96. fmt.Println(&quot;error getting response&quot;)
  97. return httpResp, err
  98. }
  99. // Read Response
  100. body, err := io.ReadAll(resp.Body)
  101. if err != nil {
  102. fmt.Println(&quot;error reading response body&quot;)
  103. return httpResp, err
  104. }
  105. json.Unmarshal(body, &amp;httpResp)
  106. httpResp.StatusCode = resp.StatusCode
  107. return httpResp, nil
  108. }
  109. // Method to post data to httpbin
  110. func post(hbr *HttpBinPostRequest) (*HttpBinPostResponse, error) {
  111. httpResp := &amp;HttpBinPostResponse{}
  112. client := &amp;http.Client{}
  113. req, err := http.NewRequest(&quot;POST&quot;, hbr.url, bytes.NewBuffer([]byte(hbr.uuid)))
  114. if err != nil {
  115. fmt.Println(&quot;error making request&quot;)
  116. return httpResp, err
  117. }
  118. req.Header = http.Header{
  119. &quot;accept&quot;: {&quot;application/json&quot;},
  120. }
  121. resp, err := client.Do(req)
  122. if err != nil {
  123. fmt.Println(&quot;error getting response&quot;)
  124. return httpResp, err
  125. }
  126. if resp.StatusCode == 429 {
  127. fmt.Println(resp.Header.Get(&quot;Retry-After&quot;))
  128. }
  129. // Read Response
  130. body, err := io.ReadAll(resp.Body)
  131. if err != nil {
  132. fmt.Println(&quot;error reading response body&quot;)
  133. return httpResp, err
  134. }
  135. json.Unmarshal(body, &amp;httpResp)
  136. httpResp.StatusCode = resp.StatusCode
  137. fmt.Printf(&quot;%+v&quot;, httpResp)
  138. return httpResp, nil
  139. }

答案1

得分: 1

这是一个生产者/消费者模式。你可以使用chan来连接它们。

关于速率限制器,我建议使用golang.org/x/time/rate包。

由于我们决定使用chan来连接生产者和消费者,所以自然而然地将失败的任务发送到同一个chan中,以便消费者可以再次尝试。

我已经将逻辑封装到了Scheduler[T]类型中。请参考下面的示例。请注意,该示例只是为了快速说明思路而编写的,并没有经过全面测试。

  1. package main
  2. import (
  3. "context"
  4. "fmt"
  5. "io"
  6. "log"
  7. "math/rand"
  8. "net/http"
  9. "net/http/httptest"
  10. "sort"
  11. "sync"
  12. "time"
  13. "golang.org/x/time/rate"
  14. )
  15. type task[T any] struct {
  16. param T
  17. failedCount int
  18. }
  19. type Scheduler[T any] struct {
  20. name string
  21. limit int
  22. maxTries int
  23. wg sync.WaitGroup
  24. tasks chan task[T]
  25. action func(param T) error
  26. }
  27. // NewScheduler creates a scheduler that runs the action with the specified rate limit.
  28. // It will retry the action if the action returns a non-nil error.
  29. func NewScheduler[T any](name string, limit, maxTries, chanSize int, action func(param T) error) *Scheduler[T] {
  30. return &Scheduler[T]{
  31. name: name,
  32. limit: limit,
  33. maxTries: maxTries,
  34. tasks: make(chan task[T], chanSize),
  35. action: action,
  36. }
  37. }
  38. func (s *Scheduler[T]) AddTask(param T) {
  39. s.wg.Add(1)
  40. s.tasks <- task[T]{param: param}
  41. }
  42. func (s *Scheduler[T]) retryLater(t task[T]) {
  43. s.wg.Add(1)
  44. s.tasks <- t
  45. }
  46. func (s *Scheduler[T]) Run() {
  47. lim := rate.NewLimiter(rate.Limit(s.limit), 1)
  48. for t := range s.tasks {
  49. t := t
  50. if err := lim.Wait(context.Background()); err != nil {
  51. log.Fatalf("wait: %s", err)
  52. return
  53. }
  54. go func() {
  55. defer s.wg.Done()
  56. err := s.action(t.param)
  57. if err != nil {
  58. log.Printf("task %s, param %v failed: %v", s.name, t.param, err)
  59. t.failedCount++
  60. if t.failedCount == s.maxTries {
  61. log.Printf("task %s, param %v failed with %d tries", s.name, t.param, s.maxTries)
  62. return
  63. }
  64. s.retryLater(t)
  65. }
  66. }()
  67. }
  68. }
  69. func (s *Scheduler[T]) Wait() {
  70. s.wg.Wait()
  71. close(s.tasks)
  72. }
  73. func main() {
  74. s := &server{}
  75. ts := httptest.NewServer(s)
  76. defer ts.Close()
  77. schedulerPost := NewScheduler("post", 20, 3, 1, func(param string) error {
  78. return post(fmt.Sprintf("%s/%s", ts.URL, param))
  79. })
  80. go schedulerPost.Run()
  81. schedulerGet := NewScheduler("get", 10, 3, 1, func(param int) error {
  82. id, err := get(fmt.Sprintf("%s/%d", ts.URL, param))
  83. if err != nil {
  84. return err
  85. }
  86. schedulerPost.AddTask(id)
  87. return nil
  88. })
  89. go schedulerGet.Run()
  90. for i := 0; i < 100; i++ {
  91. schedulerGet.AddTask(i)
  92. }
  93. schedulerGet.Wait()
  94. schedulerPost.Wait()
  95. s.printStats()
  96. }
  97. func get(url string) (string, error) {
  98. resp, err := http.Get(url)
  99. if err != nil {
  100. return "", err
  101. }
  102. defer resp.Body.Close()
  103. if resp.StatusCode != 200 {
  104. return "", fmt.Errorf("unexpected status code: %d", resp.StatusCode)
  105. }
  106. body, err := io.ReadAll(resp.Body)
  107. if err != nil {
  108. return "", err
  109. }
  110. return string(body), nil
  111. }
  112. func post(url string) error {
  113. resp, err := http.Post(url, "", nil)
  114. if err != nil {
  115. return err
  116. }
  117. defer resp.Body.Close()
  118. if resp.StatusCode != 200 {
  119. return fmt.Errorf("unexpected status code: %d", resp.StatusCode)
  120. }
  121. return nil
  122. }
  123. type server struct {
  124. gMu sync.Mutex
  125. gets []int64
  126. pMu sync.Mutex
  127. posts []int64
  128. }
  129. func (s *server) ServeHTTP(w http.ResponseWriter, r *http.Request) {
  130. log.Printf("%s: %s", r.Method, r.URL.Path)
  131. // collect request stats.
  132. if r.Method == http.MethodGet {
  133. s.gMu.Lock()
  134. s.gets = append(s.gets, time.Now().UnixMilli())
  135. s.gMu.Unlock()
  136. } else {
  137. s.pMu.Lock()
  138. s.posts = append(s.posts, time.Now().UnixMilli())
  139. s.pMu.Unlock()
  140. }
  141. n := rand.Intn(1000)
  142. // simulate latency.
  143. time.Sleep(time.Duration(n) * time.Millisecond)
  144. // simulate errors.
  145. if n%10 == 0 {
  146. w.WriteHeader(http.StatusInternalServerError)
  147. return
  148. }
  149. if r.Method == http.MethodGet {
  150. fmt.Fprintf(w, "%s", r.URL.Path[1:])
  151. return
  152. }
  153. }
  154. func (s *server) printStats() {
  155. log.Printf("GETS (total: %d):\n", len(s.gets))
  156. printStats(s.gets)
  157. log.Printf("POSTS (total: %d):\n", len(s.posts))
  158. printStats(s.posts)
  159. }
  160. func printStats(ts []int64) {
  161. sort.Slice(ts, func(i, j int) bool {
  162. return ts[i] < ts[j]
  163. })
  164. count := 0
  165. to := ts[0] + 1000
  166. for i := 0; i < len(ts); i++ {
  167. if ts[i] < to {
  168. count++
  169. } else {
  170. fmt.Printf(" %d: %d\n", to, count)
  171. i-- // push back the current item
  172. count = 0
  173. to += 1000
  174. }
  175. }
  176. if count > 0 {
  177. fmt.Printf(" %d: %d\n", to, count)
  178. }
  179. }

输出结果如下:

  1. ...
  2. 2023/03/25 21:03:30 GETS (total: 112):
  3. 1679749398998: 10
  4. 1679749399998: 10
  5. 1679749400998: 10
  6. 1679749401998: 10
  7. 1679749402998: 10
  8. 1679749403998: 10
  9. 1679749404998: 10
  10. 1679749405998: 10
  11. 1679749406998: 10
  12. 1679749407998: 10
  13. 1679749408998: 10
  14. 1679749409998: 2
  15. 2023/03/25 21:03:30 POSTS (total: 111):
  16. 1679749399079: 8
  17. 1679749400079: 8
  18. 1679749401079: 12
  19. 1679749402079: 8
  20. 1679749403079: 10
  21. 1679749404079: 9
  22. 1679749405079: 9
  23. 1679749406079: 8
  24. 1679749407079: 14
  25. 1679749408079: 12
  26. 1679749409079: 9
  27. 1679749410079: 4
英文:

This is a producer/consumer pattern. You can use a chan to connect them.

Regarding the rate limiter, I would use the package golang.org/x/time/rate.

Since we have decided to use a chan to connect the producer and the consumer, it's natural to send the failed tasks to the same chan so that the consumer can try it again.

I have encapsulated the logic into the type Scheduler[T]. See the demo below. Please note that the demo is written in a hurry to illustrate the idea only. It's not tested thoroughly.

  1. package main
  2. import (
  3. &quot;context&quot;
  4. &quot;fmt&quot;
  5. &quot;io&quot;
  6. &quot;log&quot;
  7. &quot;math/rand&quot;
  8. &quot;net/http&quot;
  9. &quot;net/http/httptest&quot;
  10. &quot;sort&quot;
  11. &quot;sync&quot;
  12. &quot;time&quot;
  13. &quot;golang.org/x/time/rate&quot;
  14. )
  15. type task[T any] struct {
  16. param T
  17. failedCount int
  18. }
  19. type Scheduler[T any] struct {
  20. name string
  21. limit int
  22. maxTries int
  23. wg sync.WaitGroup
  24. tasks chan task[T]
  25. action func(param T) error
  26. }
  27. // NewScheduler creates a scheduler that runs the action with the specified rate limit.
  28. // It will retry the action if the action returns a non-nil error.
  29. func NewScheduler[T any](name string, limit, maxTries, chanSize int, action func(param T) error) *Scheduler[T] {
  30. return &amp;Scheduler[T]{
  31. name: name,
  32. limit: limit,
  33. maxTries: maxTries,
  34. tasks: make(chan task[T], chanSize),
  35. action: action,
  36. }
  37. }
  38. func (s *Scheduler[T]) AddTask(param T) {
  39. s.wg.Add(1)
  40. s.tasks &lt;- task[T]{param: param}
  41. }
  42. func (s *Scheduler[T]) retryLater(t task[T]) {
  43. s.wg.Add(1)
  44. s.tasks &lt;- t
  45. }
  46. func (s *Scheduler[T]) Run() {
  47. lim := rate.NewLimiter(rate.Limit(s.limit), 1)
  48. for t := range s.tasks {
  49. t := t
  50. if err := lim.Wait(context.Background()); err != nil {
  51. log.Fatalf(&quot;wait: %s&quot;, err)
  52. return
  53. }
  54. go func() {
  55. defer s.wg.Done()
  56. err := s.action(t.param)
  57. if err != nil {
  58. log.Printf(&quot;task %s, param %v failed: %v&quot;, s.name, t.param, err)
  59. t.failedCount++
  60. if t.failedCount == s.maxTries {
  61. log.Printf(&quot;task %s, param %v failed with %d tries&quot;, s.name, t.param, s.maxTries)
  62. return
  63. }
  64. s.retryLater(t)
  65. }
  66. }()
  67. }
  68. }
  69. func (s *Scheduler[T]) Wait() {
  70. s.wg.Wait()
  71. close(s.tasks)
  72. }
  73. func main() {
  74. s := &amp;server{}
  75. ts := httptest.NewServer(s)
  76. defer ts.Close()
  77. schedulerPost := NewScheduler(&quot;post&quot;, 20, 3, 1, func(param string) error {
  78. return post(fmt.Sprintf(&quot;%s/%s&quot;, ts.URL, param))
  79. })
  80. go schedulerPost.Run()
  81. schedulerGet := NewScheduler(&quot;get&quot;, 10, 3, 1, func(param int) error {
  82. id, err := get(fmt.Sprintf(&quot;%s/%d&quot;, ts.URL, param))
  83. if err != nil {
  84. return err
  85. }
  86. schedulerPost.AddTask(id)
  87. return nil
  88. })
  89. go schedulerGet.Run()
  90. for i := 0; i &lt; 100; i++ {
  91. schedulerGet.AddTask(i)
  92. }
  93. schedulerGet.Wait()
  94. schedulerPost.Wait()
  95. s.printStats()
  96. }
  97. func get(url string) (string, error) {
  98. resp, err := http.Get(url)
  99. if err != nil {
  100. return &quot;&quot;, err
  101. }
  102. defer resp.Body.Close()
  103. if resp.StatusCode != 200 {
  104. return &quot;&quot;, fmt.Errorf(&quot;unexpected status code: %d&quot;, resp.StatusCode)
  105. }
  106. body, err := io.ReadAll(resp.Body)
  107. if err != nil {
  108. return &quot;&quot;, err
  109. }
  110. return string(body), nil
  111. }
  112. func post(url string) error {
  113. resp, err := http.Post(url, &quot;&quot;, nil)
  114. if err != nil {
  115. return err
  116. }
  117. defer resp.Body.Close()
  118. if resp.StatusCode != 200 {
  119. return fmt.Errorf(&quot;unexpected status code: %d&quot;, resp.StatusCode)
  120. }
  121. return nil
  122. }
  123. type server struct {
  124. gMu sync.Mutex
  125. gets []int64
  126. pMu sync.Mutex
  127. posts []int64
  128. }
  129. func (s *server) ServeHTTP(w http.ResponseWriter, r *http.Request) {
  130. log.Printf(&quot;%s: %s&quot;, r.Method, r.URL.Path)
  131. // collect request stats.
  132. if r.Method == http.MethodGet {
  133. s.gMu.Lock()
  134. s.gets = append(s.gets, time.Now().UnixMilli())
  135. s.gMu.Unlock()
  136. } else {
  137. s.pMu.Lock()
  138. s.posts = append(s.posts, time.Now().UnixMilli())
  139. s.pMu.Unlock()
  140. }
  141. n := rand.Intn(1000)
  142. // simulate latency.
  143. time.Sleep(time.Duration(n) * time.Millisecond)
  144. // simulate errors.
  145. if n%10 == 0 {
  146. w.WriteHeader(http.StatusInternalServerError)
  147. return
  148. }
  149. if r.Method == http.MethodGet {
  150. fmt.Fprintf(w, &quot;%s&quot;, r.URL.Path[1:])
  151. return
  152. }
  153. }
  154. func (s *server) printStats() {
  155. log.Printf(&quot;GETS (total: %d):\n&quot;, len(s.gets))
  156. printStats(s.gets)
  157. log.Printf(&quot;POSTS (total: %d):\n&quot;, len(s.posts))
  158. printStats(s.posts)
  159. }
  160. func printStats(ts []int64) {
  161. sort.Slice(ts, func(i, j int) bool {
  162. return ts[i] &lt; ts[j]
  163. })
  164. count := 0
  165. to := ts[0] + 1000
  166. for i := 0; i &lt; len(ts); i++ {
  167. if ts[i] &lt; to {
  168. count++
  169. } else {
  170. fmt.Printf(&quot; %d: %d\n&quot;, to, count)
  171. i-- // push back the current item
  172. count = 0
  173. to += 1000
  174. }
  175. }
  176. if count &gt; 0 {
  177. fmt.Printf(&quot; %d: %d\n&quot;, to, count)
  178. }
  179. }

The output looks like this:

  1. ...
  2. 2023/03/25 21:03:30 GETS (total: 112):
  3. 1679749398998: 10
  4. 1679749399998: 10
  5. 1679749400998: 10
  6. 1679749401998: 10
  7. 1679749402998: 10
  8. 1679749403998: 10
  9. 1679749404998: 10
  10. 1679749405998: 10
  11. 1679749406998: 10
  12. 1679749407998: 10
  13. 1679749408998: 10
  14. 1679749409998: 2
  15. 2023/03/25 21:03:30 POSTS (total: 111):
  16. 1679749399079: 8
  17. 1679749400079: 8
  18. 1679749401079: 12
  19. 1679749402079: 8
  20. 1679749403079: 10
  21. 1679749404079: 9
  22. 1679749405079: 9
  23. 1679749406079: 8
  24. 1679749407079: 14
  25. 1679749408079: 12
  26. 1679749409079: 9
  27. 1679749410079: 4

huangapple
  • 本文由 发表于 2023年3月23日 23:47:45
  • 转载请务必保留本文链接:https://go.coder-hub.com/75825167.html
匿名

发表评论

匿名网友

:?: :razz: :sad: :evil: :!: :smile: :oops: :grin: :eek: :shock: :???: :cool: :lol: :mad: :twisted: :roll: :wink: :idea: :arrow: :neutral: :cry: :mrgreen:

确定