在漏桶算法中,当队列不满时,实现固定速率的正确逻辑是什么?

huangapple go评论95阅读模式
英文:

In leaky-bucket algorithm, when the queue is not full what's the correct logic to achieve fixed rate?

问题

我正在学习漏桶算法,并希望通过使用Redis和Golang的HTTP编写一些简单的代码来实践一下。

当我使用关键词redis、leaky和bucket进行搜索时,发现有很多类似的问题,如[1]所示,这很好。然而,我在阅读这些帖子和维基百科[2]后发现自己对整个逻辑的理解有问题。我猜想可能有些地方我没有理解或者没有意识到。所以我想在这里重新表述一下,并请纠正我如果我理解错了。

伪代码如下:

key := "ip地址、令牌或任何可以代表客户端的东西"
redis_queue_size := 5
interval_between_each_request := 7
request := 从某个地方获取HTTP请求()

if check_current_queue_size() < redis_queue_size:
    if is_queue_empty()
        add_request_to_the_queue() // zadd "ip1" now() now() // now()是秒、毫秒或纳秒等,例如 t = 1
        process_request(request)
    else
        now := 获取当前时间()
        // add_request_to_... 检索队列中的第一个元素
        // 计算执行请求的预期时间戳及其当前时间
        // 例如 zadd "ip1" <队列中第一个元素的时间 + interval_between_each_request> now
        add_request_to_redis_queue_with_timestamp(now, interval_between_each_request) // 例如 zadd "ip" <时间戳作为分数> <请求允许执行的时间戳>
        // 下面的函数 check_the_time_left...() 将检查当前请求需要等待多长时间才能执行。
        // 例如,第一个请求存储在队列中的命令为
        //    zadd "ip1" 1 1  // t = 1
        // 第二个请求在 t = 4 到达,但允许在 t = 8 执行
        //    zadd "ip1" 8 4 // 这里的 4 是当前时间,8 是 1 + interval_between_each_request
        // 所以 N 将为 4
        N := check_the_time_left_for_the_current_request_to_execute(now, interval_between_each_request) 
        sleep(N) // 现在请求在处理之前等待 4 秒
        process_request(http_request_obj)
else
    return // 丢弃请求

我理解当队列已满时,后续的请求将被丢弃。然而,我想我可能误解了当队列未满时,如何调整传入的请求以便以固定速率执行。

我感谢任何建议。

[1]. https://stackoverflow.com/search?q=redis+leaky+bucket+&s=aa2eaa93-a6ba-4e31-9a83-68f791c5756e

[2]. https://en.wikipedia.org/wiki/Leaky_bucket#As_a_queue

英文:

I am learning leaky bucket algorithm and want to get my hand dirty by writing some simple code with redis plus golang http.

When I searched here with the keyword redis, leaky, bucket. There are many similar questions as shown in [1], which is nice. However I find I have a problem to understand the entire logic after going through those threads and wiki[2]. I suppose there is something I do not understand and am also not aware of it. So I would like to rephrase it again here; and please correct me if I get it wrong.

The pseudo code:

key := &quot;ip address, token or anything that can be the representative of a client&quot;
redis_queue_size := 5
interval_between_each_request := 7
request := obtain_http_request_from_somewhere()

if check_current_queue_size() &lt; redis_queue_size:
    if is_queue_empty()
        add_request_to_the_queue() // zadd &quot;ip1&quot; now() now() // now() is something like seconds, milliseconds or nanoseconds e.g. t = 1
        process_request(request)
    else
        now := get_current_time()
        // add_request_to_... retrieves the first element in the queue
        // compute the expected timestamp to execute the request and its current time
        // e.g. zadd &quot;ip1&quot; &lt;time of the first elment in the queue + interval_between_each_request&gt; now
        add_request_to_redis_queue_with_timestamp(now, interval_between_each_request) // e.g. zadd &quot;ip&quot; &lt;timestamp as score&gt; &lt;timestamp a request is allowed to be executed&gt;
        // Below function check_the_time_left...() will check how many time left at which the current request need to wait. 
        // For instance, the first request stored in the queue with the command
        //    zadd &quot;ip1&quot; 1 1  // t = 1
        // and the second request arrives at t = 4 but it is allowed t be executed at t = 8
        //    zadd &quot;ip1&quot; 8 4 // where 4 := now, 8 := 1 + interval_between_each_request
        // so the N will be 4 
        N := check_the_time_left_for_the_current_request_to_execute(now, interval_between_each_request) 
        sleep(N) // now the request wait for 4 seconds before processing the request
        process_request(http_request_obj)
