flink&chunjun远程提交
主题
先说说场景,同样是渐进式的数据清洗能力增强,最先摸到的是taier
,基于在线vscode的交互模式,功能方方面面其实挺好,但得具备flink和纯均的基础,以指定的如数据cdc、增量同步等处理,同时考量大批量同步扩展问题,综合下来就只能去定制了自己去摸查一下远程提交这块,其中包含flink、taier、chunjun等内容
Flink
Apache Flink 是一个框架和分布式处理引擎,用于在无边界和有边界数据流上进行有状态的计算。Flink 能在所有常见集群环境中运行,并能以内存速度和任意规模进行计算,这里的教程比较多,不过多做展开。
chunjun
Chunjun是一个基于Flink的批流统一的数据同步工具
- 基于
json
,sql
快速构建数据同步任务,你只需要关注数据源的结构信息即可, 让您节省时间,专注于数据集成的开发。 - 基于flink 原生的input,output 相关接口来实现多种数据源之间的数据传输,同时你可以基于 flink 自己扩展插件。
多种数据源之间数据传输
、断点续传
、增量同步
、实时采集
、脏数据管理
、实时数据还原
两者结合
使用上来说两者结合是常态,chunjun类似于提供了一套增强的语法及提交模式,扩展了一些数据源支持、
chunjun-clients
提供了sh
的远程提交模式,而需要处理的类似于实现flink的远程提交方式+chunjun-core
的调用执行,以下是一些实现的问题枚举。
1.flink standalone模式
- 可以看到flink的demo中
WordCount.jar
中的执行环境获取ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
- 构造一个远程提交客户端RestClusterClient
Configuration flinkConfig = new Configuration();
flinkConfig.setString("jobmanager.rpc.address", "192.168.145.130"); // ?�I????JobManager???
flinkConfig.setString("jobmanager.rpc.port", "6123"); // ?�I????JobManager RPC???
// flinkConfig.setString("jobmanager.web.port", "8081");
flinkConfig.setString("rest.address", "192.168.145.130");
flinkConfig.setString("rest.port", "8081");
flinkConfig.setString("classloader.resolve-order", "parent-first");
// flinkConfig.setString("classloader.resolve-order", "child-first");
// flinkConfig.setString("classloader.check-leaked-classloader", "false");
RestClusterClient<StandaloneClusterId> clusterClient = new RestClusterClient<StandaloneClusterId>(
flinkConfig, StandaloneClusterId.getInstance());
- 构造flink PackagedProgram 进行jar包远程提交,进行任务执行,可理解为任务的“地面验证”,只有没有异常问题才会进入下一步。
List<URL> urlList= getJarUrls("F:/chunjun/chunjun-dist"); PackagedProgram packagedProgram = PackagedProgram.newBuilder() .setJarFile(jobJar) .setArguments(programArgs) .setEntryPointClassName(mainClass) .setUserClassPaths(urlList) .setConfiguration(flinkConfig) .build(); int parallelism = flinkConfig.getInteger(DEFAULT_PARALLELISM); // Pipeline pip = PackagedProgramUtils.getPipelineFromProgram(packagedProgram, flinkConfig, parallelism, true); // JobGraph jobGraph = FlinkPipelineTranslationUtil.getJobGraph(null, pip, flinkConfig, parallelism); // FactoryUtil.getFactoryHelperThreadLocal JobGraph jobGraph = PackagedProgramUtils.createJobGraph(packagedProgram, flinkConfig, parallelism, false); jobGraph.getClasspaths().clear(); jobGraph.getUserJars().clear(); jobGraph.getUserArtifacts().clear();
- 提交任务
// 任务提交 JobID jobID = clusterClient.submitJob(jobGraph).get(); // 如果是批任务可以查看执行结果,注意,提交时会进行任务执行,可理解为任务的“地面验证” // result = clusterClient.requestJobResult(jobID).get();
2.用反射执行的时候遇到的问题比较多
- 先是对
chunjun-client
进行提交,后来发现这里面完整的职能与当前处理类似,于是与chunjun-client
类似进行core
包的提交。 - 一个setUserClassPaths这个相当于一运行依赖项的引用,这里相对比较友好,总之是有提示抛出,顺利的搞出
json
提交后发现sql
模式有异常,原因是chunjun-core
中对flink进行了一些扩展和封装,死活无法加载这些扩展内容、后来不得不把一些扩展core
的扩展放入当前项目中 - 有关于maven引用的问题
flink-table-uber
、flink-table-uber-blink
类似于包flink-table相关的包集合,反正这两个坑我是遇到了,动态搞了一堆外部的引用,在maven中引用相关引用包即可。
3. 提交结果检测
如此项是对mysql binlog日志的cdc捕获同步操作
CREATE TABLE source
(
id bigint,
`name` varchar
) WITH (
'connector' = 'binlog-x'
,'username' = 'root'
,'password' = 'root'
,'cat' = 'insert,delete,update'
,'url' = 'jdbc:mysql://192.168.145.1:3306/chunjun?useSSL=false'
,'host' = '192.168.145.1'
,'port' = '3306'
,'table' = 'test'
,'timestamp-format.standard' = 'SQL'
);
CREATE TABLE sink
(
id bigint,
`name` varchar
)
WITH (
'connector' = 'stream-x'
);
insert into sink select id,`name` from source u;
CREATE TABLE table_sink
(
id bigint,
`name` varchar,
PRIMARY KEY (id) NOT ENFORCED
) WITH (
'connector' = 'mysql-x',
'url' = 'jdbc:mysql://192.168.145.1:3306/chunjun?useSSL=false',
'table-name' = 'test_log',
'username' = 'root',
'password' = 'root'
);
insert into table_sink select id,`name` from source u;
总结
看理论的概念和直接上手去摸查实在是相差大,可能跟我的习惯相关, 定义目标->解决方案->实现->遇到问题->解决问题->实现目标,这个思路,反正遇到的问题挺多,桥脑到的问题也多,好在抓狂到最后问题也解决了,许多的问题尽量把自己的韧性提高一些,尽量不要降低目标,可以有PlanB的应急方案,但尽量不要放弃第一目标、
-------------------------------------------------------- 六爻卦起、知而不避,愿你的执着,因代码而美丽~、
转载自:https://juejin.cn/post/7374067795656491045