likes
comments
collection
share

京东hotKey -滑动窗口源码讲解

作者站长头像
站长
· 阅读数 14

滑动窗口源码

核心代码类-SlidingWindow

package com.jd.platform.hotkey.worker.tool;


import cn.hutool.core.date.SystemClock;
import org.checkerframework.checker.units.qual.C;

import java.util.Arrays;
import java.util.concurrent.BrokenBarrierException;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.CyclicBarrier;
import java.util.concurrent.atomic.AtomicLong;

/**
 * 滑动窗口。该窗口同样的key都是单线程计算。
 *
 * @author wuweifeng wrote on 2019-12-04.
 */
public class SlidingWindow {
    /**
     * 循环队列,就是装多个窗口用,该数量是windowSize的2倍
     */
    private AtomicLong[] timeSlices;
    /**
     * 队列的总长度
     */
    private int timeSliceSize;
    /**
     * 每个时间片的时长,以毫秒为单位
     */
    private int timeMillisPerSlice;
    /**
     * 共有多少个时间片(即窗口长度)
     */
    private int windowSize;
    /**
     * 在一个完整窗口期内允许通过的最大阈值
     */
    private int threshold;
    /**
     * 该滑窗的起始创建时间,也就是第一个数据
     */
    private long beginTimestamp;
    /**
     * 最后一个数据的时间戳
     */
    private long lastAddTimestamp;

    public static void main(String[] args) throws InterruptedException {
        //1秒一个时间片,窗口共5个
        SlidingWindow window = new SlidingWindow(2, 40);

        //循环屏障
        CyclicBarrier cyclicBarrier = new CyclicBarrier(10);
        CountDownLatch latch=new CountDownLatch(10);
        for (int i = 0; i < 10; i++) {
            new Thread(new Runnable() {
                @Override
                public void run() {
                    try {
                        //步调统一
                        cyclicBarrier.await();
                    } catch (InterruptedException | BrokenBarrierException e) {
                        e.printStackTrace();
                    }
                    boolean hot = window.addCount(2);
                    System.out.println(hot);
                    window.print();
                    latch.countDown();
                }
            }).start();
        }
        latch.await();
        for (int i = 0; i < 100; i++) {
                System.out.println(window.addCount(2));

            window.print();
            System.out.println("--------------------------");
            try {
                Thread.sleep(102);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        }
    }
    private void print() {
        Arrays.asList(timeSlices).stream().forEach(e->System.out.print(e+" "));
        System.out.println();

    }
    public SlidingWindow(int duration, int threshold) {
        //超过10分钟的按10分钟
        if (duration > 600) {
            duration = 600;
        }
        //要求5秒内探测出来的,
        if (duration <= 5) {
            this.windowSize = 5;
            this.timeMillisPerSlice = duration * 200;
        } else {
            this.windowSize = 10;
            this.timeMillisPerSlice = duration * 100;
        }
        this.threshold = threshold;
        // 保证存储在至少两个window
        this.timeSliceSize = windowSize * 2;

        reset();
    }

    public SlidingWindow(int timeMillisPerSlice, int windowSize, int threshold) {
        this.timeMillisPerSlice = timeMillisPerSlice;
        this.windowSize = windowSize;
        this.threshold = threshold;
        // 保证存储在至少两个window
        this.timeSliceSize = windowSize * 2;

        reset();
    }

    /**
     * 初始化
     */
    private void reset() {
        beginTimestamp = SystemClock.now();
        //窗口个数
        AtomicLong[] localTimeSlices = new AtomicLong[timeSliceSize];
        for (int i = 0; i < timeSliceSize; i++) {
            localTimeSlices[i] = new AtomicLong(0);
        }
        timeSlices = localTimeSlices;
    }

    /**
     * 计算当前所在的时间片的位置
     */
    private int locationIndex() {
        long now = SystemClock.now();
        //如果当前的key已经超出一整个时间窗口了,那么就直接初始化就行了,不用去计算了
        if (now - lastAddTimestamp > timeMillisPerSlice * windowSize) {
            reset();
        }

        int index = (int) (((now - beginTimestamp) / timeMillisPerSlice) % timeSliceSize);
        if (index < 0) {
            return 0;
        }
        return index;
    }

    /**
     * 增加count个数量
     */
    public synchronized boolean addCount(long count) {
        //当前自己所在的位置,是哪个小时间窗(时间片)
        int index = locationIndex();
//        System.out.println("index:" + index);
        //然后清空自己前面windowSize到2*windowSize之间的数据格的数据
        //譬如1秒分4个窗口,那么数组共计8个窗口
        //当前index为5时,就清空6、7、8、1。然后把2、3、4、5的加起来就是该窗口内的总和

        //后面的一半需要清理的, 前面的一般,是要统计的, 一半是一个时间窗的大小
        clearFromIndex(index);

        int sum = 0;
        // 在当前时间片里继续+1
        sum += timeSlices[index].addAndGet(count);
        //加上前面几个时间片
        for (int i = 1; i < windowSize; i++) {
            sum += timeSlices[(index - i + timeSliceSize) % timeSliceSize].get();
        }

        lastAddTimestamp = SystemClock.now();

        return sum >= threshold;
    }

    private void clearFromIndex(int index) {
        for (int i = 1; i <= windowSize; i++) {
            int j = index + i;
            if (j >= windowSize * 2) {
                j -= windowSize * 2;
            }
            timeSlices[j].set(0);
        }
    }

}

核心字段

/**
 * 循环队列,就是装多个窗口用,该数量是windowSize的2倍
 */
private AtomicLong[] timeSlices;
/**
 * 队列的总长度
 */
private int timeSliceSize;
/**
 * 每个时间片的时长,以毫秒为单位
 */
private int timeMillisPerSlice;
/**
 * 共有多少个时间片(即窗口长度)
 */
private int windowSize;
/**
 * 在一个完整窗口期内允许通过的最大阈值
 */
private int threshold;
/**
 * 该滑窗的起始创建时间,也就是第一个数据
 */
private long beginTimestamp;
/**
 * 最后一个数据的时间戳
 */
private long lastAddTimestamp;

SlidingWindow:构造函数

