likes
comments
collection
share

🚀 基于 Flink 的 OceanBase AP 实时分析 demo基于 Flink 的 OceanBase AP

作者站长头像
站长
· 阅读数 32

先看效果

🚀 基于 Flink 的 OceanBase AP 实时分析 demo基于 Flink 的 OceanBase AP

  • 支持 PC 端下单,也支持多人通过手机扫码在线下单,可交互性更强。
  • 订单数据写入 OB TP 数据库,并通过 Flink CDC 实时同步到 OB AP 数据库,并从 AP 库中查询最新数据。
  • 不管是 count(*) 计数还是 where + group by + order by 多条件查询,亿级别的数据查询耗时基本都在 10ms ~ 100ms,充分体现了 OB AP 的查询性能。(注: 该耗时包括了后端到数据库之间的网络延时,因此数据库内部的 SQL 耗时还要更低)
  • 只建了主键索引。

准备数据库

  • TP 数据库: OB 4.3.0 行存 + 开启 Binlog 服务
  • AP 数据库: OB 4.3.0 列存

使用 OBCloud 阿里云版本

  • 为了降低搭建成本,方便后续和 Flink 以及应用进行集成,直接使用 OBCloud 阿里云版本,数据库配置如下:

  • OB 版本 4.3.0.1,目前该版本在 OBCloud 需要开通白名单才能购买,具体可以联系官方技术服务同学进行开通。

  • 我这里购买一个按需付费的集群实例,3 节点,节点规格为 14C70G,价格 ¥32/小时。如果只用于测试,可以申请 🎉 OB Cloud 的 30 天免费试用,最低规格的 1C4G 就可以满足需求。

🚀 基于 Flink 的 OceanBase AP 实时分析 demo基于 Flink 的 OceanBase AP

  • 在集群实例下创建了 oltp 和 olap 两个租户,用作 TP 和 AP 库,配置如下:

🚀 基于 Flink 的 OceanBase AP 实时分析 demo基于 Flink 的 OceanBase AP

  • 在两个租户下分别创建数据库、访问账号,并配置 IP 白名单和公网访问地址,然后就能得到数据库连接串。具体过程不赘述,可以参考我之前的文章 使用阿里云的 OceanBase 云服务

TP 库开通 Binlog 服务

🚀 基于 Flink 的 OceanBase AP 实时分析 demo基于 Flink 的 OceanBase AP

行存表和列存表

  • 连接 TP 库,创建 tp_car_orders 表(行存):

    create table tp_car_orders( order_id bigint primary key NOT NULL AUTO_INCREMENT, -- order_time 默认值由数据库自动生成 order_time timestamp NOT NULL DEFAULT CURRENT_TIMESTAMP, customer_name varchar(25) COLLATE utf8mb4_bin NOT NULL, sale_nation varchar(25) COLLATE utf8mb4_bin NOT NULL, sale_region varchar(25) COLLATE utf8mb4_bin NOT NULL, car_color varchar(25) COLLATE utf8mb4_bin NOT NULL, car_price decimal(15,2) NOT NULL );

  • 连接 AP 库,创建 ap_car_orders 表(列存)。

  • 由于两张表需要通过 Flink 进行同步,因此两张表的结构几乎完全一样。

  • 唯二的区别: ① 一个是行存,一个是列存 ② order_time 的默认值不同。

    create table ap_car_orders( order_id bigint primary key NOT NULL AUTO_INCREMENT, -- order_time 从 tp_car_orders 表同步过来 order_time timestamp NOT NULL, customer_name varchar(25) COLLATE utf8mb4_bin NOT NULL, sale_nation varchar(25) COLLATE utf8mb4_bin NOT NULL, sale_region varchar(25) COLLATE utf8mb4_bin NOT NULL, car_color varchar(25) COLLATE utf8mb4_bin NOT NULL, car_price decimal(15,2) NOT NULL ) WITH COLUMN GROUP (each column);

准备 Flink

开通阿里云 Flink

  • 本地安装并使用 Flink 进行同步可以参考 OB 官方文档,我这里为了简单,直接使用 阿里云 Flink 托管版本。包年包月比较贵,可以使用按量付费版本,按照指引开通服务即可。

🚀 基于 Flink 的 OceanBase AP 实时分析 demo基于 Flink 的 OceanBase AP

Flink 网络配置

  • 进入 Flink 工作空间,使用前建议使用网络探测功能验证数据库是否可正常连接。

🚀 基于 Flink 的 OceanBase AP 实时分析 demo基于 Flink 的 OceanBase AP

  • 如果 Flink 和数据库在同一个 VPC: 可以通过数据库私网地址进行连接。

  • 如果 Flink 和数据库不在同一个 VPC:

  • 配置 跨 VPC 访问

  • 通过数据库公网地址进行连接,但阿里云 Flink 默认不支持访问公网,需要配置 NAT 网关实现 VPC 网络与公网网络互通,详见 文档

  • 保证 Flink 和数据库可以互通之后,网络探测的结果应该如下:

🚀 基于 Flink 的 OceanBase AP 实时分析 demo基于 Flink 的 OceanBase AP

Flink 同步配置

  • 进入「配置管理」,修改作业默认配置,将「系统检查点间隔」和「两次系统检查点间最短间隔」两个参数均改为 1 秒,以保证同步的实时性。

