likes
comments
collection
share

RisingWave 1.8 发布!新增 Iceberg Source 和 MongoDB CDC 连接器

作者站长头像
站长
· 阅读数 8

开源流式数据库 RisingWave 1.8 版本正式发布!此版本包含了功能更新以及错误修复:对 Python UDF、Rust UDF 和连接器做了进一步改进,使用户能够更加灵活地使用数据库。此外,解耦 Sink 方面有了重大变更,请继续阅读以了解如何避免兼容性问题。

1. 重大变更:解耦 Sink

自 1.8 版开始,RisingWave 改用了一种更轻量级的状态清理方法,这会影响到已启用解耦功能的 Sink。1.6 版本首次实现了这一状态清理方法,但我们此后将不再进行 Sink 解耦兼容性的维护。因此,我们建议已启用解耦 Sink 的用户避免直接更新至 1.8 版本,或者在更新前删除所有已启用解耦功能的 Sink。

要检查是否有已启用解耦功能的 Sink,请先更新至 1.7 版本。1.7 版本支持内部表 rw_sink_decouple,可检查所有 Sink 的解耦状态。1.6 更新至 1.7 时不会出现兼容性问题。接下来,运行以下查询。

SELECT * FROM rw_sink_decouple 
WHERE is_decouple AND watermark_vnode_count < 256;

该查询将返回更新时可能会遇到兼容性问题的所有 Sink。如果查询返回空结果,则可以放心更新至 1.8 版本。

2. 嵌入式 UDF

上个月的 1.7 版本新增了对其他语言 UDF 的支持,本月的新版本则继续改进了 UDF 的功能。现在,您可以在 Python 和 Rust 中创建嵌入式 UDF,这意味着这些 UDF 可在 RisingWave 内部定义、编译和运行。这样,就无需安装外部 API。不过,您在使用外部库或文件系统时会受到限制。

3. 嵌入式 Python UDF

创建嵌入式 Python UDF 时,请使用 CREATE FUNCTION 命令。下面是一个示例。

