限流算法
限流算法
固定窗口
- 把时间切成固定长度的区间(如 1 s)。
- 每个区间维护一个整数计数器。
- 请求进来 ⇒ 计数器 +1;超过阈值立即拒绝。
- 区间结束瞬间计数器归零。
class FixedWindow {
private final long windowMillis;
private final int limit;
private final AtomicLong counter = new AtomicLong(0);
private volatile long windowStart;
boolean tryAcquire() {
long now = System.currentTimeMillis();
if (now - windowStart >= windowMillis) { // 进入新窗口
windowStart = now;
counter.set(0);
}
return counter.incrementAndGet() <= limit;
}
}
缺陷: 边界翻倍:第 999 ms 来 100 个请求,第 1000 ms 立即清零又来 100 个,实际 2 ms 内通过 200 个,两倍流量冲击后端。
优化:
滑动窗口
- 把大窗口再细分为 N 个子窗口(桶),例如 1 s 切成 10 格,每格 100 ms。
- 用循环队列/链表记录每个子窗口的请求次数。
- 每来一次请求:
a. 删掉“过期”的子窗口;
b. 累加剩余子窗口的总数;
c. 总数 ≤ 阈值则通过,并把当前子窗口 +1。
- 缺点:需要记录每条时间戳,内存随请求量线性增长;
优化
- 分桶计数,使用
AtomicLongArray- 1 s 切成 20 格,每格 50 ms;用
long[20] 数组存计数,无对象头开销;数组大小 = 20 × 8 B = 160 B,常驻 L1
- 环形索引 & 原子累加
- 使用对时间戳取余操作,循环使用分桶
- AtomicLongArray的cas方法进行无锁累加
漏桶
漏桶算法模拟 “注水入桶再匀速漏水” 的过程,核心是严格控制请求的输出速率,平滑流量波动:
- 想象一个固定容量的桶,新请求(“水”)不断进入桶中;
- 桶以固定速率(如每秒处理 N 个请求)向外 “漏水”(处理请求);
- 若请求进入速度超过漏水速度,桶会被填满,后续请求会因 “溢出” 被拒绝或缓存。
class LeakyBucket:
def __init__(self, capacity, leak_rate):
self.capacity = capacity # 桶的最大容量(最多缓存的请求数)
self.leak_rate = leak_rate # 漏水速率(每秒处理的请求数)
self.current_water = 0 # 当前桶中的"水量"(缓存的请求数)
self.last_leak_time = current_time() # 上次漏水的时间
def allow_request(self, request_size=1):
# 1. 先计算从上次到现在漏出的水量(处理的请求数)
now = current_time()
elapsed = now - self.last_leak_time
leaked = elapsed * self.leak_rate # 漏出的水量 = 时间差 * 漏水速率
self.current_water = max(0, self.current_water - leaked) # 更新当前水量
self.last_leak_time = now # 更新漏水时间
# 2. 判断新请求是否能加入桶中
if self.current_water + request_size <= self.capacity:
self.current_water += request_size
return True # 允许请求(加入桶中等待处理)
else:
return False # 拒绝请求(桶已满)
令牌桶
令牌桶算法通过 “生成令牌 - 消耗令牌” 控制流量,核心是允许一定的突发流量,同时限制长期平均速率:
- 系统按固定速率(如每秒生成 N 个)向桶中放入 “令牌”;
- 桶有最大容量,令牌满后不再生成(避免无限制积累);
- 每个请求需要消耗 1 个令牌才能被处理,若桶中无令牌,请求被拒绝或等待。
class TokenBucket:
def __init__(self, capacity, token_rate):
self.capacity = capacity # 桶的最大令牌数
self.token_rate = token_rate # 令牌生成速率(每秒生成的令牌数)
self.current_tokens = 0 # 当前桶中的令牌数
self.last_token_time = current_time() # 上次生成令牌的时间
def allow_request(self, tokens_needed=1):
# 1. 先计算从上次到现在应生成的令牌数
now = current_time()
elapsed = now - self.last_token_time
new_tokens = elapsed * self.token_rate # 新生成的令牌 = 时间差 * 生成速率
self.current_tokens = min(self.capacity, self.current_tokens + new_tokens) # 更新令牌数(不超过容量)
self.last_token_time = now # 更新生成时间
# 2. 判断是否有足够的令牌处理请求
if self.current_tokens >= tokens_needed:
self.current_tokens -= tokens_needed
return True # 允许请求(消耗令牌)
else:
return False # 拒绝请求(令牌不足)