🚀 基于 Flink 的 OceanBase AP 实时分析 demo基于 Flink 的 OceanBase AP

Flink SQL 开发

  • 进入「SQL 开发-新建-新建空白的留作业草稿」

🚀 基于 Flink 的 OceanBase AP 实时分析 demo基于 Flink 的 OceanBase AP

  • 输入以下 SQL 内容:

    -- 创建 TP CDC 表,表结构和源表 tp_car_orders 一致 CREATE TEMPORARY TABLE tp_car_orders_cdc ( order_id bigint primary key NOT ENFORCED, order_time timestamp NOT NULL, customer_name varchar(25) NOT NULL, sale_nation varchar(25) NOT NULL, sale_region varchar(25) NOT NULL, car_color varchar(25) NOT NULL, car_price decimal(15,2) NOT NULL ) WITH ( -- OB Binlog 服务兼容 MySQL BinLog 服务,因此可以使用 mysql-cdc 作为连接器 'connector' = 'mysql-cdc', 'hostname' = , 'port' = , 'username' = <USER_NAME>, 'password' = , 'database-name' = 'oltp', 'table-name' = 'tp_car_orders' );

    -- 创建 AP CDC 表,表结构和目标表 ap_car_orders 一致 CREATE TEMPORARY TABLE ap_car_orders_cdc ( order_id bigint primary key NOT ENFORCED, order_time timestamp NOT NULL, customer_name varchar(25) NOT NULL, sale_nation varchar(25) NOT NULL, sale_region varchar(25) NOT NULL, car_color varchar(25) NOT NULL, car_price decimal(15,2) NOT NULL ) WITH ( -- 使用 jdbc 连接器 'connector' = 'jdbc', 'url' = 'jdbc:mysql://:/olap', 'username' = <USER_NAME>, 'password' = , 'table-name' = 'ap_car_orders', -- 不缓存记录,直接 flush 数据 'sink.buffer-flush.max-rows' = '0', -- flush 数据的时间间隔设为 0,直接 flush 数据 'sink.buffer-flush.interval' = '0' );

    -- 将 TP CDC 表同步到 AP CDC 表 INSERT INTO ap_car_orders_cdc SELECT * FROM tp_car_orders_cdc;

  • 📢 注意: 需要调整 ap_car_orders_cdc 以下两个参数,同样的为了保证同步的实时性,我这里均设为 '0',即不做缓存和间隔,直接 flush 数据。这两个参数的用法详见 文档

  • sink.buffer-flush.max-rows

  • sink.buffer-flush.interval

  • SQL 语法检查和网络连通性校验:

🚀 基于 Flink 的 OceanBase AP 实时分析 demo基于 Flink 的 OceanBase AP

  • 启动 SQL 调试,并往 tp_car_orders 表里插入一条数据:

    INSERT INTO tp_car_orders (car_price,car_color,sale_region,sale_nation,customer_name) VALUES (299900,'blue','Washington','America','Lucy');

  • 预期能够捕获到 tp_car_orders 的数据变更,但变更还不会写入 ap_car_orders,需要实际部署到作业才能写入。

🚀 基于 Flink 的 OceanBase AP 实时分析 demo基于 Flink 的 OceanBase AP

Flink SQL 部署

  • 部署 SQL 作业:

🚀 基于 Flink 的 OceanBase AP 实时分析 demo基于 Flink 的 OceanBase AP

  • 在「作业运维」即可查看对应作业:

🚀 基于 Flink 的 OceanBase AP 实时分析 demo基于 Flink 的 OceanBase AP

  • 部署并启动成功后,重新往 tp_car_orders 表里插入一条数据:

    INSERT INTO tp_car_orders (car_price,car_color,sale_region,sale_nation,customer_name) VALUES (299900,'blue','Washington','America','Lucy');

  • 可以看到在 ap_car_orders 表中数据已经同步:

    mysql> select * from ap_car_orders; +----------+---------------------+---------------+-------------+-------------+-----------+-----------+ | order_id | order_time | customer_name | sale_nation | sale_region | car_color | car_price | +----------+---------------------+---------------+-------------+-------------+-----------+-----------+ | 1 | 2024-05-07 17:10:37 | Lucy | America | Washington | blue | 299900.00 | +----------+---------------------+---------------+-------------+-------------+-----------+-----------+ 1 row in set (0.02 sec)

应用配置

  • 拉取应用代码: github.com/dengfuping/…

  • pnpm i 安装项目依赖。

  • 新建 .env 文件并配置 TP、AP 库的数据库连接串:

    OLTP_DATABASE_URL="" OLAP_DATABASE_URL=""

  • pnpm run dev 启动应用。

导入数据

  • clone 代码仓库,通过 npm run seed批量导入脚本,默认导入 1.5 亿条数据,可修改脚本逻辑按需调整:

🚀 基于 Flink 的 OceanBase AP 实时分析 demo基于 Flink 的 OceanBase AP

  • 导入数据后需要对 AP 库发起一次合并,才能保证最优的查询性能。

    mysql> ALTER SYSTEM MAJOR FREEZE;

AP SQL 调优(可选)

运行效果

🚀 基于 Flink 的 OceanBase AP 实时分析 demo基于 Flink 的 OceanBase AP

转载自:https://juejin.cn/post/7385350484411088922
评论
请登录