青春"Flink"少年不会梦到"性能优化"学姐
前言
许久未更新了,最近还是挺忙的,搞一些杂活,支援各种项目,还被安排去做了下AI相关的东西,当然很基础呀,就是提示词工程师。本期是五月初开始写的,拖拖拉拉写了快一个月,主要目的是呈现一下我钻研的结果,当然也是相对比较简单的东西,记录学习过程发现新的提升点,这是我自认为自己一个比较好的习惯。
众所周知,我是写偏硬核技术博客的,宁缺毋滥,Flink项目的第五次迭代为什么成为了本期的素材呢,之前明明只有两次迭代有机会成为素材?哈哈,当然是我觉得做了些还算有意思的东西,一是维表动态更新这个东西,网上相关的案例和我需求契合的比较少,所以我自己琢磨了一套写法出来,整个解决的过程很有意思,所以我觉得有必要分享出来。二是这次的性能优化是一个偏向于业务的优化,还是蛮少见的,当然我也对常规的性能优化做了总结。那么,读者大人,该开启命运的大门了!
项目背景
业务背景
项目原计划是初期做一个供应链专用的数仓,后期进行拓展,但是由于开发资源紧张和数据组团队的完善,扩展被卡住了,目前只服务于供应链相关的项目,其他由数据组团队提供。大数据平台的底座是leader搭建的,数仓启动时有我和另一个同事一起去设计和开发,后来同事跑路了,然后赶上裁员潮,也没招人,就剩我一人负责全流程运作。我是兼职大数据开发,本职还是Java开发,要负责团队项目的开发,所以只能抽其他项目之间的时间缝缝补补。之前这个小数仓强行推广过一阵,在这个基础上搭建了一个基础数据服务,给外部提供宽表数据接口,但是由于接入表的范围,只能给供应链相关的项目提供数据。TiDB倒是推广开了,数据量大的新项目都上了TIDB,特别是看板类和计算型的。
架构简析
目前整个Hadoop平台资源总量是128核400G,大概用了一半左右的资源,利用率一般,部分大数据套件例如HBase也没用上。TiDB从ERP和PLM等外部系统接收了200+表,再加上部分业务系统,存储量来到了2TB左右,部署是经典的3PD3TIDB3TIKV2TIFLASH,负载均衡用的官方推荐的HAProxy。
Flink主要分为两部分,对接OGG的Flink和对接MySql的Flink-CDC,都是部署在Yarn上,测试是用的Flink集群。资源还是充足的,架构也是标准的,就是人手不足,什么都操作不起来,只能由我做个缝补匠。
发现问题
性能极端情况下变慢
最核心的问题就是处理过慢导致背压严重,从而TM断连导致资源重启,多来几次任务就挂了。所以隔三岔五我就需要手动重启项目,并且通过DataX重跑数据,很烦,脚本写了一麻袋。因为这半年表从一开始的100+膨胀到了200+,原先分了五个任务慢慢得不够用了,之前的处理方式是利用时间窗口并且将SQL转换成批量操作,因为需要保证SQL顺序,所以会分段截取,逻辑如下:
之前第四版就是这个批量的优化解决了性能问题,我第一时间也没想到会是这里除了问题,但是问题确实是最后这步算子导致的背压。于是我在本地接入正式的数据进行了测试,发现慢的时候会执行几分钟,而我是10秒的时间窗口......所以为什么会慢呢?根据接入数据分析,原先的逻辑是为了保证顺序,如果前后语句类型(增删改)不一致,就会直接运行当前List转化的批量SQL。但是某些表的变化实在是太频繁,如果是IUDIUD这样过来,极端情况下就和一条条执行没有区别。为了保证高效,每个表有独立的线程执行,但同时为了顺序,必须等待这次时间窗口内的所有表都执行完毕才行,因此还是会出现性能问题。
表动态变化
之前的版本是初始化获取数据源的数据,也就是所有表信息,但是问题就是,如果我新增了某个表,或者我想要调整某个表到另一个算子,那就只能重启任务。重启带来的中断肯定是不能接受的,所以需要动态变化。
解决问题
维表
在大数据领域,维表通常指的是包含有关业务维度信息的数据表。维表与事实表相对应。事实表通常包含了业务事件的事实信息,比如销售额、订单数量、网站访问量等。而维表则包含了用于描述事实表中记录的上下文、属性或维度信息。例如,如果事实表包含了销售订单的数据,那么维表可能包含了与订单相关的维度信息,比如订单日期、订单状态、客户信息、产品信息等。简单来说就是,我们在流处理事实表数据时需要用维表数据来筛选或者填充。 这里的维表数据是Flink需要处理的命名空间、表名和对应的主键,要从Kafka的数据流中筛选出真正需要处理的数据。
这里Process算子的功能就是通过维表数据筛选出当前Flink任务需要处理的数据,这里过滤了一半多的数据,很有效果。维表的数据当然不可能是一成不变的,最简单的方法就是修改一次规则就重启一次任务,但是势必会造成中断或者数据丢失,这是肯定不能的。因此该表数据需要更新,如何更新呢?有两种比较常见且简单的方式,分别是
- 实时获取维表数据。细分还可以分为同步数据库、异步数据库后更新和同步缓存三种模式。这种只适合数据量相对较小的情况,或者业务支持能开窗转批量执行,因为过高的QPS会直接爆破数据库,
- 预加载维表数据。初始化一份数据,同时定时更新这个数据,适用于数据变更频率低的情况,对数据库友好。
还有比较复杂的模式,暂时没接触到,这里我根据自己的场景选择了第二种。Flink作为流处理引擎,对于对象的权责范围管控很严格,这是最让人头大的,好多想法并不能拍脑袋按照常规直接写,也缺乏优秀的案例。需要对Flink的API有足够细致的理解,才能按照想法去实施,这里我用到了广播流和状态变量两种技术。 在Apache Flink中,广播流(Broadcast Stream)是一种特殊类型的流,它允许将一个数据流的内容广播到所有并行任务中。通常情况下,Flink中的流处理操作是并行执行的,每个操作符会在不同的并行任务中处理数据。但是,有时候我们需要在所有任务中使用相同的配置、参考数据或者模型参数,这时候广播流就非常有用了。
定义维表数据源
import com.xxx.basedata.pojo.ErpTable;
import com.xxx.basedata.pojo.InitData;
import com.xxx.basedata.util.Jasypt;
import lombok.extern.slf4j.Slf4j;
import org.apache.flink.api.java.utils.ParameterTool;
import org.apache.flink.streaming.api.functions.source.SourceFunction;
import java.sql.*;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
@Slf4j
public class TableSource implements SourceFunction<InitData> {
private final ParameterTool parameterTool;
private final int flag;
public TableSource(ParameterTool parameterTool, int flag) {
this.parameterTool = parameterTool;
this.flag = flag;
}
@Override
public void run(SourceContext<InitData> ctx) throws Exception {
while (true) {
Map<String, ErpTable> data = new HashMap<>();
List<String> tableList = new ArrayList<>();
String topicSql = "select TABLE_PREIX,TABLE_NAME,TOPIC,ROW_KEY,KEY_GROUP from ptm_base_table where ROW_FLAG = " + flag + ";";
try (Connection connection = DriverManager.getConnection(parameterTool.getRequired("jdbcUrl"),
parameterTool.getRequired("username"), Jasypt.decyptPwd(parameterTool.getRequired("password")));
PreparedStatement statement = connection.prepareStatement(topicSql);
ResultSet rs = statement.executeQuery()) {
while (rs.next()) {
ErpTable erpTable = new ErpTable();
erpTable.setTablePrefix(rs.getString("TABLE_PREIX"));
erpTable.setTableName(rs.getString("TABLE_NAME"));
erpTable.setTopic(rs.getString("TOPIC"));
erpTable.setRowKey(rs.getString("ROW_KEY"));
erpTable.setKeyGroup(rs.getInt("KEY_GROUP"));
data.put(erpTable.getTableName(), erpTable);
if (!tableList.contains(erpTable.getTableName())) {
tableList.add(erpTable.getTablePrefix() + "." + erpTable.getTableName());
}
}
} catch (SQLException e) {
log.error("初始化数据获取基本信息出现异常", e);
}
ctx.collect(new InitData(data, tableList));
Thread.sleep(5000);
}
}
@Override
public void cancel() {
}
}
广播流肯定还是需要一个正常的数据流的,实现SourceFunction接口,根据环境变量获取ParameterTool中对应的数据库参数。因为实现了AutoCloseable,所以可以使用try resource语法糖,交给JDK控制他的生命周期。定时的话,我想着没必要开定时线程池浪费资源,直接sleep5秒,while(true)永远循环即可。重点是ctx.collect(new InitData(data, tableList));不停地往流中输入数据。
定义广播流
MapStateDescriptor<Void, InitData> globalDataDescriptor =
new MapStateDescriptor<>("globalData", Types.VOID, Types.POJO(InitData.class));
BroadcastStream<InitData> globalDataStream = env.addSource(new TableSource(parameterTool, flag))
.broadcast(globalDataDescriptor);
第一行代码创建了一个 MapStateDescriptor 对象,用于描述一个键值对形式的状态,这个状态被命名为 "globalDataDescriptor"。在这里,键的类型是 Void,这表示这个状态是一个无键的状态,而值的类型是 InitData 类型的 POJO。定义为无键也是因为后面我使用的是无键广播处理流BroadcastProcessFunction,没必要写这个Key。
第二行代码首先通过 env.addSource() 方法创建了一个数据流,其中使用了一个名为 TableSource 的源,这是前面定义的维表数据源。然后,通过 .broadcast(globalDataDescriptor) 方法将这个数据流转换成了一个广播流,其中参数 globalDataDescriptor 指定了要广播的状态描述符。
定义广播流处理逻辑
主流中加入广播流
DataStream<DataNewDTO> filterStream = dataStream.connect(globalDataStream)
.process(new GlobalVariableUpdater(globalDataDescriptor, tableMap))
.returns(DataNewDTO.class).name(realJobName + "-data-Process");
广播流的处理逻辑
import com.alibaba.fastjson2.JSON;
import com.xxx.basedata.pojo.DataDTO;
import com.xxx.basedata.pojo.DataNewDTO;
import com.xxx.basedata.pojo.ErpTable;
import com.xxx.basedata.pojo.InitData;
import lombok.extern.slf4j.Slf4j;
import org.apache.commons.lang3.StringUtils;
import org.apache.flink.api.common.state.BroadcastState;
import org.apache.flink.api.common.state.MapStateDescriptor;
import org.apache.flink.api.common.state.ReadOnlyBroadcastState;
import org.apache.flink.streaming.api.functions.co.BroadcastProcessFunction;
import org.apache.flink.util.Collector;
import java.util.Map;
@Slf4j
public class GlobalVariableUpdater extends BroadcastProcessFunction<String, InitData, DataNewDTO> {
private final MapStateDescriptor<Void, InitData> globalDataDescriptor;
private final Map<String, ErpTable> tableMap;
public GlobalVariableUpdater(MapStateDescriptor<Void, InitData> globalDataDescriptor, Map<String, ErpTable> tableMap) {
this.globalDataDescriptor = globalDataDescriptor;
this.tableMap = tableMap;
}
@Override
public void processElement(String value, BroadcastProcessFunction<String, InitData, DataNewDTO>.ReadOnlyContext ctx,
Collector<DataNewDTO> out) throws Exception {
ReadOnlyBroadcastState<Void, InitData> broadcastState = ctx.getBroadcastState(globalDataDescriptor);
InitData initData = broadcastState.get(null);
Map<String, ErpTable> tableMapReal;
if (initData != null) {
tableMapReal = initData.getTableMap();
} else {
tableMapReal = tableMap;
}
if (value.contains("table")) {
DataDTO dataRow = JSON.parseObject(value, DataDTO.class);
if (dataRow != null) {
String[] splitTable = dataRow.getTable().split("\\.");
String tableName = splitTable[1];
ErpTable erpTable = tableMapReal.get(tableName);
if (erpTable != null) {
Object data = dataRow.getAfter();
if (data == null) {
data = dataRow.getBefore();
}
String rowKey = erpTable.getRowKey();
if (StringUtils.isNotBlank(rowKey)) {
DataNewDTO dataNew = new DataNewDTO();
dataNew.setPos(dataRow.getPos());
dataNew.setRealData(data);
dataNew.setOpTs(dataRow.getOp_ts());
dataNew.setCurrentTs(dataRow.getCurrent_ts());
dataNew.setTableName(tableName);
dataNew.setOpType(dataRow.getOp_type());
dataNew.setKeyGroup(erpTable.getKeyGroup());
dataNew.setRowKey(rowKey);
out.collect(dataNew);
}
}
}
}
}
@Override
public void processBroadcastElement(InitData value, BroadcastProcessFunction<String, InitData, DataNewDTO>.Context ctx,
Collector<DataNewDTO> out) throws Exception {
// 更新广播状态中的全局变量数据,value就是TableSource中发送过来的数据
BroadcastState<Void, InitData> broadcastState = ctx.getBroadcastState(globalDataDescriptor);
broadcastState.clear();
broadcastState.put(null, value);
}
}
这里就是流处理逻辑,BroadcastProcessFunction和普通的ProcessFunction相比就是多了processBroadcastElement需要重写。这里逻辑几乎是固定的写法,清空当前广播流的状态变量,将最新维表数据value填充进去。上面processElement这里做了一点小小的优化,之前是直接将数据的所有字段全量转换DataDTO.class,现在是将后置的逻辑提前了,并删除了无用的字段精简为DataNewDTO.class。
这里有个坑点,就是if (initData != null) 这一步的作用是防止广播流在主流处理时还没有获取到数据,因此在主流运行之前初始化数据还是有必要的。
思考和弯路
其实在这之前做了很多无意义的尝试,当时并没有太理解所谓流处理的代码运行逻辑。比如我在主流前面加了一个单例的全局变量,然后写了一个定时线程池更新这个全局变量,当然这是没有用的,变量进入流处理阶段就固化了,后面更新也没啥用。我并没有深刻理解在这个流处理过程中,内部数据的周转和外部传入参数的生命周期,写起来就很费劲。后面在实现了RichSinkFunction的Sink端定义一个类静态变量,重写Open方法时初始化同时定义一个定时器更新,直接报错,说我违规更新变量,哈哈。
反正就是步步试错吧,在写的过程中参考了ChatGPT的回答,当然我建议使用ChatGPT进行开发时要注意同步在官网或者网上搜索一下现成或者相似的案例,不然真有可能被AI带歪了。一开始对大数据不熟嘛,网上请教的时候,把场景一说,有人就跟我说这不就是维表嘛,然后我就顺着这个关键词去搜索解决方案。后面也是重写回顾了广播流和状态变量,网课都有,但是真到用的时候不一定能记得起来这里可以用,这才是学习一门新技术最让人头疼的地方,而且这也是老司机的意义所在。
也就是因为在Sink端做维表数据维护比较麻烦,我想的是不要在每一个slot上都做相同的定时维护,这样很浪费性能。我想的是所有Sink端的Slot共用同一个全局变量,并且有个地方能更新这个全局变量,这就是我最开始的方法,从最末端开始思考。但是上面两段思考也体现了我思维和能力的局限性,最后还是选择在处理流中直接传递Sink端用到的数据,相当于在流数据中冗余了一份数据,用额外的空间来实现了这个功能。
性能优化
根据业务规则优化
性能优化这块也是结合维表逻辑修改带来的流数据变更进行了相应的修改,原先转换批量SQL和对特殊数据处理的逻辑没有变化,主要是针对我找到的问题进行修改,也就是极端情况下批量转换失效,如果是IUDIUD这样过来,就和一条条执行没有区别。那么整合规则,可以得到以下逻辑
- 根据表+主键进行拆分,没有主键的走原来的逻辑确保顺序
- 如果只有单一的I/U/D,就加入批量集合ALL,无需先后顺序
- 如果是I->U...->D或者U...->D,尾部删除的这种数据日志记录后直接丢弃
- 如果是I->U...,这种需要顺序执行,I并入批量集合IA,U并入批量UA等待IA执行后执行
- ........
主要还是SQL执行这种特殊的业务逻辑相对来说规则固定,所以可以根据业务进行优化。核心优化逻辑还是经典的多线程和批量,之前也有讲过,代码里就这两种操作最立竿见影。
优化细则
因为从上个版本到现在差不多三个月的时间了,中间陆陆续续有别的项目,也不考证具体花费了多少时间了,这里列一下具体优化和变更的点。
- 初始化维表数据的时候,之前是开了Druid,但是实际上没有复用场景。因此直接开链接一次性获取,节省创建连接池开销。
- 由于引入了广播流和状态变量,为了保证数据的传递,前置了原本在Sink端的处理逻辑。同时对传递的数据量进行了优化,例如筛选和减少类属性字段。
- Sink端重写后,针对不同业务类型的数据进一步开了多线程并行处理,最后.join()聚合,避免串行引发的性能问题。
我在工作中处理大数据量的问题还蛮多的,因为涉及多数据源,本身供应链的数据量也很大,报表场景很多,下面是部分表的截图,这些都是稍微大点的。
报表场景的结果集或者中间结果集一般都是百万左右的量级,随着时间增长差不多千万上亿的量级。
真实处理中涉及到的数据会更多,例如重要数据来源之一的ERP,就是上面这种,部分表不是数据大就是量大,处理起来就很麻烦。在之前的处理中会根据情况使用一些历史数据迁移分库分表之类的操作,或者有些业务可以上NoSQL的,比如ElasticSearch做搜索和分析啥的。现在迁移到TiDB,说实话千万级的量级处理可以接受了,优化下SQL应付看板就很方便。 平时大数据量会存在的问题,一般是三种问题:
嵌入了好几篇之前写的文章,主要是写烂了,再废话有点水了,不如直接贴下文章,也感谢读者大大赏脸看了。
为什么要用TiDB?
我发现好多人问我这个问题,切TiDB的原因是什么,为啥不用其他NoSQL?因为我们是从MySQL迭代过来,所以势必要说和MySQL的对比。
- 提升运维效率。版本升级、分库分表、主从同步、维护高可用等运维问题在MySQL上存在步骤多、修改面大的问题。TiDB版本升级和扩缩容直接用TiUP命令即可,分布式架构自带高可用,也不用修改代码适配分库分表。
- 更强的可观测性,自带看板TiDB DashBoard,硬件和SQL的监控分析都很到位。
可能存在的问题
- 分布式带来的网络开销,在轻量级数据下不如单体架构的MySQL。
- 兼容性存在问题,比如部分函数、触发器、存储过程。而且代码上分页必须排序才能获取正确的数据。
- 更多的硬件资源需求,docs.pingcap.com/zh/tidb/sta…。举个磁盘占用的例子,同样表数据大小的情况下,磁盘占用量TiDB略大于MySQL。
写在最后
我最近有收获一些好事,也有不如意的事情,人之不如意事十之八九,虽然有些夸张,但是最近落差略大有些遗憾,当然我还需要继续努力,去证明自己。对啦对啦,最近遇到了一些不错的人,有人给我推荐了孤独摇滚这部老番,周末看完有被治愈到,又有了重新振作出发( ̄︶ ̄)↗ 的勇气了,哈哈。
转载自:https://juejin.cn/post/7373592723863158799