Implementing Various Types of Rate Limiting in Golang

Preface

When developing a highly concurrent system, we may run into an interface that is accessed too frequently. To keep the system highly available and stable, we need to limit the rate of requests. You may use a web server such as Nginx to control the requests, or you may use a popular rate limiting library. Rate limiting is one of the main safeguards of a highly concurrent system, so let's understand the common approaches before designing a rate limiting algorithm.

Rate Limiting

The purpose of rate limiting is to protect the system by limiting the rate of concurrent requests, or the number of requests within a time window. Once the limit is reached, requests can be denied (redirected to an error page, or told that the resource is unavailable), queued or made to wait (e.g. flash sales, comments, order placement), or degraded (served fallback or default data).

When traffic spikes at some moment, an interface of our service may be hit very frequently. If we do not limit the access frequency, the server may not withstand the pressure and go down, possibly losing data along the way, so such traffic needs rate limiting.

A rate limiting algorithm helps us control how often each interface or function is called. It acts like a fuse, preventing the system from collapsing when the access frequency or concurrency exceeds its capacity. We may see response headers like the following when calling some third-party interfaces:

X-RateLimit-Limit: 60         // at most 60 requests per second
X-RateLimit-Remaining: 22     // how many requests are currently left
X-RateLimit-Reset: 1612184024 // time at which the limit resets

Through these response headers, the server tells the caller how it is being rate limited, protecting the back-end interface from excessive access. There are many algorithms for rate limiting, each with a different purpose. The common strategies are to reject the requests that exceed the limit, or to make them wait in a queue.
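As a minimal sketch of what the server side might look like (the header names follow the example above; the `lim` parameter with `Allow`, `Remaining` and `ResetAt` methods is a hypothetical stand-in for any of the limiters implemented below, not a real library API):

// Hypothetical middleware: enforces a limiter and reports its
// state through the X-RateLimit-* headers shown above.
func rateLimit(lim interface {
    Allow() bool
    Remaining() int
    ResetAt() time.Time
}, next http.Handler) http.Handler {
    return http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
        w.Header().Set("X-RateLimit-Limit", "60")
        w.Header().Set("X-RateLimit-Remaining", strconv.Itoa(lim.Remaining()))
        w.Header().Set("X-RateLimit-Reset", strconv.FormatInt(lim.ResetAt().Unix(), 10))
        if !lim.Allow() {
            // over the limit: reject with 429 Too Many Requests
            http.Error(w, "rate limit exceeded", http.StatusTooManyRequests)
            return
        }
        next.ServeHTTP(w, r)
    })
}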

In general, the common ways of implementing rate limiting are:

Counter

Sliding window

Leaky bucket

Token bucket


Counter

The counter is one of the simplest rate limiting algorithms. The principle is: within a time interval, count the requests and compare the count with a threshold to decide whether to limit. When the interval ends, the counter is reset to zero.

This is like riding a bus: the carriage has a fixed number of seats, and once it is full no more passengers are allowed on, otherwise the bus is overloaded and the driver gets fined by the traffic police. For our system, the penalty is not a fine but possibly a direct crash.

We can set a variable count in the program. When a request arrives, we increment count by 1 and record the request time.

When the next request arrives, we check whether count exceeds the configured limit and whether the current request falls within 1 minute of the first request.

If it is within 1 minute and the limit is exceeded, there are too many requests and subsequent requests are rejected.

If the interval between this request and the first request is longer than the counting cycle, the counter is reset and counting starts over.

Code implementation:

package main

import (
    "log"
    "sync"
    "time"
)

type Counter struct {
    rate  int           // the maximum number of requests allowed in one counting cycle
    begin time.Time     // start time of the current counting cycle
    cycle time.Duration // length of the counting cycle
    count int           // number of requests received in the current cycle
    lock  sync.Mutex
}

func (l *Counter) Allow() bool {
    l.lock.Lock()
    defer l.lock.Unlock()

    if l.count >= l.rate-1 {
        now := time.Now()
        if now.Sub(l.begin) >= l.cycle {
            // the cycle has elapsed: reset the counter and allow this request
            l.Reset(now)
            return true
        }
        // limit reached within the cycle: reject the request
        return false
    }
    // limit not reached yet: count plus 1
    l.count++
    return true
}

func (l *Counter) Set(r int, cycle time.Duration) {
    l.rate = r
    l.begin = time.Now()
    l.cycle = cycle
    l.count = 0
}

func (l *Counter) Reset(t time.Time) {
    l.begin = t
    l.count = 0
}

func main() {
    var wg sync.WaitGroup
    var lr Counter
    lr.Set(3, time.Second) // up to 3 requests per second
    for i := 0; i < 10; i++ {
        wg.Add(1)
        log.Println("Create request:", i)
        go func(i int) {
            if lr.Allow() {
                log.Println("Responding to request:", i)
            }
            wg.Done()
        }(i)

        time.Sleep(200 * time.Millisecond)
    }
    wg.Wait()
}


Output:

2021/02/01 21:16:12 Create request: 0
2021/02/01 21:16:12 Responding to request: 0
2021/02/01 21:16:12 Create request: 1
2021/02/01 21:16:12 Responding to request: 1
2021/02/01 21:16:12 Create request: 2
2021/02/01 21:16:13 Create request: 3
2021/02/01 21:16:13 Create request: 4
2021/02/01 21:16:13 Create request: 5
2021/02/01 21:16:13 Responding to request: 5
2021/02/01 21:16:13 Create request: 6
2021/02/01 21:16:13 Responding to request: 6
2021/02/01 21:16:13 Create request: 7
2021/02/01 21:16:13 Responding to request: 7
2021/02/01 21:16:14 Create request: 8
2021/02/01 21:16:14 Create request: 9


As you can see, we create a request every 200 ms, which is clearly faster than the limit of 3 requests per second. After running, we find that requests numbered 2, 3, 4, 8 and 9 were dropped, which shows that the rate limiting worked.

But here is the problem: suppose an interface /query allows at most 200 requests per minute, and a user sends 200 requests in the last few milliseconds of the 59th second. When the counter resets at the start of the next minute, the user sends another 200 requests. Within roughly one second the user has sent 400 requests without ever violating our counting logic. This is the boundary flaw of the counter approach: a malicious user can exploit it to push large bursts of requests through, possibly overwhelming the system.

This method is simple, but it has a big drawback: it does not handle the boundary between counting windows well.

Sliding Window

The sliding window is a traffic control technique, also used in the TCP protocol, that addresses the boundary defect of the counter. It divides the fixed time window into smaller slices and moves forward as time passes; requests are counted in a fixed number of movable cells, and the total is compared against the threshold.

Imagine a time window of one minute divided into 6 grids, each covering 10 seconds. Every 10 seconds, the window slides one grid to the right. Each grid has its own counter: if a request arrives at 0:45, the counter of the fifth grid (0:40–0:50) is incremented. To decide whether to limit, we add up the counters of all grids in the current window and compare the sum with the configured threshold.

So how does the sliding window solve the boundary problem we ran into above?

When the user sends 200 requests at 0:59, they land in the counter of the last grid. One second later the window slides one grid to the right, but those 200 requests are still inside the window, so when the user sends another burst, the total exceeds the limit and the new requests are rejected.

In fact, the counter is just a sliding window with a single grid. Dividing the window into more grids makes the limiting smoother and more accurate, and the number of grids determines the algorithm's precision. But since the window is still built from discrete time slices, the boundary problem is reduced rather than completely eliminated.

A related algorithm implementation: github.com/RussellLuo/...
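To make the idea concrete, here is a minimal sliding window sketch in the same style as the counter above. The struct and all names are our own illustration (not taken from the library linked above), and it assumes the window duration divides evenly into the number of grids:

// SlidingWindow splits a fixed window into equal slots, each with
// its own counter; expired slots are cleared as time advances.
type SlidingWindow struct {
    rate     int           // max requests allowed per full window
    slots    []int         // per-grid counters
    slotSize time.Duration // duration covered by one grid
    pos      int           // index of the current grid
    lastTick time.Time     // start time of the current grid
    lock     sync.Mutex
}

func NewSlidingWindow(rate, slots int, window time.Duration) *SlidingWindow {
    return &SlidingWindow{
        rate:     rate,
        slots:    make([]int, slots),
        slotSize: window / time.Duration(slots),
        lastTick: time.Now(),
    }
}

func (w *SlidingWindow) Allow() bool {
    w.lock.Lock()
    defer w.lock.Unlock()
    // advance the window: clear one grid for each elapsed slot
    for elapsed := time.Since(w.lastTick); elapsed >= w.slotSize; elapsed -= w.slotSize {
        w.pos = (w.pos + 1) % len(w.slots)
        w.slots[w.pos] = 0
        w.lastTick = w.lastTick.Add(w.slotSize)
    }
    // sum the counters of all grids and compare with the threshold
    total := 0
    for _, c := range w.slots {
        total += c
    }
    if total >= w.rate {
        return false
    }
    w.slots[w.pos]++
    return true
}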

Leaky Bucket

The principle of the leaky bucket algorithm is a bucket of fixed capacity that leaks water droplets at a fixed rate. If you have ever used a faucet, you know that when you turn it on, water flows down into the bucket; a leaky bucket has a hole at the bottom through which the water drains. If the faucet is opened too wide, the inflow is too fast, and the bucket fills up and overflows.

A bucket of fixed capacity has water flowing in and out. We cannot predict how much water will flow in, or how fast. But the bucket drains at a fixed rate (the processing speed), which achieves traffic shaping and flow control.

Code implementation:

type LeakyBucket struct {
    rate       float64 // fixed leak rate, in drops per second
    capacity   float64 // capacity of the bucket
    water      float64 // current amount of water in the bucket
    lastLeakMs int64   // timestamp of the bucket's last leak, in ms

    lock sync.Mutex
}

func (l *LeakyBucket) Allow() bool {
    l.lock.Lock()
    defer l.lock.Unlock()
    now := time.Now().UnixNano() / 1e6
    // leak first: drain the water that has flowed out since the last call
    elapsed := float64(now-l.lastLeakMs) * l.rate / 1000
    l.water = math.Max(0, l.water-elapsed) // the bucket may have run dry
    l.lastLeakMs = now
    if l.water+1 < l.capacity {
        // the bucket is not full yet: add one drop of water
        l.water++
        return true
    }
    // the bucket is full: refuse to add water
    return false
}

func (l *LeakyBucket) Set(r, c float64) {
    l.rate = r
    l.capacity = c
    l.water = 0
    l.lastLeakMs = time.Now().UnixNano() / 1e6
}

The leaky bucket algorithm has the following characteristics:

The bucket has a fixed capacity, and water flows out at a fixed constant rate (requests being served)

If the bucket is empty, no drops flow out

Water can drip into the bucket at any rate (requests arriving)

If the inflow exceeds the bucket's capacity, the excess overflows (new requests are rejected)

Because the outflow rate is a fixed constant, the maximum service rate equals the leak rate, and bursts of traffic cannot be served any faster.
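For completeness, a quick usage sketch of the LeakyBucket above, in the same style as the counter demo (the rate and capacity values are arbitrary):

func main() {
    var lb LeakyBucket
    lb.Set(3, 3) // leak 3 drops per second, bucket capacity 3
    for i := 0; i < 10; i++ {
        log.Println("Request:", i, "allowed:", lb.Allow())
        time.Sleep(100 * time.Millisecond)
    }
}

Because the bucket drains at only 3 drops per second while requests arrive every 100 ms, roughly every third request is rejected once the bucket fills, illustrating the smoothing effect.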

Token Bucket

The token bucket algorithm is one of the most commonly used algorithms for network traffic shaping and rate limiting. Typically, it is used to control the amount of data sent to the network while still allowing bursts to be sent.

We have a bucket of fixed capacity that holds tokens. The bucket is empty at first, and the system adds tokens to it at a fixed rate until the bucket is full; excess tokens are discarded. When a request arrives, it takes a token from the bucket, and if the bucket is empty, the request is rejected or blocked.

Implementation code:

type TokenBucket struct {
    rate         int64 // fixed rate of token refill, in tokens per second
    capacity     int64 // capacity of the bucket
    tokens       int64 // number of tokens currently in the bucket
    lastTokenSec int64 // timestamp of the last token refill, in seconds
    lock         sync.Mutex
}

func (l *TokenBucket) Allow() bool {
    l.lock.Lock()
    defer l.lock.Unlock()
    now := time.Now().Unix()
    // refill first: add the tokens accumulated since the last call
    l.tokens = l.tokens + (now-l.lastTokenSec)*l.rate
    if l.tokens > l.capacity {
        l.tokens = l.capacity
    }
    l.lastTokenSec = now
    if l.tokens > 0 {
        // tokens remain: take one and allow the request
        l.tokens--
        return true
    }
    // no tokens left: reject the request
    return false
}

func (l *TokenBucket) Set(r, c int64) {
    l.rate = r
    l.capacity = c
    l.tokens = 0
    l.lastTokenSec = time.Now().Unix()
}

The token bucket has the following characteristics:

Tokens are added to the bucket at a fixed rate

The bucket holds at most B tokens; when the bucket is full, newly added tokens are discarded

If there are fewer than N tokens in the bucket, no tokens are removed and the request is rate limited (discarded, or blocked and made to wait)

The token bucket limits the average inflow rate while allowing a degree of bursty traffic: as long as tokens remain, a burst can consume 3, 4, ... tokens at once and be processed immediately.
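In practice, rather than hand-rolling a token bucket, Go programs commonly use the golang.org/x/time/rate package, which implements this algorithm. A minimal sketch:

package main

import (
    "log"
    "time"

    "golang.org/x/time/rate"
)

func main() {
    // a token bucket refilled at 3 tokens per second, holding at most 3 tokens
    limiter := rate.NewLimiter(3, 3)
    for i := 0; i < 10; i++ {
        log.Println("Request:", i, "allowed:", limiter.Allow())
        time.Sleep(100 * time.Millisecond)
    }
}

The package also offers Wait (block until a token is available) and Reserve (book a token for future use), covering the queuing strategies mentioned earlier.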

Summary

Among these, the token bucket is the most commonly used in practice. This article has introduced implementations of several common rate limiting algorithms.