CREATE FUNCTION gcd(a int, b intRETURNS int LANGUAGE python AS $$
def gcd(a, b):
    while b != 0:
        a, b = b, a % b
    return a
$$;

函数定义使用 Python 句法编写。您还可以创建表函数,并让函数返回 struct 类型。

目前嵌入式 UDF 只能使用纯计算逻辑,但如果您需要重复使用复杂计算,它还是很有用的。不过,您可以访问 jsondecimalremath 和 datetime 库。创建函数后,您可以像调用其他内置函数一样对其进行调用。

某些内置函数是不允许的。有关完整列表,请参阅下方官方文档。

4. 嵌入式 Rust UDF

同样,可使用 CREATE FUNCTION 命令创建嵌入式 Rust UDF。

CREATE FUNCTION gcd(int, int) RETURNS int LANGUAGE rust AS $$
    fn gcd(mut x: i32mut y: i32-> i32 {
        while y != 0 {
            (x, y) = (y, x % y);
        }
        return x;
    }
$$;

函数体使用 Rust 句法定义。像嵌入式 Python UDF 一样,您可以创建表函数并让其返回 struct 类型。虽然不支持其他外部库,但您仍然可以使用标准库 chronorust_decimal 和 serde_json。定义函数后,就可以像使用其他内置函数一样对其进行使用。

有关句法的详细信息,请参阅下方文档。

更多详细信息,请参阅:

3. 刷新 Schema

过去要刷新使用 Schema Registry 所定义 Source 的 Schema,需要使用 ALTER SOURCE 命令重新定义其 Schema Registry,过程相当繁琐。此外,也不支持刷新表的 Schema。

此版本新增了 REFRESH SCHEMA 句法,使 RisingWave 中表或 Source 的 Schema 更新变得更加容易。请注意,更新 Schema 时不能更改数据 FORMAT 和 ENCODE 选项。刷新 Source Schema 的句法如下。

ALTER SOURCE s REFRESH SCHEMA;

现在,Source s1 的 Schema 将根据对其所做的更改进行更新。

同样,刷新使用外部连接器所创建表的 Schema 的句法如下。

ALTER TABLE t REFRESH SCHEMA; 

如果在刷新 Schema 时删除了某些列,而这些列又被其他下游 Fragment(如物化视图)引用,则该命令无效。

更多详细信息,请参阅:

4. 窗口函数中的 RANGE

在 SQL 查询中使用窗口函数时,可使用 RANGE 子句来指定窗口帧中所包含的相对于当前行的行范围。该子句可根据行的排序对一系列值进行操作。可按如下方式指定 RANGE 子句。

RANGE BETWEEN frame_start AND frame_end

这里,frame_start 和 frame_end 描述了要对哪些行进行计算。

frame_start 可以是 UNBOUNDED PRECEDINGCURRENT ROW、特定数量的行或特定的时间间隔。例如,以下子句包括从前一天到当前行的所有行。

RANGE BETWEEN 1 DAY PRECEDING AND CURRENT ROW 

frame_end 可以是 UNBOUNDED FOLLOWINGCURRENT ROW、特定数量的行或时间间隔。以下子句包括当前行之后的所有行。

RANGE BETWEEN CURRENT ROW AND UNBOUNDED FOLLOWING

更多详细信息,请参阅:

5. 支持 Ruby 客户端

如果您有一个外部 Ruby 应用,现在就可以在该应用中使用任何第三方 PostgreSQL 驱动程序与 RisingWave 进行交互。我们的官方文档介绍了如何使用 ruby-pg 驱动程序。下面几行 Ruby 代码能让您与 RisingWave 数据库建立连接(假定使用默认的数据库凭据)。

require 'pg'

conn = PG.connect(host'127.0.0.1', port4566, dbname'dev', user'root')

建立连接后就可以像在 RisingWave 中一样运行 SQL 查询,也可使用 ruby-pg 提供的任何内置功能。

ruby-pg 驱动程序可确保最佳性能,并与 PostgreSQL 的功能和数据库操作兼容。这使其适用于构建具有高性能要求的 Ruby 应用。

更多详细信息,请参阅:

6. 新增 Source 连接器

除了对现有的 Source 连接器和 Sink 连接器进行了大量改进外,新版本还新增了两个 Source 连接器,让您可以更灵活地构建流处理管道。

6.1 Iceberg Source

您现在可以使用新增的 Iceberg Source 连接器从 Iceberg Source 中批量读取数据。与 RisingWave 中的所有其他 Source 连接器一样,您可以使用 CREATE SOURCE 命令开始摄取数据。请注意,Iceberg 中的源表必须是 COW 表,不能是已删除的文件。

CREATE SOURCE iceberg_source (
 id bigint,
 user_name varchar
) WITH (
    connector = 'iceberg',
    catalog.type = 'storage',
    warehouse.path = 's3a://hummock001/',
    s3.endpoint = 'http://127.0.0.1:9301',
    s3.access.key = 'admin',
    s3.secret.key = 'admin',
    s3.region = 'us-east-1',
    database.name='db_name',
    table.name='table_name'
);

支持的目录类型包括 storagejdbchive 和 rest

此外,创建 Iceberg Source 时可选择是否指定列。在这种情况下,表中的所有列都会自动派生。

6.2 MongoDB CDC

过去要从 RisingWave 摄取 MongoDB 的 CDC 数据,需要设置一个包括 Debezium 连接器的管道(用于 MongoDB 跟踪数据库变更并将其记录到 Kafka Topic 中)和一个 Kafka 连接器(用于连接到 RisingWave)。而新增的 MongoDB CDC 连接器简化了这一过程,可让您从 RisingWave 直接连接到 MongoDB。只需执行 CREATE TABLE 命令,即可与 MongoDB 建立直接连接。

CREATE TABLE mongocdc(
 _id varchar PRIMARY KEY,
 payload jsonb
) WITH (
 connector = 'mongodb_cdc',
 mongodb.url = 'mongodb://localhost:27017/?replicaSet=rs0',
 collection.name = 'dbname.*, foo.*'
);

您可以选择在 collection.name 参数中从多个数据库的集合中摄取数据,也可从特定集合中摄取数据。

更多详细信息,请参阅:

6.3 SQL 元数据存储

在确保 etcd 的向后兼容的同时, 此次更新将为您带来 PostgreSQL、MySQL 和 SQLite 的技术预览,作为元数据存储的新选项。

PostgreSQL 在处理大量元数据时具有更强的稳健性。例如,当创建超过 1000 个物化视图和 Sink 时,etcd 容易不堪重负且内存不足,而 PostgreSQL 则更为稳定。

在生产环境中,我们建议为 PostgreSQL 部署一个至少配备 2 个 CPU 内核和 4 GB 内存的实例,并进行主动复制以实现高可用性。

更多详细信息,请参阅:

7. 结论

以上只是 RisingWave 1.8 版本新增的部分功能。要查看本次更新的完整列表,包括有关数据格式和系统目录的更多更新,请参阅更为详细的发布说明

8. 关于 RisingWave