从零开始构建的Golang Web服务器:我应该如何实现速率限制?

huangapple go评论74阅读模式
英文:

Golang WebServer from scratch: how should I achieve rate limiting?

问题

我正在开发一个使用Golang编写的Web服务器,以实践我对该语言及其并发模型的学习。

我有一个基于原始套接字的简单Web服务器,它会对基本路径的请求返回一个示例HTML响应。在内部,Web服务器在端口8000上监听,并在循环中接受传入的连接,并将它们转发到具有最大容量为1000的缓冲通道中的等待连接。同时,一个由20个工作线程组成的池处理缓冲通道中的请求,并返回响应。

现在,最初我的Web服务器的Serve方法如下所示:

func (h HttpServer) Serve(host, path string) {
    connection, err := net.Listen("tcp", "0.0.0.0:8000")
    if err != nil { ... }
    
    for true {
        clientConnection, err := connection.Accept()
        if err != nil { ... }

        select {
        case h.connectionsQueue <- clientConnection:
        default:
            errors.TooManyRequestsHandler{}.Handle(clientConnection)
        }
    }
}

其中,Web服务器是这样的结构体:

var maxWorkers = 20
var maxPendingConnections = 1000

type HttpServer struct {
    connectionsQueue chan net.Conn
}

func BuildHttpServer() HttpServer {
    routeDispatcher := routing.MakeRegisterRoute()
    routeDispatcher.RegisterRoute("/", ui.RouteUIIndex{})

    server := HttpServer{
        connectionsQueue: make(chan net.Conn, maxPendingConnections),
    }

    for i := 0; i < maxWorkers; i++ {
        go handler.SpawnHandler(server.connectionsQueue, routeDispatcher)
    }

    return server
}

实际上,这已经实现了接受所有传入连接的行为,直到达到maxPendingConnections或通道已满。如果工作线程不堪重负,将通过TooManyRequestsHandler向客户端返回429 Too Many Requests响应。

但是,如果我想为此Web服务器设置绝对的请求处理速率上限怎么办?这里的目标是确保可预测的性能并避免性能下降。因此,我将Serve函数更改为:

func (h HttpServer) Serve(host, path string) {
    acceptRequestRateTicker := time.NewTicker(200 * time.Microseconds)

    connection, err := net.Listen("tcp", "0.0.0.0:8000")
    if err != nil { ... }

    for true {
        select {
        case <-acceptRequestRateTicker.C:
            clientConnection, err := connection.Accept()
            if err != nil { ... }

            select {
            case h.connectionsQueue <- clientConnection:
            default:
                errors.TooManyRequestsHandler{}.Handle(clientConnection)
            }
        }
    }
}

这里的关键是主goroutine不会接受超过5000个请求/秒的请求速率,每200微秒接受一个连接,之后客户端将在获取与服务器的连接时遇到请求超时。这种策略对于确保可预测的服务性能和期望是否是一个好策略?

英文:

I'm developing a webserver in Golang to put in practice what I'm learning about the language and its concurrency model.

I have a simple raw socket's based webserver which answers to a base path with a sample html response. Internally, the webserver listens on port 8000 and in a loop accepts incoming connections forwarding them to a buffered channel with max-capacity of 1000 pending connections. At the same time, a pool of 20 workers handle the requests in the buffered channel and write back the response.

Now, initially my webservers main Serve method went like this:

func (h HttpServer) Serve(host, path string) {
    
	connection, err := net.Listen(&quot;tcp&quot;, &quot;0.0.0.0:8000&quot;)
    if err != nil { ... }
    
    for true {
		clientConnection, err := connection.Accept()
		if err != nil { ... }

		select {
		case h.connectionsQueue &lt;- clientConnection:
		default:
			errors.TooManyRequestsHandler{}.Handle(clientConnection)
		}
	}
}

With webserver being this struct:

var maxWorkers = 20
var maxPendingConnections = 1000

type HttpServer struct {
	connectionsQueue chan net.Conn
}

func BuildHttpServer() HttpServer {
	routeDispatcher := routing.MakeRegisterRoute()
	routeDispatcher.RegisterRoute(&quot;/&quot;, ui.RouteUIIndex{})

	server := HttpServer{
		connectionsQueue: make(chan net.Conn, maxPendingConnections),
	}

	for i := 0; i &lt; maxWorkers; i++ {
		go handler.SpawnHandler(server.connectionsQueue, routeDispatcher)
	}

	return server
}

