限流算法原理与实践
背景
由于公司降本增效的要求,负责网络带宽的团队一直紧盯各域名的带宽。某天晚上App发布公测时引起了带宽峰值,需要处理。手头的App公测系统支持定量下发,但没有限速:限定的40000个名额在最初几分钟内被一次性下发,导致下载App插件的域名出现带宽峰值,因此需要对公测进行限速/限流处理。借此机会调研一下限流算法,并挑选合适的算法运用到需求中。
限流算法原理
- 固定窗口
- 滑动窗口
- 漏桶
- 令牌桶
单机限流与分布式限流
单机限流实现
固定窗口
基于定时器的固定窗口限流器
/**
 * Fixed-window rate limiter: admits at most {@code windowMaxNum} requests per window of
 * {@code windowSize} milliseconds. A background timer resets the counter at every window boundary.
 *
 * <p>Safe for concurrent use. The timer runs on a daemon thread, so a limiter that is never closed
 * does not keep the JVM alive; call {@link #close()} to stop the timer once the limiter is no
 * longer needed.
 */
public class SolidWindowLimiter implements AutoCloseable {
    /** Maximum number of requests admitted in one window. */
    private final int windowMaxNum;
    /** Number of requests admitted in the current window. */
    private final AtomicInteger counter;
    /** Owns the timer that resets {@link #counter}. */
    private final Cleaner cleaner;

    /**
     * @param windowSize   window length in milliseconds, must be positive
     * @param windowMaxNum maximum number of requests admitted per window
     * @throws IllegalArgumentException if {@code windowSize} is not positive
     */
    public SolidWindowLimiter(int windowSize, int windowMaxNum) {
        if (windowSize <= 0) {
            throw new IllegalArgumentException("windowSize must be positive: " + windowSize);
        }
        this.windowMaxNum = windowMaxNum;
        this.counter = new AtomicInteger();
        this.cleaner = new Cleaner(windowSize, counter);
    }

    /**
     * Tries to take one slot of the current window.
     *
     * @return {@code true} if the request is admitted, {@code false} if the window is already full
     */
    public boolean tryAcquire() {
        // Only admitted requests bump the counter, so a flood of rejected calls can never
        // overflow it past Integer.MAX_VALUE and wrap around into "admit" territory.
        while (true) {
            int admitted = counter.get();
            if (admitted >= windowMaxNum) {
                return false;
            }
            if (counter.compareAndSet(admitted, admitted + 1)) {
                return true;
            }
        }
    }

    /** Stops the background reset timer. The counter is no longer reset after this call. */
    @Override
    public void close() {
        cleaner.stop();
    }

    /** Periodically resets the shared counter to zero, once per window. */
    private static class Cleaner {
        private final ScheduledExecutorService scheduler;

        Cleaner(int windowSize, AtomicInteger counter) {
            // Daemon thread: an un-closed limiter must not block JVM shutdown.
            scheduler = new ScheduledThreadPoolExecutor(1, task -> {
                Thread worker = new Thread(task, "solid-window-limiter-cleaner");
                worker.setDaemon(true);
                return worker;
            });
            scheduler.scheduleAtFixedRate(
                    () -> counter.set(0), windowSize, windowSize, TimeUnit.MILLISECONDS);
        }

        void stop() {
            scheduler.shutdownNow();
        }
    }
}
滑动窗口
/**
 * Sliding-window rate limiter. The window is split into {@code windowCount} slots of equal length;
 * a request is admitted only while the sum of all slot counters is below {@code limit}. Slots that
 * have fallen out of the window are cleared lazily on each call.
 *
 * <p>Time is tracked with one-second resolution. The slot length is
 * {@code windowDuration / windowCount} using integer division, so the effective window is
 * {@code windowCount * (windowDuration / windowCount)} seconds.
 */
public class FloatWindowLimiter {
    /**
     * Maximum number of requests admitted within the sliding window.
     */
    private final int limit;
    /**
     * The sliding window (ring of per-slot counters).
     */
    private final SlideWindow slideWindow;