  • 10 是说采集的一个数组下标 的时间是 10*100/1000=1s 也就是除以10 的秒数
  • 小于=5的话 就默认翻倍 除以10 也就是 5 代表着1s 而不是0.5s
  • 1秒一个时间片,窗口共5个 大于1s的时间片窗口是10个 SlidingWindow window = new SlidingWindow(10, 40 );
  • 窗口 我们设置为10个时间片
public SlidingWindow(int duration, int threshold) {
    //超过10分钟的按10分钟
    if (duration > 600) {
        duration = 600;
    }
    //要求5秒内探测出来的,
    if (duration <= 5) {
        this.windowSize = 5;
        this.timeMillisPerSlice = duration * 200;
    } else {
        this.windowSize = 10;
        this.timeMillisPerSlice = duration * 100;
    }
    this.threshold = threshold;
    // 保证存储在至少两个window
    this.timeSliceSize = windowSize * 2;

    reset();
}

我们构造滑动窗口的时候先设置一个时间片的时长,和阈值 一般会统计最近十个时间片的统计数量 和阈值 比较 看是不是热度key

每个可以占用的统计时长为 timeMillisPerSlice * windowSize

locationIndex:计算当前所在的时间片的位置

计算当前所在的时间片的位置源码

/**
 * 计算当前所在的时间片的位置
 */
private int locationIndex() {
    long now = SystemClock.now();
    //如果当前的key已经超出一整个时间窗口了,那么就直接初始化就行了,不用去计算了
    if (now - lastAddTimestamp > timeMillisPerSlice * windowSize) {
        reset();
    }

    int index = (int) (((now - beginTimestamp) / timeMillisPerSlice) % timeSliceSize);
    if (index < 0) {
        return 0;
    }
    return index;
}

这个计算公式 意思就是 从开始 到现在 过了多少个时间片, 因为使用的是 长度为timeSliceSize 的环形buffer 形式,所以再跟队列长度取模下 得到的结果就是 当前所在的时间片的位置

int index = (int) (((now - beginTimestamp) / timeMillisPerSlice) % timeSliceSize);

有两个时间戳 beginTimestamp 和 lastAddTimestamp

/**
 * 该滑窗的起始创建时间,也就是第一个数据 reset 的时候更新
 */
private long beginTimestamp;
/**
 * 最后一个数据的时间戳 addcount的时候更新
 */
private long lastAddTimestamp;

addCount:增加count个数量

/**
     * 增加count个数量
     */
    public synchronized boolean addCount(long count) {
        //当前自己所在的位置,是哪个小时间窗(时间片)
        int index = locationIndex();
//        System.out.println("index:" + index);
        //然后清空自己前面windowSize到2*windowSize之间的数据格的数据
        //譬如1秒分4个窗口,那么数组共计8个窗口
        //当前index为5时,就清空6、7、8、1。然后把2、3、4、5的加起来就是该窗口内的总和

        //后面的一半需要清理的, 前面的一般,是要统计的, 一半是一个时间窗的大小
        clearFromIndex(index);

        int sum = 0;
        // 在当前时间片里继续+1
        sum += timeSlices[index].addAndGet(count);
        //加上前面几个时间片
        for (int i = 1; i < windowSize; i++) {
            sum += timeSlices[(index - i + timeSliceSize) % timeSliceSize].get();
        }

        lastAddTimestamp = SystemClock.now();

        return sum >= threshold;
    }

滑动窗口存在的高并发问题

hotkey 热度计算的高并发问题

作者在进行热度计算的时候,为了提升 计算效率 消费队列使用了多线程(cpu核数)进行消费

@Bean
public Consumer consumer() {
    int nowCount = CpuNum.workerCount();
    //将实际值赋给static变量
    if (threadCount != 0) {
        nowCount = threadCount;
    } else {
        if (nowCount >= 8) {
            nowCount = nowCount / 2;
        }
    }

    List<KeyConsumer> consumerList = new ArrayList<>();
    for (int i = 0; i < nowCount; i++) {
        KeyConsumer keyConsumer = new KeyConsumer();
        keyConsumer.setKeyListener(iKeyListener);
        consumerList.add(keyConsumer);
        threadPoolExecutor.submit(keyConsumer::beginConsume);
    }
    return new Consumer(consumerList);
}

public void beginConsume() {
    while (true) {
        try {
            HotKeyModel model = QUEUE.take();
            if (model.isRemove()) {
                iKeyListener.removeKey(model, KeyEventOriginal.CLIENT);
            } else {
                iKeyListener.newKey(model, KeyEventOriginal.CLIENT);
            }

            //处理完毕,将数量加1
            totalDealCount.increment();
        } catch (InterruptedException e) {
            e.printStackTrace();
        }

    }
@Override
public void newKey(HotKeyModel hotKeyModel, KeyEventOriginal original) {
    //cache里的key
    String key = buildKey(hotKeyModel);
    //判断是不是刚热不久
    Object o = hotCache.getIfPresent(key);
    if (o != null) {
        return;
    }
    SlidingWindow slidingWindow = checkWindow(hotKeyModel, key);
    //看看hot没
    boolean hot = slidingWindow.addCount(hotKeyModel.getCount());

    。。。。。。。

    }

}
/**
     * 增加count个数量
     */
    public synchronized boolean addCount(long count) {
        //当前自己所在的位置,是哪个小时间窗(时间片)
        int index = locationIndex();
        clearFromIndex(index);
        int sum = 0;
        // 在当前时间片里继续+1
        sum += timeSlices[index].addAndGet(count);
        //加上前面几个时间片
        for (int i = 1; i < windowSize; i++) {
            sum += timeSlices[(index - i + timeSliceSize) % timeSliceSize].get();
        }

        lastAddTimestamp = SystemClock.now();

        return sum >= threshold;
    }

每个线程获取到HotKeyModel以后 都在进行 iKeyListener.newKey(model, KeyEventOriginal.CLIENT); 热度计算。最终都会调用

boolean hot = slidingWindow.addCount(hotKeyModel.getCount());

进而调用

sum += timeSlices[index].addAndGet(count);

如果多个线程 对同一个key进行热度计算,也就会获取同一个 滑动窗口,但是

sum += timeSlices[index].addAndGet(count);

是并发不安全的

好在 热度计算这种结果数据结果正确性要求不严格,可以一定程度接受, 就像caffiene缓存 进行内存淘汰的时候,对缓存key 的调用次数计算使用W-TinyLFU 算法,也是不精确计算,一般这种热度计算或者点击次数计算,不要求 绝对精度的业务场景,要优先考虑的时候 内存占用 计算复杂度等

解决方案

可以参考 caffiene缓存 条状环形buff 替换掉目前的环形buff 循环队列,

/**
 * 循环队列,就是装多个窗口用,该数量是windowSize的2倍
 */
private AtomicLong[] timeSlices;

这样每个线程的计算节点都会被分散到不同的索引位置 ,同时这样也增加的设计和计算的复杂度。

转载自:https://juejin.cn/post/7247787521392590903
评论
请登录