In practice, this already achieved the behaviour of accepting all incoming connections until the maxPendingConnections is reached / channel full. If the workers are overwhelmed, 429 Too Many Requests starts being returned to the client through the TooManyRequestsHandler, which writes that response back.

But what if I want to set an absolute upper-bound to the rate at which requests are dealt in this webserver? The objective here would be to guarantee predictable performance and avoid degradation. I've thus changed my Serve function to:

func (h HttpServer) Serve(host, path string) {
acceptRequestRateTicker := time.NewTicker(200 * time.Microseconds)

connection, err := net.Listen(&quot;tcp&quot;, &quot;0.0.0.0:8000&quot;)
if err != nil { ... }

for true {
	select {
	case &lt;-acceptRequestRateTicker.C:
		clientConnection, err := connection.Accept()
		if err != nil { ... }

		select {
		case h.connectionsQueue &lt;- clientConnection:
		default:
			errors.TooManyRequestsHandler{}.Handle(clientConnection)
		}
	}
}

The point here being that the main goroutine does not accept a higher request rate than 5000 req/s, by accepting a connections every 200 microseconds, after which clients will experience request timeouts in obtaining a connection to the server. Is this a good strategy for guaranteeing predictable service performance and expectations?

答案1

得分: 0

所以,经过一段时间的努力,我已经实现了我想要的,并且简单的解决方案是实现基于令牌的速率限制器。

基本思想很简单,你有一个深度为N的令牌桶,其中包含令牌。每次需要处理请求时,如果有可用的令牌,就取出一个令牌,同时将可用令牌的数量减少1。

如果没有可用的令牌,你有两个选择,要么立即回复429 Too Many Requests,要么将传入的请求排队,等到令牌可用时再处理。

在这两个选择之间,实现速率限制器的原因有所不同。A)你已经实施了它,以控制应用程序在稳定状态下运行的性能边界。B)你已经实施了它,因为客户端对每秒请求的合同有限制。

不排队请求并立即回复429 Too Many Requests适用于强制执行B)。相反,对于A),客户端可能更愿意延迟处理请求,而不是根本不接收响应,因此将受限速的请求排队是正确的选择,但要考虑应用程序的内存限制。

无论如何,令牌算法的关键在于控制令牌再次可用的速率。如果我们想实现300个请求/秒的速率限制,我们希望一个goroutine在每3.33毫秒(1000毫秒/300个请求/秒)内为一个非满的令牌桶补充一个令牌。也就是说,无论传入的请求如何消耗令牌桶,补充都在固定的间隔内进行,每秒300次,或者每3.33毫秒一次。令牌桶的目的是允许正确接受请求的突发,同时仍然强制执行整体速率。

我用以下逻辑实现了这一点:

http_server.go:

const (
    MAX_WORKERS int = 1
)

type HttpServer struct {
    rateLimiter *limiter.Limiter
}

func BuildHttpServer() HttpServer {
    server := HttpServer{
        rateLimiter: limiter.MakeRateLimiter(),
    }

    for i := 0; i < MAX_WORKERS; i++ {
        go handler.SpawnHandler(server.rateLimiter.AcceptedConnectionsQueue)
    }

    return server
}

func (h HttpServer) Serve(host, path string) {
    connection, err := net.Listen("tcp", "0.0.0.0:8000")
    if err != nil { /* ... */ }

    for true {
        clientConnection, err := connection.Accept()
        if err != nil { /* ... */ }

        if proceed, err := h.rateLimiter.ProceedOrBufferConnection(clientConnection); err != nil {
            /* err != nil means connection was rate limited
             * but could not be buffered
             */
            consumer.Consumer{}.ConsumeAndRespond(clientConnection, responses.TooManyRequestsResponse{})
            continue
        } else if !proceed {
            /* proceed equals false means connection 
             * was rate limited
             */
            continue
        }

        select {
        case h.rateLimiter.AcceptedConnectionsQueue <- clientConnection:
        default:
            /* reaching this case means our workers
             * are not able to keep up with the rate at 
             * which we accept connections. You should detect
             * this scenario and increase
             * the number of workers or the 
             * accepted connections buffer size
             */
            consumer.Consumer{}.ConsumeAndRespond(clientConnection, responses.TooManyRequestsResponse{})
        }
    }
}