    /**
     * @param windowDuration total window length in seconds
     * @param windowCount    number of slots the window is divided into, must be positive
     * @param limit          maximum number of requests admitted within the window
     * @throws IllegalArgumentException if {@code windowCount} is not positive, or if
     *                                  {@code windowDuration < windowCount} (each slot would be
     *                                  shorter than one second)
     */
    public FloatWindowLimiter(int windowDuration, int windowCount, int limit) {
        if (windowCount <= 0) {
            throw new IllegalArgumentException("windowCount must be positive: " + windowCount);
        }
        if (windowDuration < windowCount) {
            throw new IllegalArgumentException(
                    "windowDuration (" + windowDuration + "s) must be at least windowCount ("
                            + windowCount + ") so that every slot lasts at least one second");
        }
        this.limit = limit;
        this.slideWindow = new SlideWindow(windowCount, windowDuration / windowCount);
    }

    /**
     * Tries to admit one request at the current system time.
     *
     * @return {@code true} if admitted, {@code false} if the window already holds {@code limit}
     *         requests
     */
    public synchronized boolean tryAcquire() {
        return tryAcquire(System.currentTimeMillis());
    }

    /**
     * Tries to admit one request at the given time. Package-private so tests can drive the clock.
     *
     * @param nowMillis current time in epoch milliseconds
     * @return {@code true} if admitted, {@code false} otherwise
     */
    synchronized boolean tryAcquire(long nowMillis) {
        // Slide the window forward and drop expired slots before counting.
        slideWindow.slideWindow(nowMillis);
        if (slideWindow.count() >= limit) {
            return false;
        }
        slideWindow.increaseAndGet();
        return true;
    }

    /** Ring buffer of per-slot counters; not thread-safe, guarded by the enclosing limiter. */
    private static class SlideWindow {
        private final int[] counters;
        /** Position of the current (newest) slot inside {@link #counters}. */
        private int index;
        /** Start of the current slot, in epoch seconds. */
        private long startTs;
        private final int windowNum;
        private final int windowSec;

        SlideWindow(int windowNum, int windowSec) {
            index = 0;
            startTs = System.currentTimeMillis() / 1000;
            counters = new int[windowNum];
            this.windowNum = windowNum;
            this.windowSec = windowSec;
        }

        /**
         * Advances the window to the given time, clearing every slot that has expired.
         *
         * @param ts current time in epoch milliseconds
         * @return index of the current slot
         */
        int slideWindow(long ts) {
            long now = ts / 1000;
            // Divide in long arithmetic; narrowing the difference to int first could overflow.
            long elapsedSlots = (now - startTs) / windowSec;
            if (elapsedSlots > 0) {
                // At most windowNum slots exist, so clearing more than that is pointless.
                int expired = (int) Math.min(elapsedSlots, (long) windowNum);
                for (int i = 0; i < expired; i++) {
                    index = (index + 1) % windowNum;
                    counters[index] = 0;
                }
            }
            // Advance by the REAL number of elapsed slots, not the clamped one. Otherwise, after
            // a long idle period startTs lags behind and every following call would wipe the
            // whole window again, effectively disabling the limit. A negative value (clock moved
            // backwards) re-anchors startTs without clearing anything.
            startTs += elapsedSlots * windowSec;
            return index;
        }

        /** @return total number of requests recorded across all slots */
        int count() {
            return Arrays.stream(counters).sum();
        }

        /** Records one request in the current slot and returns that slot's new count. */
        int increaseAndGet() {
            return ++counters[index];
        }
    }
}
令牌桶
信号量实现的简单令牌桶(含有定时器线程)
/**
 * Simple token bucket backed by a {@link Semaphore}. The bucket starts full with
 * {@code maxProcess} tokens; a background timer puts one token back every
 * {@code recoverMilliSec} milliseconds, never exceeding the bucket capacity.
 *
 * <p>The timer runs on a daemon thread, so an un-closed bucket does not keep the JVM alive; call
 * {@link #close()} to stop it once the bucket is no longer needed.
 */
public class TokenBucket implements AutoCloseable {
    private final Semaphore semaphore;
    /** Bucket capacity: the number of permits never exceeds this value. */
    private final int maxProcess;
    private final ScheduledExecutorService executorService;

    /**
     * @param maxProcess      bucket capacity and initial number of tokens, must not be negative
     * @param recoverMilliSec interval in milliseconds at which one token is restored, must be
     *                        positive
     * @throws IllegalArgumentException if {@code maxProcess} is negative or
     *                                  {@code recoverMilliSec} is not positive
     */
    public TokenBucket(int maxProcess, int recoverMilliSec) {
        if (maxProcess < 0) {
            throw new IllegalArgumentException("maxProcess must not be negative: " + maxProcess);
        }
        if (recoverMilliSec <= 0) {
            throw new IllegalArgumentException(
                    "recoverMilliSec must be positive: " + recoverMilliSec);
        }
        this.maxProcess = maxProcess;
        this.semaphore = new Semaphore(maxProcess, true);
        // Daemon thread: an un-closed bucket must not block JVM shutdown.
        this.executorService = new ScheduledThreadPoolExecutor(1, task -> {
            Thread worker = new Thread(task, "token-bucket-refill");
            worker.setDaemon(true);
            return worker;
        });
        this.executorService.scheduleAtFixedRate(
                this::refill, recoverMilliSec, recoverMilliSec, TimeUnit.MILLISECONDS);
    }

