常用限流算法的Java实现
主要内容为滑动日志,令牌桶,漏桶三种限流算法的Java实现
获取连接许可的接口
public interface Limiter {
//获取许可
boolean tryAcquire();
}
1.滑动日志
用一个有序集合来存储所有请求的时间戳,以空间换时间的方式来简化计算
public class CountLimiter implements Limiter{
//维护一个优先队列记录请求的时间戳
//将PriorityQueue替换为Redis的ZSet的话可以实现分布式的限流
private final PriorityQueue<Long> queue = new PriorityQueue<>();
//限流大小
private final int limitCount;
//限流时间范围
private final Long limitTime;
public CountLimiter(int limitCount, Long limitTime) {
this.limitCount = limitCount;
this.limitTime = limitTime;
}
@Override
public synchronized boolean tryAcquire() {
long nowTime = System.currentTimeMillis();
if(queue.size() < limitCount){
queue.add(nowTime);
return true;
}else{
long preTime = nowTime - limitTime;
//淘汰已经超过时间限制的请求
while(!queue.isEmpty() && preTime > queue.peek()){
queue.poll();
}
if(queue.size() < limitCount){
queue.add(nowTime);
return true;
}else{
return false;
}
}
}
}
2.令牌桶
利用延迟计算来维护令牌数量
public class TokenBucketLimiter implements Limiter{
//最大容量
private final long capacity;
//令牌的生成速率 每rate毫秒生成一个令牌
private final long rate;
//下一个令牌的发放时间
long nextTokenTime = System.currentTimeMillis();
//当前持有的令牌总数
private long currentTokens;
public TokenBucketLimiter(long rate, int capacity,int currentTokens) {
this.rate = rate;
this.capacity = capacity;
//根据需求设置令牌桶中令牌的初始数量
this.currentTokens = currentTokens;
}
@Override
public boolean tryAcquire() {
long nowTime = System.currentTimeMillis();
if(nowTime > nextTokenTime){
//计算新产生的令牌数
long newTokens = ((nowTime - nextTokenTime) / rate) + 1;
//更新当前持有的令牌数量
currentTokens = Math.min(currentTokens + newTokens,capacity);
//更新可以获取令牌的时间
nextTokenTime = nowTime + rate;
}
if(currentTokens > 0){
currentTokens--;
return true;
}
return false;
}
}
3.漏桶
漏桶算法原理类似于线程池,请求到来先放入等待队列,然后消费端从请求队列中拉取请求,当请求队列超过最大容量后,执行拒绝策略
public class LeakyBucketLimiter<T> implements Limiter{
private final long capacity;
//漏桶流出的速率 每rate毫秒放出一个请求
private final long rate;
//当前水量
private AtomicInteger currentWater = new AtomicInteger(0);
//存放请求的请求队列
private final BlockingDeque<T> queue;
//定时任务线程池
private final ScheduledExecutorService executorService = new ScheduledThreadPoolExecutor(1);
public LeakyBucketLimiter(long rate, int capacity,BlockingDeque<T> queue) {
this.rate = rate;
this.capacity = capacity;
this.queue = new LinkedBlockingDeque<>();
executorService.schedule(new Runnable() {
@Override
public void run() {
//队列中没有请求的时候阻塞等待,防止空转消费资源
try {
//如果是比较耗时间的任务可以定义额外的线程池来处理,将分发任务和执行任务分隔开
System.out.println("处理请求:" + queue.take());
} catch (InterruptedException e) {
throw new RuntimeException(e);
}
currentWater.addAndGet(-1);
executorService.schedule(this,rate, TimeUnit.MILLISECONDS);
}
},rate,TimeUnit.MILLISECONDS);
}
@Override
public synchronized boolean tryAcquire() {
//成功增加水池水量,失败则回滚
if(currentWater.addAndGet(1) <= capacity){
return true;
}else{
currentWater.addAndGet(-1);
return false;
}
}
}
转载自:https://juejin.cn/post/7169631447244865544