rate_limiter.go:

const (
    TOKENS_DEPTH_SIZE                int = 1
    ACCEPTED_CONNECTIONS_BUFFER_SIZE int = 20
    PENDING_CONNECTIONS_BUFFER_SIZE  int = 2000
)

type Limiter struct {
    tokensBucketDepth        int
    pendingConnectionsQueue  chan net.Conn
    AcceptedConnectionsQueue chan net.Conn
    tokensMutex              sync.Mutex
}

func MakeRateLimiter() *Limiter {
    limiter := Limiter{
        tokensBucketDepth:        TOKENS_DEPTH_SIZE,
        pendingConnectionsQueue:  make(chan net.Conn, PENDING_CONNECTIONS_BUFFER_SIZE),
        AcceptedConnectionsQueue: make(chan net.Conn, ACCEPTED_CONNECTIONS_BUFFER_SIZE),
        tokensMutex:              sync.Mutex{},
    }
    
    go Refill(&limiter)

    return &limiter
}

func (l *Limiter) ProceedOrBufferConnection(conn net.Conn) (bool, error) {
    l.tokensMutex.Lock()
    if l.tokensBucketDepth > 0 {
        // we have a token, proceed
        l.tokensBucketDepth--
        l.tokensMutex.Unlock()

        return true, nil
    }

    l.tokensMutex.Unlock()

    /* we did not have a token, try to queue
     * the connection in the pending buffer
     */
    select {
    case l.pendingConnectionsQueue <- conn:
    default:
        /* our pending buffer is full, there's nothing
         * we can do here, we should return Too Many Requests
         */
        return false, errors.New("buffer is full, message should be discarded")
    }
    return false, nil
}
    
func Refill(l *Limiter) {
    ticker := time.NewTicker(3333 * time.Microsecond)
    for {
        select {
        case <-ticker.C:
            l.tokensMutex.Lock()
            if l.tokensBucketDepth < TOKENS_DEPTH_SIZE {
                select {
                case conn := <-l.pendingConnectionsQueue:
                    select {
                    case l.AcceptedConnectionsQueue <- conn:
                    default:
                        select {
                        case l.pendingConnectionsQueue <- conn:
                            l.tokensBucketDepth++
                        default:
                            consumer.Consumer{}.ConsumeAndRespond(conn, responses.TooManyRequestsResponse{})
                        }
                    }
                default:
                    l.tokensBucketDepth++
                }
            }

            l.tokensMutex.Unlock()
        default:
        }
    }
}

请注意,在这种情况下,限制器从一个令牌开始。这意味着我们从第一个令牌开始就强制执行速率,并在突发情况下立即排队。你可能想调整这个属性。

运行这个,以下是使用hey的结果:

hey -n 2000 -c 4 -q 1000 -m GET http://localhost:8000/ -t 1

这发送了2000个请求,通过4个工作线程以1000个请求/秒的速率分配。

结果如下:

概要:
  总共:6.6374秒
  最慢:0.0376秒
  最快:0.0001秒
  平均:0.0132秒
  请求/秒:301.3217

  总数据:58000字节
  每个请求大小:29字节

响应时间直方图:
  0.000 [1]	|
  0.004 [23]	|
  0.008 [5]	|
  0.011 [9]	|
  0.015 [1941]	|■■■■■■■■■■■■■■■■■■■■■■■■■■■■■■■■■■■■■■■■
  0.019 [7]	|
  0.023 [9]	|
  0.026 [2]	|
  0.030 [1]	|
  0.034 [0]	|
  0.038 [2]	|


延迟分布:
  10%在0.0131秒内
  25%在0.0132秒内
  50%在0.0133秒内
  75%在0.0134秒内
  90%在0.0136秒内
  95%在0.0136秒内
  99%在0.0176秒内

详细信息(平均值、最快值、最慢值):
  DNS+拨号:0.0004秒、0.0001秒、0.0376秒
  DNS查找:0.0002秒、0.0000秒、0.0071秒
  请求写入:0.0000秒、0.0000秒、0.0004秒
  响应等待:0.0128秒、0.0001秒、0.0375秒
  响应读取:0.0000秒、0.0000秒、0.0007秒

状态码分布:
  [200]	2000个响应

正如你所看到的,我们实现了300个请求/秒的请求处理上限。