else
    return // discard request

I understand the part when queue is full, then the following requests will be discarded. However I suppose I may misunderstand when the queue is not full, how to reshape the incoming request so it can be executed in a fixed rate.

I appreciate any suggestions

[1]. https://stackoverflow.com/search?q=redis+leaky+bucket+&amp;s=aa2eaa93-a6ba-4e31-9a83-68f791c5756e

[2]. https://en.wikipedia.org/wiki/Leaky_bucket#As_a_queue

答案1

得分: 0

有几种方法可以实现一个漏桶算法,但是整个过程应该分为两个独立的部分。一个部分是将数据放入桶中,另一个部分是在设定的时间间隔内将数据移除。

你可以使用一个单独的goroutine在设定的时间间隔内消费消息。这样可以简化你的代码,因为在一个代码路径中,你只需要查看队列的大小并丢弃数据包,而另一个代码路径只需要消费桶中的数据。

英文:

There are several ways you can implement a leaky bucket but there should be two separate parts for the process. One that puts things in the bucket and another that removes them at a set interval if there is anything to remove.

You can use a separate goroutine that would consume the messages at a set interval. This would simplify your code since on one code path you would only have to look into the queue size and drop packets and another code path would just consume whatever there is.

答案2

得分: 0

1)如果这是用于简单的速率限制,使用排序集合的滑动窗口方法是大多数Redis用户实现的方式。你可以在这个链接中找到一个示例:https://github.com/Redislabs-Solution-Architects/RateLimitingExample/blob/sliding_window/app.py

2)如果你坚持使用漏桶算法,你可以考虑按照每个消费者ID(apiToken/IP地址等)使用一个Redis流的方式来实现。具体步骤如下:

当有一个针对某个消费者ID的请求到达时,
XADD requests-[consumerID] MAXLEN [BUCKET SIZE]
如果需要的话,生成一个go协程来处理该消费者ID的请求
获取当前时间
如果requests-[consumerID]的长度为0,则退出go协程

XREAD COUNT [每个时间段的请求数量] BLOCK [时间段-1毫秒] STREAMS requests-[consumerID]
获取当前时间,并休眠至时间段结束

关于Redis流的详细信息可以参考:https://redis.io/commands#stream

英文:
  1. If this is for simple rate-limiting the sliding window approach using a sorted set is what we see implemented by most Redis users https://github.com/Redislabs-Solution-Architects/RateLimitingExample/blob/sliding_window/app.py

  2. If you are set on leaky bucket you might consider using a redis stream per consumerID (apiToken/IP Address etc) as follows

request comes in for consumerID

XADD requests-[consumerID] MAXLEN [BUCKET SIZE]

spawn a go routine if necessary for that consumerID
get current time
if XLEN of requests-[consumerID] is 0 exit go routine

XREAD COUNT [number_of_requests_per_period] BLOCK [time period - 1 ms] STREAMS requests-[consumerID]
get the current time and sleep for the remainder of the time period

https://redis.io/commands#stream details how streams work

huangapple
  • 本文由 发表于 2021年9月9日 12:59:13
  • 转载请务必保留本文链接:https://go.coder-hub.com/69112450.html
匿名

发表评论

匿名网友

:?: :razz: :sad: :evil: :!: :smile: :oops: :grin: :eek: :shock: :???: :cool: :lol: :mad: :twisted: :roll: :wink: :idea: :arrow: :neutral: :cry: :mrgreen:

确定