    /**
     * Tries to take one token without blocking.
     *
     * @return {@code true} if a token was taken, {@code false} if the bucket is empty
     */
    public boolean tryAcquire() {
        return semaphore.tryAcquire();
    }

    /**
     * Restores one token unless the bucket is already full. Invoked by the timer; package-private
     * so tests can trigger a refill without waiting.
     */
    synchronized void refill() {
        // Refills are serialized and tryAcquire() only removes permits, so this
        // check-then-release cannot push the permit count above the capacity. Without the cap an
        // idle bucket would accumulate permits forever and later allow an unbounded burst.
        if (semaphore.availablePermits() < maxProcess) {
            semaphore.release();
        }
    }

    /** Stops the background refill timer. No tokens are restored after this call. */
    @Override
    public void close() {
        executorService.shutdownNow();
    }
}
基于请求时间戳的改造
/**
 * Token bucket without a timer thread: tokens are restored lazily on each call, based on the time
 * elapsed since the last refill. The bucket starts full with {@code maxProcess} tokens and regains
 * one token every {@code recoverMilliSec} milliseconds, up to its capacity.
 */
public class TokenBucketWithoutTimer {
    /** Tokens currently available. */
    private int tokens;
    /** Bucket capacity. */
    private final int maxProcess;
    /** Milliseconds needed to restore one token. */
    private final int recoverMilliSec;
    /** Epoch milliseconds up to which refills have been accounted for. */
    private long lastTime;

    /**
     * @param maxProcess      bucket capacity and initial number of tokens
     * @param recoverMilliSec interval in milliseconds at which one token is restored, must be
     *                        positive
     * @throws IllegalArgumentException if {@code recoverMilliSec} is not positive
     */
    public TokenBucketWithoutTimer(int maxProcess, int recoverMilliSec) {
        if (recoverMilliSec <= 0) {
            throw new IllegalArgumentException(
                    "recoverMilliSec must be positive: " + recoverMilliSec);
        }
        this.tokens = maxProcess;
        this.maxProcess = maxProcess;
        this.recoverMilliSec = recoverMilliSec;
        this.lastTime = System.currentTimeMillis();
    }

    /**
     * Tries to take one token at the current system time.
     *
     * @return {@code true} if a token was taken, {@code false} if the bucket is empty
     */
    public synchronized boolean tryAcquire() {
        return tryAcquire(System.currentTimeMillis());
    }

    /**
     * Tries to take one token at the given time. Package-private so tests can drive the clock.
     *
     * @param nowMillis current time in epoch milliseconds
     * @return {@code true} if a token was taken, {@code false} if the bucket is empty
     */
    synchronized boolean tryAcquire(long nowMillis) {
        refill(nowMillis);
        if (tokens <= 0) {
            return false;
        }
        tokens--;
        return true;
    }

    /** Credits the tokens recovered since {@link #lastTime} and advances {@link #lastTime}. */
    private void refill(long nowMillis) {
        if (tokens >= maxProcess) {
            // A full bucket accrues nothing; the refill clock starts when a token is next taken.
            lastTime = nowMillis;
            return;
        }
        long elapsed = nowMillis - lastTime;
        if (elapsed < 0) {
            // Clock moved backwards: re-anchor instead of waiting for it to catch up.
            lastTime = nowMillis;
            return;
        }
        // Long arithmetic: narrowing the elapsed time to int before dividing overflows once the
        // bucket has been idle for more than ~24.8 days.
        long recovered = elapsed / recoverMilliSec;
        if (recovered <= 0) {
            return;
        }
        if (recovered >= (long) maxProcess - tokens) {
            tokens = maxProcess;
            lastTime = nowMillis;
        } else {
            tokens += (int) recovered;
            // Advance by whole refill intervals only, keeping the remainder so that partial
            // progress towards the next token is not thrown away on every successful acquire.
            lastTime += recovered * recoverMilliSec;
        }
    }
}