但是,如果我们将补充窗口减半为每1.667毫秒一次,我们得到:

概要:
  总共:3.3454秒
  最慢:0.0196秒
  最快:0.0015秒
  平均:0.0067秒
  请求/秒:597.8337

将我们的速率增加了一倍。

完整仓库的链接:https://github.com/miguelpais/go-http-server-and-cli

希望这对你有所帮助,请批评我的解决方案。

英文:

So, after a while I've achieved what I wanted and the simple solution is to implement a token-based rate limiter.

The basic idea is simple, you have a bucket of depth N containing tokens. Each time a request needs to get processed, you retrieve one of the tokens available if any, reducing the number of available tokens by 1.

If none are available, you have two choices, either respond immediately with 429 Too Many Requests or queue the incoming request for processing only when tokens will be available.

Between the two choices lies different reasons for why a rate limiter was implemented. A) You have it in place to control the performance bounds under which your application runs at a steady state. B) You have it in place due to a contract on requests per second a clients can hit your API.

Not queueing requests and answering immediately with 429 Too Many Requests is suitable for enforcing B). Instead, for A) clients will probably prefer their request to be server with a delay than to receive no response at all, so queueing rate limited requests is the right choice, up to a certain point given by the memory constraints of your application.

In any case, the trick of the token algorithm is in controlling the rate at which tokens become available once again. If we want to achieve a rate limiting of 300 req/s, we would like a goroutine to replenish a single token on a non-full bucket every 3.33 ms (1000 ms / 300 req/s). That is, regardless of how the incoming requests are consuming the bucket, replenishing occurs at fixed intervals, 300 times a second, or every 3.33ms. The purpose of the bucket size is to allow bursts of requests to be properly accepted while still enforcing the overall rate.

I have achieved this with the following logic:

http_server.go:

const (
	MAX_WORKERS int = 1
)

type HttpServer struct {
	rateLimiter *limiter.Limiter
}

func BuildHttpServer() HttpServer {
	server := HttpServer{
		rateLimiter: limiter.MakeRateLimiter(),
	}

	for i := 0; i &lt; MAX_WORKERS; i++ {
		go handler.SpawnHandler(server.rateLimiter.AcceptedConnectionsQueue)
	}

	return server
}

func (h HttpServer) Serve(host, path string) {
	connection, err := net.Listen(&quot;tcp&quot;, &quot;0.0.0.0:8000&quot;)
	if err != nil { /* ... */ }

	for true {
		clientConnection, err := connection.Accept()
		if err != nil { /* ... */ }

		if proceed, err := h.rateLimiter.ProceedOrBufferConnection(clientConnection); err != nil {
			/* err != nil means connection was rate limited
             * but could not be buffered
             */
			consumer.Consumer{}.ConsumeAndRespond(clientConnection, responses.TooManyRequestsResponse{})
			continue
		} else if !proceed {
            /* proceed equals false means connection 
             * was rate limited
             */
			continue
		}

		select {
		case h.rateLimiter.AcceptedConnectionsQueue &lt;- clientConnection:
		default:
            /* reaching this case means our workers
             * are not able to keep up with the rate at 
             * which we accept connections. You should detect
             * this scenario and increase
             * the number of workers or the 
             * accepted connections buffer size
             */
			consumer.Consumer{}.ConsumeAndRespond(clientConnection, responses.TooManyRequestsResponse{})
		}
	}
}

rate_limiter.go:

const (
	TOKENS_DEPTH_SIZE                int = 1
	ACCEPTED_CONNECTIONS_BUFFER_SIZE int = 20
	PENDING_CONNECTIONS_BUFFER_SIZE  int = 2000
)

type Limiter struct {
	tokensBucketDepth        int
	pendingConnectionsQueue  chan net.Conn
	AcceptedConnectionsQueue chan net.Conn
	tokensMutex              sync.Mutex
}

func MakeRateLimiter() *Limiter {
	limiter := Limiter{
		tokensBucketDepth:        TOKENS_DEPTH_SIZE,
		pendingConnectionsQueue:  make(chan net.Conn, PENDING_CONNECTIONS_BUFFER_SIZE),
		AcceptedConnectionsQueue: make(chan net.Conn, ACCEPTED_CONNECTIONS_BUFFER_SIZE),
		tokensMutex:              sync.Mutex{},
	}
    
    go Refill(&amp;limiter)

	return &amp;limiter
}

