目录
- 固定窗口
- 滑动窗口
- 漏桶算法
- 令牌桶
- 滑动日志
- 总结
限流是项目中经常需要使用到的一种工具,一般用于限制用户的请求的频率,也可以避免瞬间流量过大导致系统崩溃,或者稳定消息处理速率
这个文章主要是使用Go实现常见的限流算法,代码参考了文章面试官:来,年轻人!请手撸5种常见限流算法! 和面试必备:4种经典限流算法讲解如果需要Java实现或更详细的算法介绍可以看这两篇文章
固定窗口
每开启一个新的窗口,在窗口时间大小内,可以通过窗口请求上限个请求。
该算法主要是会存在临界问题,如果流量都集中在两个窗口的交界处,那么突发流量会是设置上限的两倍。
package limiter | |
import ( | |
"sync" | |
"time" | |
) | |
// FixedWindowLimiter 固定窗口限流器 | |
type FixedWindowLimiter struct { | |
limit int // 窗口请求上限 | |
window time.Duration // 窗口时间大小 | |
counter int // 计数器 | |
lastTime time.Time // 上一次请求的时间 | |
mutex sync.Mutex // 避免并发问题 | |
} | |
func NewFixedWindowLimiter(limit int, window time.Duration) *FixedWindowLimiter { | |
return &FixedWindowLimiter{ | |
limit: limit, | |
window: window, | |
lastTime: time.Now(), | |
} | |
} | |
func (l *FixedWindowLimiter) TryAcquire() bool { | |
l.mutex.Lock() | |
defer l.mutex.Unlock() | |
// 获取当前时间 | |
now := time.Now() | |
// 如果当前窗口失效,计数器清,开启新的窗口 | |
if now.Sub(l.lastTime) > l.window { | |
l.counter = | |
l.lastTime = now | |
} | |
// 若到达窗口请求上限,请求失败 | |
if l.counter >= l.limit { | |
return false | |
} | |
// 若没到窗口请求上限,计数器+,请求成功 | |
l.counter++ | |
return true | |
} |
滑动窗口
滑动窗口类似于固定窗口,它只是把大窗口切分成多个小窗口,每次向右移动一个小窗口,它可以避免两倍的突发流量。
固定窗口可以说是滑动窗口的一种特殊情况,只要滑动窗口里面的小窗口和大窗口大小一样。
窗口算法都有一个问题,当流量达到上限,后面的请求都会被拒绝。
package limiter | |
import ( | |
"errors" | |
"sync" | |
"time" | |
) | |
// SlidingWindowLimiter 滑动窗口限流器 | |
type SlidingWindowLimiter struct { | |
limit int // 窗口请求上限 | |
window int // 窗口时间大小 | |
smallWindow int // 小窗口时间大小 | |
smallWindows int // 小窗口数量 | |
counters map[int]int // 小窗口计数器 | |
mutex sync.Mutex // 避免并发问题 | |
} | |
// NewSlidingWindowLimiter 创建滑动窗口限流器 | |
func NewSlidingWindowLimiter(limit int, window, smallWindow time.Duration) (*SlidingWindowLimiter, error) { | |
// 窗口时间必须能够被小窗口时间整除 | |
if window%smallWindow != { | |
return nil, errors.New("window cannot be split by integers") | |
} | |
return &SlidingWindowLimiter{ | |
limit: limit, | |
window: int(window), | |
smallWindow: int(smallWindow), | |
smallWindows: int(window / smallWindow), | |
counters: make(map[int]int), | |
}, nil | |
} | |
func (l *SlidingWindowLimiter) TryAcquire() bool { | |
l.mutex.Lock() | |
defer l.mutex.Unlock() | |
// 获取当前小窗口值 | |
currentSmallWindow := time.Now().UnixNano() / l.smallWindow * l.smallWindow | |
// 获取起始小窗口值 | |
startSmallWindow := currentSmallWindow - l.smallWindow*(l.smallWindows-) | |
// 计算当前窗口的请求总数 | |
var count int | |
for smallWindow, counter := range l.counters { | |
if smallWindow < startSmallWindow { | |
delete(l.counters, smallWindow) | |
} else { | |
count += counter | |
} | |
} | |
// 若到达窗口请求上限,请求失败 | |
if count >= l.limit { | |
return false | |
} | |
// 若没到窗口请求上限,当前小窗口计数器+,请求成功 | |
l.counters[currentSmallWindow]++ | |
return true | |
} |
漏桶算法
漏桶是模拟一个漏水的桶,请求相当于往桶里倒水,处理请求的速度相当于水漏出的速度。
主要用于请求处理速率较为稳定的服务,需要使用生产者消费者模式把请求放到一个队列里,让消费者以一个较为稳定的速率处理。
package limiter | |
import ( | |
"sync" | |
"time" | |
) | |
// LeakyBucketLimiter 漏桶限流器 | |
type LeakyBucketLimiter struct { | |
peakLevel int // 最高水位 | |
currentLevel int // 当前水位 | |
currentVelocity int // 水流速度/秒 | |
lastTime time.Time // 上次放水时间 | |
mutex sync.Mutex // 避免并发问题 | |
} | |
func NewLeakyBucketLimiter(peakLevel, currentVelocity int) *LeakyBucketLimiter { | |
return &LeakyBucketLimiter{ | |
peakLevel: peakLevel, | |
currentVelocity: currentVelocity, | |
lastTime: time.Now(), | |
} | |
} | |
func (l *LeakyBucketLimiter) TryAcquire() bool { | |
l.mutex.Lock() | |
defer l.mutex.Unlock() | |
// 尝试放水 | |
now := time.Now() | |
// 距离上次放水的时间 | |
interval := now.Sub(l.lastTime) | |
if interval >= time.Second { | |
// 当前水位-距离上次放水的时间(秒)*水流速度 | |
l.currentLevel = maxInt(, l.currentLevel-int(interval/time.Second)*l.currentVelocity) | |
l.lastTime = now | |
} | |
// 若到达最高水位,请求失败 | |
if l.currentLevel >= l.peakLevel { | |
return false | |
} | |
// 若没有到达最高水位,当前水位+,请求成功 | |
l.currentLevel++ | |
return true | |
} | |
func maxInt(a, b int) int { | |
if a > b { | |
return a | |
} | |
return b | |
} |
令牌桶
与漏桶算法的相反,令牌桶会不断地把令牌添加到桶里,而请求会从桶中获取令牌,只有拥有令牌地请求才能被接受。
因为桶中可以提前保留一些令牌,所以它允许一定地突发流量通过。
package limiter | |
import ( | |
"sync" | |
"time" | |
) | |
// TokenBucketLimiter 令牌桶限流器 | |
type TokenBucketLimiter struct { | |
capacity int // 容量 | |
currentTokens int // 令牌数量 | |
rate int // 发放令牌速率/秒 | |
lastTime time.Time // 上次发放令牌时间 | |
mutex sync.Mutex // 避免并发问题 | |
} | |
func NewTokenBucketLimiter(capacity, rate int) *TokenBucketLimiter { | |
return &TokenBucketLimiter{ | |
capacity: capacity, | |
rate: rate, | |
lastTime: time.Now(), | |
} | |
} | |
func (l *TokenBucketLimiter) TryAcquire() bool { | |
l.mutex.Lock() | |
defer l.mutex.Unlock() | |
// 尝试发放令牌 | |
now := time.Now() | |
// 距离上次发放令牌的时间 | |
interval := now.Sub(l.lastTime) | |
if interval >= time.Second { | |
// 当前令牌数量+距离上次发放令牌的时间(秒)*发放令牌速率 | |
l.currentTokens = minInt(l.capacity, l.currentTokens+int(interval/time.Second)*l.rate) | |
l.lastTime = now | |
} | |
// 如果没有令牌,请求失败 | |
if l.currentTokens == { | |
return false | |
} | |
// 如果有令牌,当前令牌-,请求成功 | |
l.currentTokens-- | |
return true | |
} | |
func minInt(a, b int) int { | |
if a < b { | |
return a | |
} | |
return b | |
} |
滑动日志
滑动日志与滑动窗口算法类似,但是滑动日志主要用于多级限流的场景,比如短信验证码1分钟1次,1小时10次,1天20次这种业务。
算法流程与滑动窗口相同,只是它可以指定多个策略,同时在请求失败的时候,需要通知调用方是被哪个策略所拦截。
package limiter | |
import ( | |
"errors" | |
"fmt" | |
"sort" | |
"sync" | |
"time" | |
) | |
// ViolationStrategyError 违背策略错误 | |
type ViolationStrategyError struct { | |
Limit int // 窗口请求上限 | |
Window time.Duration // 窗口时间大小 | |
} | |
func (e *ViolationStrategyError) Error() string { | |
return fmt.Sprintf("violation strategy that limit = %d and window = %d", e.Limit, e.Window) | |
} | |
// SlidingLogLimiterStrategy 滑动日志限流器的策略 | |
type SlidingLogLimiterStrategy struct { | |
limit int // 窗口请求上限 | |
window int // 窗口时间大小 | |
smallWindows int // 小窗口数量 | |
} | |
func NewSlidingLogLimiterStrategy(limit int, window time.Duration) *SlidingLogLimiterStrategy { | |
return &SlidingLogLimiterStrategy{ | |
limit: limit, | |
window: int(window), | |
} | |
} | |
// SlidingLogLimiter 滑动日志限流器 | |
type SlidingLogLimiter struct { | |
strategies []*SlidingLogLimiterStrategy // 滑动日志限流器策略列表 | |
smallWindow int // 小窗口时间大小 | |
counters map[int]int // 小窗口计数器 | |
mutex sync.Mutex // 避免并发问题 | |
} | |
func NewSlidingLogLimiter(smallWindow time.Duration, strategies ...*SlidingLogLimiterStrategy) (*SlidingLogLimiter, error) { | |
// 复制策略避免被修改 | |
strategies = append(make([]*SlidingLogLimiterStrategy,, len(strategies)), strategies...) | |
// 不能不设置策略 | |
if len(strategies) == { | |
return nil, errors.New("must be set strategies") | |
} | |
// 排序策略,窗口时间大的排前面,相同窗口上限大的排前面 | |
sort.Slice(strategies, func(i, j int) bool { | |
a, b := strategies[i], strategies[j] | |
if a.window == b.window { | |
return a.limit > b.limit | |
} | |
return a.window > b.window | |
}) | |
fmt.Println(strategies[], strategies[1]) | |
for i, strategy := range strategies { | |
// 随着窗口时间变小,窗口上限也应该变小 | |
if i > { | |
if strategy.limit >= strategies[i-].limit { | |
return nil, errors.New("the smaller window should be the smaller limit") | |
} | |
} | |
// 窗口时间必须能够被小窗口时间整除 | |
if strategy.window%int(smallWindow) != 0 { | |
return nil, errors.New("window cannot be split by integers") | |
} | |
strategy.smallWindows = strategy.window / int(smallWindow) | |
} | |
return &SlidingLogLimiter{ | |
strategies: strategies, | |
smallWindow: int(smallWindow), | |
counters: make(map[int]int), | |
}, nil | |
} | |
func (l *SlidingLogLimiter) TryAcquire() error { | |
l.mutex.Lock() | |
defer l.mutex.Unlock() | |
// 获取当前小窗口值 | |
currentSmallWindow := time.Now().UnixNano() / l.smallWindow * l.smallWindow | |
// 获取每个策略的起始小窗口值 | |
startSmallWindows := make([]int, len(l.strategies)) | |
for i, strategy := range l.strategies { | |
startSmallWindows[i] = currentSmallWindow - l.smallWindow*(strategy.smallWindows-) | |
} | |
// 计算每个策略当前窗口的请求总数 | |
counts := make([]int, len(l.strategies)) | |
for smallWindow, counter := range l.counters { | |
if smallWindow < startSmallWindows[] { | |
delete(l.counters, smallWindow) | |
continue | |
} | |
for i := range l.strategies { | |
if smallWindow >= startSmallWindows[i] { | |
counts[i] += counter | |
} | |
} | |
} | |
// 若到达对应策略窗口请求上限,请求失败,返回违背的策略 | |
for i, strategy := range l.strategies { | |
if counts[i] >= strategy.limit { | |
return &ViolationStrategyError{ | |
Limit: strategy.limit, | |
Window: time.Duration(strategy.window), | |
} | |
} | |
} | |
// 若没到窗口请求上限,当前小窗口计数器+,请求成功 | |
l.counters[currentSmallWindow]++ | |
return nil | |
} |
总结
- 如果需要一个简单高效的算法,可以使用固定窗口,但是它可能产生两倍的突发流量
- 可以通过滑动窗口避免突发流量问题,但是窗口可能会掐断流量一段时间
- 如果需要更平滑的流量,可以使用漏桶算法搭配生产者消费者模式
- 如果能够处理一定的突发流量,可以使用令牌桶算法
- 遇到多级限流的场景,滑动日志会更加适合