likes
comments
collection
share

常用限流算法的Java实现

作者站长头像
站长
· 阅读数 16

主要内容为滑动日志,令牌桶,漏桶三种限流算法的Java实现

获取连接许可的接口

public interface Limiter {
    //获取许可
    boolean tryAcquire();
}
1.滑动日志

用一个有序集合来存储所有请求的时间戳,以空间换时间的方式来简化计算

public class CountLimiter implements Limiter{
    //维护一个优先队列记录请求的时间戳
    //将PriorityQueue替换为Redis的ZSet的话可以实现分布式的限流
    private final PriorityQueue<Long> queue = new PriorityQueue<>();
    //限流大小
    private final int limitCount;
    //限流时间范围
    private final Long limitTime;

    public CountLimiter(int limitCount, Long limitTime) {
        this.limitCount = limitCount;
        this.limitTime = limitTime;
    }

    @Override
    public synchronized boolean tryAcquire() {
        long nowTime = System.currentTimeMillis();
        if(queue.size() < limitCount){
            queue.add(nowTime);
            return true;
        }else{
            long preTime = nowTime - limitTime;
            //淘汰已经超过时间限制的请求
            while(!queue.isEmpty() && preTime > queue.peek()){
                queue.poll();
            }
            if(queue.size() < limitCount){
                queue.add(nowTime);
                return true;
            }else{
                return false;
            }
        }
    }
}
2.令牌桶

利用延迟计算来维护令牌数量

public class TokenBucketLimiter implements Limiter{
    //最大容量
    private final long capacity;
    //令牌的生成速率 每rate毫秒生成一个令牌
    private final long rate;
    //下一个令牌的发放时间
    long nextTokenTime = System.currentTimeMillis();
    //当前持有的令牌总数
    private long currentTokens;

    public TokenBucketLimiter(long rate, int capacity,int currentTokens) {
        this.rate = rate;
        this.capacity = capacity;
        //根据需求设置令牌桶中令牌的初始数量
        this.currentTokens = currentTokens;
    }

    @Override
    public boolean tryAcquire() {
        long nowTime = System.currentTimeMillis();
        if(nowTime > nextTokenTime){
            //计算新产生的令牌数
            long newTokens = ((nowTime - nextTokenTime) / rate) + 1;
            //更新当前持有的令牌数量
            currentTokens = Math.min(currentTokens + newTokens,capacity);
            //更新可以获取令牌的时间
            nextTokenTime = nowTime + rate;
        }
        if(currentTokens > 0){
            currentTokens--;
            return true;
        }
        return false;
    }
}
3.漏桶

漏桶算法原理类似于线程池,请求到来先放入等待队列,然后消费端从请求队列中拉取请求,当请求队列超过最大容量后,执行拒绝策略

public class LeakyBucketLimiter<T> implements Limiter{

    private final long capacity;
    //漏桶流出的速率 每rate毫秒放出一个请求
    private final long rate;
    //当前水量
    private AtomicInteger currentWater = new AtomicInteger(0);
    //存放请求的请求队列
    private final BlockingDeque<T> queue;
    //定时任务线程池
    private final ScheduledExecutorService executorService = new ScheduledThreadPoolExecutor(1);

    public LeakyBucketLimiter(long rate, int capacity,BlockingDeque<T> queue) {
        this.rate = rate;
        this.capacity = capacity;
        this.queue = new LinkedBlockingDeque<>();

        executorService.schedule(new Runnable() {
            @Override
            public void run() {
                //队列中没有请求的时候阻塞等待,防止空转消费资源
                try {
                    //如果是比较耗时间的任务可以定义额外的线程池来处理,将分发任务和执行任务分隔开
                    System.out.println("处理请求:" + queue.take());
                } catch (InterruptedException e) {
                    throw new RuntimeException(e);
                }
                currentWater.addAndGet(-1);
                executorService.schedule(this,rate, TimeUnit.MILLISECONDS);
            }
        },rate,TimeUnit.MILLISECONDS);
    }

    @Override
    public synchronized boolean tryAcquire() {
        //成功增加水池水量,失败则回滚
        if(currentWater.addAndGet(1) <= capacity){
            return true;
        }else{
            currentWater.addAndGet(-1);
            return false;
        }
    }
}