func (l *Limiter) ProceedOrBufferConnection(conn net.Conn) (bool, error) {
	l.tokensMutex.Lock()
	if l.tokensBucketDepth &gt; 0 {
        // we have a token, proceed
		l.tokensBucketDepth--
		l.tokensMutex.Unlock()

		return true, nil
	}

	l.tokensMutex.Unlock()

    /* we did not have a token, try to queue
     * the connection in the pending buffer
     */
	select {
	case l.pendingConnectionsQueue &lt;- conn:
	default:
        /* our pending buffer is full, there&#39;s nothing
         * we can do here, we should return Too Many Requests
         */
		return false, errors.New(&quot;buffer is full, message should be discarded&quot;)
	}
	return false, nil
}
    
func Refill(l *Limiter) {
	ticker := time.NewTicker(3333 * time.Microsecond)
	for {
		select {
		case &lt;-ticker.C:
			l.tokensMutex.Lock()
			if l.tokensBucketDepth &lt; TOKENS_DEPTH_SIZE {
				select {
				case conn := &lt;-l.pendingConnectionsQueue:
					select {
					case l.AcceptedConnectionsQueue &lt;- conn:
					default:
						select {
						case l.pendingConnectionsQueue &lt;- conn:
							l.tokensBucketDepth++
						default:
							consumer.Consumer{}.ConsumeAndRespond(conn, responses.TooManyRequestsResponse{})
						}
					}
				default:
					l.tokensBucketDepth++
				}
			}

			l.tokensMutex.Unlock()
		default:
		}
	}
}

Notice how the limiter starts with a single token in this scenario. This means we enforce the rate right from the very first token and queue immediately in case of bursts. You might want to play around with this property.

Running this, here are the results with hey:

> hey -n 2000 -c 4 -q 1000 -m GET http://localhost:8000/ -t 1

This sends 2000 requests, divided through 4 workers at 1000 req/s rate.

Instead, the results are:

Summary:
  Total:	6.6374 secs
  Slowest:	0.0376 secs
  Fastest:	0.0001 secs
  Average:	0.0132 secs
  Requests/sec:	301.3217

  Total data:	58000 bytes
  Size/request:	29 bytes

Response time histogram:
  0.000 [1]	|
  0.004 [23]	|
  0.008 [5]	|
  0.011 [9]	|
  0.015 [1941]	|■■■■■■■■■■■■■■■■■■■■■■■■■■■■■■■■■■■■■■■■
  0.019 [7]	|
  0.023 [9]	|
  0.026 [2]	|
  0.030 [1]	|
  0.034 [0]	|
  0.038 [2]	|


Latency distribution:
  10% in 0.0131 secs
  25% in 0.0132 secs
  50% in 0.0133 secs
  75% in 0.0134 secs
  90% in 0.0136 secs
  95% in 0.0136 secs
  99% in 0.0176 secs

Details (average, fastest, slowest):
  DNS+dialup:	0.0004 secs, 0.0001 secs, 0.0376 secs
  DNS-lookup:	0.0002 secs, 0.0000 secs, 0.0071 secs
  req write:	0.0000 secs, 0.0000 secs, 0.0004 secs
  resp wait:	0.0128 secs, 0.0001 secs, 0.0375 secs
  resp read:	0.0000 secs, 0.0000 secs, 0.0007 secs

Status code distribution:
  [200]	2000 responses

As you've seen, we have thus achieved an upper bound of request processing at 300 req/s.

But if now we half the refill window to every 1.667 ms, we get:

Summary:
  Total:	3.3454 secs
  Slowest:	0.0196 secs
  Fastest:	0.0015 secs
  Average:	0.0067 secs
  Requests/sec:	597.8337

Increasing our rate two-fold.

Link to the complete repo: https://github.com/miguelpais/go-http-server-and-cli

Hope this helps and please do criticise my solution.

huangapple
  • 本文由 发表于 2023年7月6日 14:37:13
  • 转载请务必保留本文链接:https://go.coder-hub.com/76626105.html
匿名

发表评论

匿名网友

:?: :razz: :sad: :evil: :!: :smile: :oops: :grin: :eek: :shock: :???: :cool: :lol: :mad: :twisted: :roll: :wink: :idea: :arrow: :neutral: :cry: :mrgreen:

确定