【推荐】开源Java窗口函数框架JDFrame
相关文章
0、前言
在各种数据库mysql, 大数据领域的hive、spark中都有非常好用的开窗函数使用, 但是Java却没好用的JVM层级的窗口函数使用,于是乎写了这个,如果能熟练使用开窗函数相信能在业务代码中大大减少我们的统计计算逻辑代码。
1、窗口函数本质
所谓窗口函数的本质就是在矩阵中新增一列,然后这一列的值根据不同的窗口函数去生成不同的值。
新增的这一列我们通过指定窗口和函数去生成具体的值。 主要有三个部分,一个是计算函数、一个是分区,一个是窗口范围。
如果熟悉mysql窗口函数的同学可能就知道。 这对应于 mysql的语法 (partition by xx order by xxx rows between xx and xx)
. partition
用于指定窗口的分区, rows between
用于指定窗口范围, order
用于指定窗口排序。
那么在矩阵中新增一列是如何利用这三部分“配置” 去生成这一列, 首先是分区,分区其实就是分组
,会先将矩阵根据配置的某个字段进行分组, 然后分组后根据配置的排序字段对组内进行排序
, 然后再使用不同的窗口计算函数对组内的配置的窗口范围
内的元素进行统计计算 然后计算结果就是这一列的值。
该列的生成逻可以想象成是滑动窗口算法,窗口的边界就是窗口范围,然后使用窗口计算函数去计算窗口内的值,这一个值就作为这一行的值,然后窗口不断去滑动,每滑动一次就计算一次, 最终所有行的值组成我们的新的一列。
下面简单介绍每个窗口计算函数及其使用案例。
2、引入Maven依赖
<dependency>
<groupId>io.github.burukeyou</groupId>
<artifactId>jdframe</artifactId>
<version>0.0.4</version>
</dependency>
3、窗口函数的API使用
测试代码
static List<WebPvDto> dataList = new ArrayList<>();
static {
dataList.add(new WebPvDto("a",0,1));
dataList.add(new WebPvDto("a",1,5));
dataList.add(new WebPvDto("a",2,7));
dataList.add(new WebPvDto("a",3,3));
dataList.add(new WebPvDto("a",4,2));
dataList.add(new WebPvDto("a",5,4));
dataList.add(new WebPvDto("a",6,4));
dataList.add(new WebPvDto("b",7,1));
dataList.add(new WebPvDto("b",8,4));
dataList.add(new WebPvDto("b",7,6));
dataList.add(new WebPvDto("b",8,2));
}
@Data
public static class WebPvDto {
private String type;
private Integer score;
private Integer pvCount;
public Object value;
}
ROW_NUMBER 窗口函数
生成行号,从1开始。
窗口的指定主要是通过Window
对象去构建,可以配置窗口的分区、排序、窗口范围等信息。
// 等价于 select ROW_NUMBER() over(partition by type order pv_count desc)
SDFrame.read(dataList)
.window(Window.groupBy(WebPvDto::getType).sortDesc(WebPvDto::getPvCount))
.overRowNumberS(WebPvDto::setValue)
.show(30);
输出结果:
type score pvCount value
a 2 7 1
a 1 5 2
a 5 4 3
a 6 4 4
a 3 3 5
a 4 2 6
a 0 1 7
b 7 6 1
b 8 4 2
b 8 2 3
b 7 1 4
RANK 窗口函数
生成排名号,相同值排名一样,排名不连续 。 如: 1 2 2 2 5 6 7
// 等价于 select rank() over(partition by type order pv_count desc)
SDFrame.read(dataList)
.window(Window.groupBy(WebPvDto::getType).sortDesc(WebPvDto::getPvCount))
.overRankS(WebPvDto::setValue)
.show(30);
输出结果
type score pvCount value
a 2 7 1
a 1 5 2
a 5 4 3
a 6 4 3
a 3 3 5
a 4 2 6
a 0 1 7
b 7 6 1
b 8 4 2
b 8 2 3
b 7 1 4
DENSE_RANK 窗口函数
生成排名号,相同值排名一样,排名连续 如 1 2 2 2 3 4 5
// 等价于 select DENSE_RANK() over(partition by type order pv_count desc)
SDFrame.read(dataList)
.window(Window.groupBy(WebPvDto::getType).sortDesc(WebPvDto::getPvCount))
.overDenseRankS(WebPvDto::setValue)
.show(30);
输出结果:
type score pvCount value
a 2 7 1
a 1 5 2
a 5 4 3
a 6 4 3
a 3 3 4
a 4 2 5
a 0 1 6
b 7 6 1
b 8 4 2
b 8 2 3
b 7 1 4
PERCENT_RANK 窗口函数
生成百分比排名号。 使用公式: (rank排名号-1) / (窗口行数-1)
// 等价于 select PERCENT_RANK() over(partition by type order pv_count desc)
SDFrame.read(dataList)
.defaultScale(6)
.window(Window.groupBy(WebPvDto::getType).sortDesc(WebPvDto::getPvCount))
.overPercentRankS(WebPvDto::setValue)
.show(30);
输出结果
type score pvCount value
a 2 7 0
a 1 5 0.166667
a 5 4 0.333333
a 6 4 0.333333
a 3 3 0.666667
a 4 2 0.833333
a 0 1 1.000000
b 7 6 0
b 8 4 0.333333
b 8 2 0.666667
b 7 1 1.000000
Count窗口函数
计算窗口内行数
// 等价于SQL: select count(*) over(partition by type order by pv_count desc rows between UNBOUNDED PRECEDING and CURRENT ROW)
SDFrame.read(dataList)
.window(Window.groupBy(WebPvDto::getType).sortDesc(WebPvDto::getPvCount).roundStartRow2CurrentRow())
.overCountS(WebPvDto::setValue)
.show(30);
输出结果:
type score pvCount value
a 2 7 1
a 1 5 2
a 5 4 3
a 6 4 4
a 3 3 5
a 4 2 6
a 0 1 7
b 7 6 1
b 8 4 2
b 8 2 3
b 7 1 4
Sum窗口函数
计算窗口内的和
// 等价于 select sum(pv_count) over(rows between 1 PRECEDING and 2 FOLLOWING)
JDFrame.read(dataList)
.window(Window.roundBetweenBy(Range.BEFORE(1),Range.AFTER(2)))
.overSumS(WebPvDto::setValue,WebPvDto::getPvCount)
.show(30);
输出结果:
type score pvCount value
a 0 1 13
a 1 5 16
a 2 7 17
a 3 3 16
a 4 2 13
a 5 4 11
a 6 4 13
b 7 1 15
b 8 4 13
b 7 6 12
b 8 2 8
Avg窗口函数
计算窗口内的平均值
// 等价于 select avg(pv_count) over(partition by type )
SDFrame.read(dataList)
.defaultScale(4)
.window(Window.groupBy(WebPvDto::getType))
.overAvgS(WebPvDto::setValue,WebPvDto::getPvCount)
.show(30);
输出结果
type score pvCount value
a 0 1 3.7143
a 1 5 3.7143
a 2 7 3.7143
a 3 3 3.7143
a 4 2 3.7143
a 5 4 3.7143
a 6 4 3.7143
b 7 1 3.2500
b 8 4 3.2500
b 7 6 3.2500
b 8 2 3.2500
Max窗口函数
计算窗口内的最大值
// 等价于 select max(pv_count) over(partition by type order pv_count asc)
SDFrame.read(dataList)
.window(Window.groupBy(WebPvDto::getType).sortAsc(WebPvDto::getPvCount))
.overMaxValueS(WebPvDto::setValue,WebPvDto::getPvCount)
.show(30);
输出结果:
type score pvCount value
a 0 1 7
a 4 2 7
a 3 3 7
a 5 4 7
a 6 4 7
a 1 5 7
a 2 7 7
b 7 1 6
b 8 2 6
b 8 4 6
b 7 6 6
Min窗口函数
计算窗口内的最小值
// 等价于 select min(pv_count) over(rows between CURRENT ROW and 2 FOLLOWING)
SDFrame.read(dataList)
.window(Window.roundCurrentRow2AfterBy(2))
.overMinValueS(WebPvDto::setValue,WebPvDto::getPvCount)
.show(30);
type score pvCount value
a 0 1 1
a 1 5 3
a 2 7 2
a 3 3 2
a 4 2 2
a 5 4 1
a 6 4 1
b 7 1 1
b 8 4 2
b 7 6 2
b 8 2 2
Lag窗口函数
获取当前行的前N行数据
// 等价于 select lag(pv_count,2) over(partition by type order pv_count desc)
SDFrame.read(dataList)
.window(Window.groupBy(WebPvDto::getType).sortDesc(WebPvDto::getPvCount))
.overLagS(WebPvDto::setValue,WebPvDto::getPvCount,2)
.show(30);
输出结果:
type score pvCount value
a 2 7
a 1 5
a 5 4 7
a 6 4 5
a 3 3 4
a 4 2 4
a 0 1 3
b 7 6
b 8 4
b 8 2 6
b 7 1 4
Lead窗口函数
获取当前行的后N行数据
// 等价于 select lead(pv_count,3) over()
SDFrame.read(dataList)
.window()
.overLeadS(WebPvDto::setValue,WebPvDto::getPvCount,3)
.show(30);
输出结果:
type score pvCount value
a 0 1 3
a 1 5 2
a 2 7 4
a 3 3 4
a 4 2 1
a 5 4 4
a 6 4 6
b 7 1 2
b 8 4
b 7 6
b 8 2
NthValue 窗口函数
获取窗口范围内的第N行数据
// 等价于 select NTH_VALUE(pv_count,2) over(rows between 1 PRECEDING and CURRENT ROW)
SDFrame.read(dataList)
.window(Window.roundBefore2CurrentRowBy(3))
.overNthValueS(WebPvDto::setValue,WebPvDto::getPvCount,2)
.show(30);
输出结果:
type score pvCount value
a 0 1
a 1 5 5
a 2 7 5
a 3 3 5
a 4 2 7
a 5 4 3
a 6 4 2
b 7 1 4
b 8 4 4
b 7 6 1
b 8 2 4
FirstValue 窗口函数
获取窗口范围内的第1行数据
// 等价于 select FIRST_VALUE(pv_count) over(rows between 2 PRECEDING and CURRENT ROW)
SDFrame.read(dataList)
.window(Window.roundBetweenBy(Range.BEFORE(2), Range.CURRENT_ROW))
.overFirstValueS(WebPvDto::setValue,WebPvDto::getPvCount)
.show(30);
type score pvCount value
a 0 1 1
a 1 5 1
a 2 7 1
a 3 3 5
a 4 2 7
a 5 4 3
a 6 4 2
b 7 1 4
b 8 4 4
b 7 6 1
b 8 2 4
LastValue 窗口函数
获取窗口范围内的最后一行数据
// 等价于 select LAST_VALUE(pv_count) over(rows between 2 PRECEDING and 2 FOLLOWING)
SDFrame.read(dataList)
.window(Window.roundBeforeAfterBy(2,2))
.overLastValueS(WebPvDto::setValue,WebPvDto::getPvCount)
.show(30);
输出结果
type score pvCount value
a 0 1 7
a 1 5 3
a 2 7 2
a 3 3 4
a 4 2 4
a 5 4 1
a 6 4 4
b 7 1 6
b 8 4 2
b 7 6
b 8 2
Ntile 窗口函数
给窗口尽量均匀的分成N个桶, 每个桶的编号从1开始, 如果分布不均匀,则优先分配给最小的桶,桶之间的大小差值最多不超过1
// 等价于 select Ntile(3) over(partition by type order pv_count desc)
SDFrame.read(dataList)
.window(Window.groupBy(WebPvDto::getType))
.overNtileS(WebPvDto::setValue,3)
.show(30);
输出结果:
type score pvCount value
a 0 1 1
a 1 5 1
a 2 7 1
a 3 3 1
a 4 2 2
a 5 4 2
a 6 4 2
b 7 1 2
b 8 4 3
b 7 6 3
b 8 2 3
Cume_Dist 窗口函数
累积分布值, 统计的是 (小于等于当前排名号的行数 / 窗口行数) 的比率
// select cume_dist() over(partition by type order pv_count desc)
SDFrame.read(dataList)
.window(Window.groupBy(WebPvDto::getType).sortDesc(WebPvDto::getPvCount))
.overCumeDistS(WebPvDto::setValue)
.show(30);
输出结果
type score pvCount value
a 2 7 0.14
a 1 5 0.29
a 5 4 0.57
a 6 4 0.57
a 3 3 0.71
a 4 2 0.86
a 0 1 1.00
b 7 6 0.25
b 8 4 0.50
b 8 2 0.75
b 7 1 1.00
4 窗口
主要是通过Window对象去构建开窗的信息,包括窗口的分区情况,窗口的排序情况,还有窗口范围。 窗口范围可以通过 Range对象去枚举指定。
如果不指定窗口信息默认窗口范围就是全部行。 众所周知而在 mysql中如果使用了order默认窗口范围就是 rows between UNBOUNDED PRECEDING and CURRENT ROW
, 如果没有使用order也没指定rows between
, 默认窗口范围才是全部。 这点要注意区分
5 最后
1、窗口函数的计算结果的存储有两种方式,一种是直接返回到FI2里, 一种是可以通过指定SetFunction 进行存储, 所有后缀带S的方法就是通过后者的方式的存储, 之所以带S后缀是为了以便于区分,并且是放到第一个方法参数里。
2、除了可以通过单独的window()的方法去指定窗口信息,在每个over方法也可以了单独设置。 没单独设置就使用window()方法里指定的窗口信息
3、在不同窗口范围内的数据计算目前用的是各种滑动窗口算法,时间复杂度基本在O(N)左右
转载自:https://juejin.cn/post/7367306429054959631