分享|开源流数据库 RisingWave 超详细学习使用笔记
该笔记来源于 RisingWave 社区用户投稿,若你对该项目感兴趣或者你要投稿,可评论或者私信留言。
本文希望帮助你快速搭建 RisingWave 学习环境,学习如何连接数据库,通过几个示例快速了解 Risingwave 的基本用法以及一些注意事项:
1. 学习环境部署
环境要求,一台 linux 服务器(可以是非联网环境)
注:本文案例使用的操作系统环境为Almalinux,其他使用yum
包管理工具的发行版(如centos、redhat等)均可参考
一键部署命令
curl <https://risingwave.com/sh> | sh
如遇无法联网或下载缓慢的情况,请参考如下步骤进行安装
-
复制控制台输出
... from URL into ...
中的URL链接,粘贴到浏览器或迅雷等下载工具进行下载,然后上传到当前服务器。 -
下载shell安装脚本到本地
curl <https://risingwave.com/sh> -o deploy.sh
- 修改并运行安装脚本
#在上传安装包文件所在目录下执行
sed -i 's/curl -L "${URL}" | tar -zx || exit 1/tar -zxf "下载的安装包文件名"/' deploy.sh && sh deploy.sh
- 运行数据库
./risingwave
如因操作错误等原因导致数据库异常,可以执行 rm ~/.risingwave -rf
后在重新运行 ./risingwave
启动一个全新的数据库
2. 连接数据库
2.1 命令行连接
方式一:安装psql工具
psql -h localhost -p 4566 -U root dev
方式二:若本机安装了docker,也可使用docker部署postgres容器,然后通过容器访问
docker exec -it postgres psql -h host.docker.internal -p 4566 -U root dev
2.2 客户端连接
以pycharm(professional版本)为例
Pycharm客户端中,由于兼容性问题,有时候一条 risingwave sql 语句会自动被从中间拆开提交执行,从而导致报错,此时可以尝试删除自动分段处的空格再重试,如果还是不行的话,再切换命令行下psql客户端执行。
2.3 python连接
import psycopg2
conn = psycopg2.connect(host='localhost', port=4566, user='root', database='dev')
cur = conn.cursor()
cur.execute("set timezone = 'Asia/Shanghai';")
cur.execute("select current_database(), current_timestamp, version();", )
res = cur.fetchall()[0]
for c in res:
print(c)
cur.close()
conn.close()
3. 测试数据导入
3.1 前置条件
你需要有一个risingwave所在服务器能够访问的mysql数据库,学习环境可以考虑使用docker部署mysql
Risingwave底层使用debezium(v2.4)实现mysql-cdc, Mysql数据库需要进行一定的基础配置,首先需要确保mysql开启了binlog,然后创建有binlog读取权限的采集用户(如果不想使用root用户的话)
CREATE USER 'username'@'%' IDENTIFIED BY 'password';
GRANT SELECT, RELOAD, SHOW DATABASES, REPLICATION SLAVE, REPLICATION CLIENT ON *.* TO 'username' IDENTIFIED BY 'password';
FLUSH PRIVILEGES;
3.2 Mysql建表
-- 包含了大部分mysql基本类型-- 注意,Risingwave 的 cdc 要求上游数据库源表必须指定主键drop table if exists test;
create table test (
c01 int,
c02 char,
c03 varchar(10),
c04 text,
c05 bigint,
c06 long,
c07 numeric(10, 2),
c08 decimal(20, 8),
c09 date,
c10 datetime,
c11 datetime(3),
c12 timestamp(6),
primary key (c01, c02)
);
insert into test (c01, c02, c03, c04, c05, c06, c07, c08, c09, c10, c11, c12)
values (1, 'a', 'abc', 'abcdefg', 123456789000, 99999999999999, 123.456, 12345.6789,
current_date,
current_timestamp,
'2024-01-01 18:18:18.888',
'2024-01-01 06:06:06.666666');
select * from test;
3.3 Risingwave 通过 mysql-cdc 接入数据
create schema rwtest; -- 可以在自定义 schema 中操作-- 虽然 create table ... with () 语法也支持创建mysql-cdc表,但每创建一个表都会产生一个mysql的binlog读取连接(可以在mysql端通过show processlist查看),对业务库极不友好,不建议使用。-- CDC SOURCE 可以复用-- CDC SOURCE 不支持级联删除,如有依赖表需要手动删除drop source if exists rwtest.mysql_cdc;
create source rwtest.mysql_cdc
with (
connector = 'mysql-cdc',
hostname = 'localhost',
port = '3306',
username = 'root',
password = '123456',
database.name = 'mysql',
server.id = 16888, -- 一般指定1000以上的随机数字,请确保server.id在数据库中的唯一性 -- 用以下方法指定debezium附加参数配置,一般不建议使用,当且仅当遇到问题时(如数据库版本兼容性问题等)针对性调整 debezium.schema.history.internal.store.only.captured.tables.ddl = 'true',
debezium.schema.history.internal.skip.unparseable.ddl = 'true',
);
show sources from rwtest;
-- 接入CDC表只能用 create table 语法创建实体表,不能用 create source (只支持 append only 数据源)-- CDC 表必须定义主键,且主键字段列表需同源表一致-- 类型映射参考官方文档:<https://docs.risingwave.com/docs/current/ingest-from-mysql-cdc/#data-type-mappingdrop> table if exists rwtest.test_mysql_cdc;
create table rwtest.test_mysql_cdc (
c01 int,
c02 string, -- 字符串类型不需要定义精度 c03 string,
c04 string,
c05 bigint,
c06 bigint,
c07 numeric, -- 数字不需要定义精度 c08 numeric,
c09 date,
c10 timestamp, -- 时间戳类型不需要定义精度 -- c11 timestamptz, -- 可以丢弃主键外的任意源表字段 c12 timestamptz,
primary key (c01, c02)
) from rwtest.mysql_cdc table 'mysql.test';
select * from rwtest.test_mysql_cdc;
-- CDC表建好后,上游mysql中的表的增删改都会即使反映到risingwave中,注意:上游执行 truncate 操作除外-- 关于如何应对上游表结构变更的主题,参考官网相关文档
在实际生产环境中,业务数据库经常不愿随意开放binlog读取权限,此时也可以通过专业的 CDC 数据采集工具先将数据采集到kafka(数据格式一般是json),Risingwave 再通过 kafka 连接器进行消费,也能实现通过 CDC 连接器直连数据库数据进行采集的相同效果。
3.4 apache kafka
下例展示了从 kafka 中消费数据格式为具有固定模式的 json 字符串
drop table if exists user_page_view;
create table user_page_view (
user_id bigint,
page_id bigint,
viewtime timestamp,
user_region varchar,
proc_time timestamptz AS proctime() -- proc_time 生成列目前仅支持appendoly表 -- 这里也可以定义primary key,那么后消费的数据会根据pk对之前已消费主键相同的记录进行覆盖) append only -- 如果定义了primary key,则此处不能同时定义 append only include timestamp as kafka_ts -- 可以附加一些 kafka 元数据列,参考:<https://docs.risingwave.com/docs/current/include-clause/#kafka> with (
connector = 'kafka',
properties.bootstrap.server = 'kafka1:9093',
topic = 'pageviews',
-- kafka集群如果配置了kerberos认证,可参考此处注释内容。所在机器还需要正确配置 krb5.conf、hosts 文件 -- properties.security.protocol='SASL_PLAINTEXT', -- properties.sasl.kerberos.keytab='/path/to/kafka.keytab' scan.startup.mode='latest', -- 默认为 earliest ) format PLAIN ENCODE json;
select * from user_page_view;
3.5 insert 导入
下面这种数据导入方式,在生产环境一般考虑在 python 脚本内通过 psycopg2 api 使用,用于导入一些数据量不太大、变化不频繁的维表数据(截至1.8版本,risingwave 还未实现类似flink中通过 jdbc 连接器batch 导入的功能)
-- 该表用于后续sql语法演示drop table if exists dim_sensor_group cascade;
create table dim_sensor_group (
sensor_id int,
warning_threshold numeric);
comment on table dim_sensor_group is '传感器报警阈值配置表';
comment on column dim_sensor_group.sensor_id is '传感器ID';
comment on column dim_sensor_group.warning_threshold is '报警阈值';
insert into dim_sensor_group
values (0, '95.0'),
(1, '95.0'),
(2, '95.0'),
(3, '95.0'),
(4, '95.0'),
(5, '98.0'),
(6, '98.0'),
(7, '98.0'),
(8, '98.0'),
(9, '98.0');
select * from dim_sensor_group;
4. Risingwave 基本语法和功能点测试
4.1 测试数据准备
为了方便起见,语法测试使用的为 负载生成器(模拟实时接入数据) 和 上面的 dim_sensor_group。
负载生成器生成的数据包含两张表,模拟两组传感器数据:
drop table if exists sensor_grp_1 ;
create table sensor_grp_1 (
id int,
value double, -- 生成小数类型目前只支持double,指定其他类型会报语法错误 ts timestamptz -- 测试时建议用 timestamptz , 若用 timestamp 查询时需要额外的转换) with (
connector = 'datagen',
fields.id.kind = 'random', -- 如果指定为sequence,序列用完后就会停止生成 fields.id.min = '0',
fields.id.max = '4',
fields.id.seed = '1',
fields.value.kind = 'random',
fields.value.min = '40',
fields.value.max = '99',
fields.value.seed = '1',
fields.ts.max_past = '0h', -- 指定为0的话从当前时间开始生成 fields.ts.max_past_mode = 'relative',
fields.ts.seed = '1',
datagen.rows.per.second = '2' ) format plain ENCODE json;
select * from sensor_grp_1;
drop table if exists sensor_grp_2 cascade;
create table sensor_grp_2 (
id int,
value double,
ts timestamptz
) with (
connector = 'datagen',
fields.id.kind = 'random', -- 如果指定为sequence,序列用完后就会停止生成 fields.id.min = '5',
fields.id.max = '9',
fields.id.seed = '1',
fields.value.kind = 'random',
fields.value.min = '40',
fields.value.max = '99',
fields.value.seed = '1',
fields.ts.max_past = '0h', -- 指定为0的话从当前时间开始生成 fields.ts.max_past_mode = 'relative',
fields.ts.seed = '1',
datagen.rows.per.second = '2' ) format plain ENCODE json;
select * from sensor_grp_2 order by ts desc;
4.2 普通join、union、动态时间过滤(temporal filter)、级联删除
业务逻辑:合并两组传感器数据
set timezone = 'Asia/Shanghai'; -- 设置客户端时区,timestamptz才会显示预期值-- 删除一个物化视图或表时,如果有其他物化视图依赖要删除的对象,则会报错,这时可以用 casecade 语法级联删除drop materialized view if exists mv1 casecade;
create materialized view mv1
asselect a.id, b.group_name, b.warning_threshold
, a.value -- round(x_double, y_int) 暂不支持,具体函数清单参考:<https://docs.risingwave.com/docs/current/sql-functions/> , a.ts
, a.value > b.warning_threshold as warning
from sensor_grp_1 a
join dim_sensor_group b on a.id = b.sensor_id
where date_trunc('day', ts) + interval '1' day > now() -- 只保留当天的数据-- 注意:temporal filter 的条件表达式右侧需满足 now() +/- interval 的格式, now 和 current_timestamp 等价-- where ts >= datetrunt('day',now()) 错误的 temporal filter 表达式示例 union allselect a.id, b.group_name, b.warning_threshold, a.value, a.ts, a.value > b.warning_threshold as warning
from sensor_grp_2 a
join dim_sensor_group b on a.id = b.sensor_id
where date_trunc('day', ts) + interval '1' day > now();
4.3 嵌套物化视图、开窗函数、累积时间窗口
Risingwave 中没有特别提出累积时间窗口这个概念,参考flink文档
业务逻辑:每组传感器中历史以来数值最大的三条记录
drop materialized view if exists mv2;
create materialized view mv2
asselect group_name, id, value, ts, rk
from (
select group_name, id, value, ts, dense_rank() over (partition by group_name order by value desc) as rk
from mv1 -- 物化视图可以被多层嵌套引用 ) s
where s.rk <= 3;
select * from mv2 order by group_name, value desc;
4.4 hop时间窗口,分组聚合
risingwave 中包含两种时间窗口的概念:hop(最近多久)和 tumble(每隔多久)
业务逻辑:查询最近5分钟内,每组传感器中,值排前三的纪录
drop materialized view if exists mv3;
create materialized view mv3
asselect * from (
select group_name
, id , value , ts
, warning_threshold
, window_start
, window_end
, rank() over (partition by group_name,window_start order by value desc) as grp_rk
from hop(mv1, ts, interval '1' minute, interval '5' minute) -- 窗口长度参数必须能够被滑动步长整除 ) s
where grp_rk <= 3 and window_end < now() -- hop时间窗口的是由数据驱动推进的,最后一个可能的时间窗口为(离当前时间最近的切分点, start+窗口长度], 如果不加此条件,最后一个窗口实际已发生时间可能小于步长时间 ;
-- mv3中包含了所有窗口的计算结果,如果只想看最后一个窗口,一般还要再做一层子查询过滤或通过limit实现;select * from mv3 order by window_start desc, group_name, grp_rk order by window_start desc, group_name, grp_rk limit 9;
-- 此处共3个组,每个窗口3条记录,limit 9 就是只取各组最后一个窗口的的记录;
4.5 tumble时间窗口、时间窗口和维表连接、时间窗口连接(Windows join)、cte语法,即席查询
业务逻辑:每个小时内,要对比A组和C组中最高纪录值情况
with cte1 as (
select group_name, id, value, ts, window_start, window_end, row_number() over (partition by group_name,window_start order by value desc) as rn
from tumble(mv2, ts, interval '1' hour) a -- tumble 可以作用于 Table/Source/MV,在flink中被称为窗口表值函数(TVF) where group_name = 'A' ),
cte2 as (
select group_name, id, value, ts, window_start, window_end, row_number() over (partition by group_name,window_start order by value desc) as rn
from tumble(mv2, ts, interval '1' hour) a
where group_name = 'C' )
select a.window_start
, a.window_end
, a.group_name || '-' || a.id || '-' || a.value as grp_a_max -- 方便的字符串连接操作 , b.group_name || '-' || b.id || '-' || b.value as grp_c_max
, a.ts as a_ts
, b.ts as c_ts
, now()::varchar -- 方便的类型转换操作符 from cte1 a
join cte2 b
on a.window_end = b.window_end -- 时间窗口关联时,窗口类型和大小必须一致 where a.rn = 1 and b.rn = 1 order by a.window_start desc, a.rn
;
4.6 间隔连接(Interval join)
业务逻辑:找到A组中最近1个小时内峰值记录,查询在此之前的1天中,B、C组中是否有更大值记录出现
with cte1 as (
select group_name, id, value, ts
from mv2
where group_name = 'A' and ts > now() - interval '1' hour -- 取A组中最近1个小时内的峰值记录 order by value desc limit 1 )
select a.ts, a.value, b.* from cte1 a
join mv1 b
on b.ts between a.ts - interval '1' day and a.ts
-- interval join 就是对两个表中时间字段进行条件关联,此处为:限定B/C组中的记录时间为A组峰值记录时间的前一天内 and b.value > a.value where b.group_name in ('B', 'C')
;
4.7 处理时间时态连接(Process-time temporal join)、只读事务
这是Risingwave中最为特殊的一种时间连接,它主要用于实现如下目标,当维表发生变化是,物化视图中的已经计算过的数据,不希望再受影响。
比如,在以下demo中,客户和销售员的绑定关系发生变化时,历史考核明细表中不希望受到影响。
-- 客户员工关系表drop table if exists cust_emp_rel cascade;
create table cust_emp_rel (
cust_id int primary key, -- 必须定义主键 emp_id varchar);
insert into cust_emp_rel values (1, 'emp_1'), (2, 'emp_2'), (3, 'emp_3');
-- 客户交易表drop table if exists cust_trade_detail cascade;
-- 下面这一句在jetbrains客户端中就无法执行(会从with那里自动分成两个statement提交),需要通过python或命令行提交-- 将 only 后面的空格删掉则可以正常执行create table cust_trade_detail (
cust_id int,
trd_amt double precision,
trd_time timestamptz
) append onlywith ( connector = 'datagen',
fields.cust_id.kind = 'random',
fields.cust_id.min = '1',
fields.cust_id.max = '6',
fields.trd_amt.kind = 'random',
fields.trd_amt.min = '1000',
fields.trd_amt.max = '2000',
fields.trd_time.max_past = '0h',
fields.trd_time.max_past_mode = 'relative',
fields.trd_time.seed = '3',
datagen.rows.per.second = '1',
);
select * from cust_trade_detail;
-- 考核明细视图-- 实验组,使用 Process-time temporal join,预期行为是:维表变化时,历史数据不变create materialized view emp_kpi_detail1
asselect b.emp_id, a.cust_id, a.trd_amt, a.trd_time
from cust_trade_detail a
left join cust_emp_rel for SYSTEM_TIME as of PROCTIME() b -- 这一固定语法要求右表必须有主键,否则会报错;注意别名的位置 on a.cust_id = b.cust_id;
-- 对照组,使用普通join,预期行为是:维表变化时,历史数据同步刷新create materialized view emp_kpi_detail2
asselect b.emp_id, a.cust_id, a.trd_amt, a.trd_time
from cust_trade_detail a
left join cust_emp_rel b
on a.cust_id = b.cust_id;
-- 维表无变化时,上面两个视图数据应该完全一致-- 只读事务特性:事务期间的数据库变化对事务不可见begin read only; -- 开启只读事务-- 虽然 cust_trade_detail 表中数据一直在源源不断产生,但在结束只读事务之前:多次查询 emp_kpi_detail1 结果都相同的、emp_kpi_detail1 和 emp_kpi_detail2 的结果是完全一致的select * from emp_kpi_detail1;
select * from emp_kpi_detail2;
select * from emp_kpi_detail1 except select * from emp_kpi_detail2
union allselect * from emp_kpi_detail2 except select * from emp_kpi_detail1; -- 验证两个视图数据无差异commit; -- 结束只读事务-- 修改维表数据update cust_emp_rel set emp_id ='emp_111' where cust_id = 1;
insert into cust_emp_rel values (4, 'emp_4'), (5, 'emp_5'), (6, 'emp_6'), (7, 'emp_7'), (8, 'emp_8');
-- 维表数据变化后,emp_kpi_detail1 中的历史数据不会发生变化,而 emp_kpi_detail2 中则会select * from emp_kpi_detail1 where cust_id in (1, 4, 5, 6) order by trd_time ; -- cust_id 等于1的记录中,emp_id 得值仍然是 emp_1select * from emp_kpi_detail2 where cust_id in (1, 4, 5, 6) order by trd_time ; -- cust_id 等于1的记录中,emp_id 得值全变成 emp_111
5. 关于 RisingWave
转载自:https://juejin.cn/post/7365003815508836393