限流算法

限流算法

固定窗口

  • 把时间切成固定长度的区间(如 1 s)。
  • 每个区间维护一个整数计数器。
  • 请求进来 ⇒ 计数器 +1;超过阈值立即拒绝。
  • 区间结束瞬间计数器归零。
class FixedWindow {
    private final long windowMillis;
    private final int limit;
    private final AtomicLong counter = new AtomicLong(0);
    private volatile long windowStart;

    boolean tryAcquire() {
        long now = System.currentTimeMillis();
        if (now - windowStart >= windowMillis) {   // 进入新窗口
            windowStart = now;
            counter.set(0);
        }
        return counter.incrementAndGet() <= limit;
    }
}

缺陷: 边界翻倍:第 999 ms 来 100 个请求,第 1000 ms 立即清零又来 100 个,实际 2 ms 内通过 200 个,两倍流量冲击后端

优化:

  • LongAdder 拆成 8 个 Cell,避免 CAS 自旋

  • 伪共享填充:计数器前后加 64 byte 填充,解决 CPU 缓存行颠簸

  • 窗口热切换:把“时间比较”换成“位运算”

    • 上面通过比较时间now - windowStart >= windowMillis实现,可以优化成每秒一个counter,多个counter轮流使用实现热切换

    • 每秒使用一个counter实现方式:

      • long idx = (now >>> 10) & 1;   // 0/1 双缓冲,1 s 翻转一次
        LongAdder[] counters = new LongAdder[2];
        counters[idx].xxxxx  // 每秒自动使用其中一个counter
      • 位运算比除法运算快、多个counter可以读写分离

滑动窗口

  • 把大窗口再细分为 N 个子窗口(桶),例如 1 s 切成 10 格,每格 100 ms。
  • 用循环队列/链表记录每个子窗口的请求次数。
  • 每来一次请求: a. 删掉“过期”的子窗口; b. 累加剩余子窗口的总数; c. 总数 ≤ 阈值则通过,并把当前子窗口 +1。
  • 缺点:需要记录每条时间戳,内存随请求量线性增长

优化

  • 分桶计数,使用AtomicLongArray
    • 1 s 切成 20 格,每格 50 ms;用 long[20] 数组存计数,无对象头开销;数组大小 = 20 × 8 B = 160 B,常驻 L1
  • 环形索引 & 原子累加
  • 使用对时间戳取余操作,循环使用分桶
  • AtomicLongArray的cas方法进行无锁累加

漏桶

漏桶算法模拟 “注水入桶再匀速漏水” 的过程,核心是严格控制请求的输出速率,平滑流量波动:

  • 想象一个固定容量的桶,新请求(“水”)不断进入桶中;
  • 桶以固定速率(如每秒处理 N 个请求)向外 “漏水”(处理请求);
  • 若请求进入速度超过漏水速度,桶会被填满,后续请求会因 “溢出” 被拒绝或缓存。
class LeakyBucket:
    def __init__(self, capacity, leak_rate):
        self.capacity = capacity  # 桶的最大容量(最多缓存的请求数)
        self.leak_rate = leak_rate  # 漏水速率(每秒处理的请求数)
        self.current_water = 0  # 当前桶中的"水量"(缓存的请求数)
        self.last_leak_time = current_time()  # 上次漏水的时间

    def allow_request(self, request_size=1):
        # 1. 先计算从上次到现在漏出的水量(处理的请求数)
        now = current_time()
        elapsed = now - self.last_leak_time
        leaked = elapsed * self.leak_rate  # 漏出的水量 = 时间差 * 漏水速率
        self.current_water = max(0, self.current_water - leaked)  # 更新当前水量
        self.last_leak_time = now  # 更新漏水时间

        # 2. 判断新请求是否能加入桶中
        if self.current_water + request_size <= self.capacity:
            self.current_water += request_size
            return True  # 允许请求(加入桶中等待处理)
        else:
            return False  # 拒绝请求(桶已满)

令牌桶

令牌桶算法通过 “生成令牌 - 消耗令牌” 控制流量,核心是允许一定的突发流量,同时限制长期平均速率:

  • 系统按固定速率(如每秒生成 N 个)向桶中放入 “令牌”;
  • 桶有最大容量,令牌满后不再生成(避免无限制积累);
  • 每个请求需要消耗 1 个令牌才能被处理,若桶中无令牌,请求被拒绝或等待。
class TokenBucket:
    def __init__(self, capacity, token_rate):
        self.capacity = capacity  # 桶的最大令牌数
        self.token_rate = token_rate  # 令牌生成速率(每秒生成的令牌数)
        self.current_tokens = 0  # 当前桶中的令牌数
        self.last_token_time = current_time()  # 上次生成令牌的时间

    def allow_request(self, tokens_needed=1):
        # 1. 先计算从上次到现在应生成的令牌数
        now = current_time()
        elapsed = now - self.last_token_time
        new_tokens = elapsed * self.token_rate  # 新生成的令牌 = 时间差 * 生成速率
        self.current_tokens = min(self.capacity, self.current_tokens + new_tokens)  # 更新令牌数(不超过容量)
        self.last_token_time = now  # 更新生成时间

        # 2. 判断是否有足够的令牌处理请求
        if self.current_tokens >= tokens_needed:
            self.current_tokens -= tokens_needed
            return True  # 允许请求(消耗令牌)
        else:
            return False  # 拒绝请求(令牌不足)