限流算法原理与实践

限流算法原理与实践

Posted by Link on October 9, 2022

限流算法原理与实践

背景

由于公司降本增效的要求,负责网络带宽的团队一直紧盯各域名的带宽,某天晚上的App发公测引起带宽峰值,需要处理。手头的App公测系统支持定量下发,但没有限速,限制40000的量在头几分钟一下子下发,导致下载App插件的域名出现带宽峰值,需要对公测进行限速/限流处理。借此机会了解调研一下限流算法,并挑选合适算法运用到需求中。

带宽峰值

限流算法原理

  1. 固定窗口
  2. 滑动窗口
  3. 漏桶
  4. 令牌桶

单机限流与分布式限流

单机限流实现

固定窗口

基于定时器的固定窗口限流器

public class SolidWindowLimiter {

    private final int windowMaxNum;

    private final AtomicInteger counter;

    private Cleaner cleaner;

    public SolidWindowLimiter(int windowSize, int windowMaxNum) {
        this.windowMaxNum = windowMaxNum;
        this.counter = new AtomicInteger();
        this.cleaner = new Cleaner(windowSize, counter);
    }

    public boolean tryAcquire() {
        return counter.incrementAndGet() <= windowMaxNum;
    }

    private static class Cleaner {
        public Cleaner(int windowSize, AtomicInteger counter){
            ScheduledExecutorService scheduledExecutorService = new ScheduledThreadPoolExecutor(1);
            scheduledExecutorService.scheduleAtFixedRate(() -> counter.set(0), windowSize, windowSize, TimeUnit.MILLISECONDS);
        }
    }
}

滑动窗口

public class FloatWindowLimiter {

    /**
     * 限制
     */
    private final int limit;

    /**
     * 滑动窗口
     */
    private final SlideWindow slideWindow;

    public FloatWindowLimiter(int windowDuration, int windowCount, int limit) {
        this.limit = limit;
        this.slideWindow = new SlideWindow(windowCount, windowDuration / windowCount);
    }

    public synchronized boolean tryAcquire() {
        //滑动窗口与清理
        slideWindow.slideWindow(System.currentTimeMillis());
        int count = slideWindow.count();
        if (count >= limit) {
            return false;
        }
        System.out.println(slideWindow.increaseAndGet());
        return true;
    }


    private static class SlideWindow {
        private final int[] counters;

        private int index;

        private long startTs;

        private final int windowNum;

        private final int windowSec;

        public SlideWindow(int windowNum, int windowSec) {
            index = 0;
            startTs = System.currentTimeMillis() / 1000;
            counters = new int[windowNum];
            this.windowNum = windowNum;
            this.windowSec = windowSec;
        }

        public int slideWindow(long ts) {
            long now = ts / 1000;
            int step = Math.min((int) (now - startTs) / windowSec, windowNum);
            //clear
            for (int i = 0; i < step; i++) {
                index = (index + 1) % windowNum;
                counters[index] = 0;
            }
            startTs += (long) step * windowSec;
            return index;
        }

        public int count() {
            return Arrays.stream(counters).sum();
        }

        public int increaseAndGet() {
            return ++counters[index];
        }
    }
}

令牌桶

信号量实现的简单令牌桶(含有定时器线程)

public class TokenBucket {
    private final Semaphore semaphore;

    public TokenBucket(int maxProcess, int recoverMilliSec) {
        semaphore = new Semaphore(maxProcess, true);

        ScheduledExecutorService executorService = new ScheduledThreadPoolExecutor(1);
        executorService.scheduleAtFixedRate(semaphore::release, recoverMilliSec, recoverMilliSec, TimeUnit.MILLISECONDS);
    }

    public boolean tryAcquire(){
        return semaphore.tryAcquire();
    }
}

基于请求时间戳的改造

public class TokenBucketWithoutTimer {
    private int tokens;

    private final int maxProcess;

    private final int recoverMilliSec;

    private long lastTime;

    public TokenBucketWithoutTimer(int maxProcess, int recoverMilliSec) {
        this.tokens = maxProcess;
        this.maxProcess = maxProcess;
        this.recoverMilliSec = recoverMilliSec;
        this.lastTime = System.currentTimeMillis();
    }

    public synchronized boolean tryAcquire() {
        //计算恢复的令牌数
        long curTime = System.currentTimeMillis();
        int step = (int) (curTime - lastTime) / recoverMilliSec;
        tokens = Math.min(tokens + step, maxProcess);

        //执行判断
        if(tokens <= 0){
            return false;
        }
        tokens--;
        lastTime = curTime;
        return true;
    }
}

分布式限流实现

最终方案