likes
comments
collection
share

CDC 实践|使用 PostgreSQL 和 RisingWave 支持实时电子商务运营

作者站长头像
站长
· 阅读数 5

本文将分享如何使用 PostgreSQL CDC 和 RisingWave 建立强大的电子商务数据管道,使得组织能够实时分析动态市场,做出有效运营。PostgreSQL 可充当事务数据的存储库,CDC 则用于捕捉和跟踪数据库变更,以实现高效更新。RisingWave 则用于实现实时分析,如销售监控和库存管理。

变更数据捕获(Change Data Capture,CDC)是数据管理过程中的关键流程,可跟踪并将 Source 系统(如数据库和数据仓库)的实时数据变更复制到目标系统。CDC 旨在确保所有系统的数据完整性,这对于依赖准确数据的企业来说至关重要。CDC 最初用于提取、转换、加载(ETL)作业,现已成为云迁移的首选方法。它能够无缝连接本地和云环境,促进数据流动,保护基础设施投资。 如需更全面地了解 CDC 及其在各种场景中的优势,请参阅:《What Is Change Data Capture (CDC)? Definition, Benefits, and Use Cases》。

CDC 实践|使用 PostgreSQL 和 RisingWave 支持实时电子商务运营

1. 在 PostgreSQL 中设置 CDC 

要在 PostgreSQL 中启用 CDC、设置所需的表以及配置必要的设置,可以按照以下步骤进行:

1. 安装 PostgreSQL。 设置 CDC 之前,请确保系统中已安装 PostgreSQL。您可以从官方网站下载并安装 PostgreSQL,或使用与操作系统兼容的包管理器。

2. 创建表。 要在 PostgreSQL 实例中创建电子商务用例所需的表,可以使用 PostgreSQL 客户端或命令行界面连接 PostgreSQL 数据库。在示例电子商务场景中,我们将创建这些表:

  • users 表:
    • user_id
    • username
    • email
    • registration_date
    • last_login
    • address
  • products 表:
    • product_id
    • product_name
    • brand
    • category
    • price
    • stock_quantity
  • sales 表:
    • sale_id
    • user_id(外键,引用 users 表)
    • product_id(外键,引用 products 表)
    • sale_date
    • quantity
    • total_price

3. 在 PostgreSQL 中启用 CDC。

要在 PostgreSQL 中为 RisingWave 启用 CDC,请按照以下步骤进行操作:

  1. 运行以下命令检查当前的预写式日志(Write-Ahead Log,WAL)级别:

    SHOW wal_level;
    

    默认情况下,该值设置为 replica。但对于 CDC,需要将其更改为 logical。要执行此操作,请修改数据库配置文件(postgresql.conf)或使用 psql 命令。运行以下命令将 wal_level 设置为 logical

    ALTER SYSTEM SET wal_level = logical;
    

    请注意,更改 wal_level 需要重启 PostgreSQL 实例,并可能影响数据库性能。有关详细信息,请参阅 RisingWave PostgreSQL CDC 文档

  2. 修改 postgresql.conf 和 pg_hba.conf 文件中的配置设置,允许 RisingWave 从本地安装的 PostgreSQL 中读取 CDC 数据。

    在 postgresql.conf 文件中,更新以下行:

    listen_addresses = '*'
    

    在 pg_hba.conf 文件中,添加以下几行,以允许逻辑复制所需的连接:

    host    all             all             0.0.0.0/0               md5
    
    # Allow replication connections from any IP address using scram-sha-256 authentication method.
    host    replication     all             0.0.0.0/0               scram-sha-256
    
  3. 完成这些更改后,重启 PostgreSQL 以应用新设置,RisingWave 便能够使用逻辑复制从 PostgreSQL 读取 CDC 数据。

2. 将 RisingWave Cloud 连接到 PostgreSQL

要开始使用 RisingWave,请在 RisingWave Cloud 免费版本中创建一个 RisingWave 集群。有关详细说明,请参阅 RisingWave Cloud 文档

CDC 实践|使用 PostgreSQL 和 RisingWave 支持实时电子商务运营

成功创建 RisingWave 集群后,可以继续在 RisingWave 中创建 Source,以便从 PostgreSQL 中读取 CDC 数据。

我们将使用以下查询来创建 Source,该 Source 连接到本地运行的 PostgreSQL。请确保填写了相应的认证参数。

CREATE SOURCE postgres_source WITH(
   connector='postgres-cdc',
   hostname='127.0.0.1',
   port='5432',
   username='postgres',
   password='qwerty1245',
   database.name='postgres',
   schema.name='public'
);

接下来,您可以创建一个名为 users 的表,该表与 PostgreSQL 中的 public.users 表相对应。此表包含用户信息列,如用户 ID、用户名、电子邮件、注册日期、最后登录时间戳和地址。其数据来自 RisingWave 中的 postgres_source

CREATE TABLE users (
    user_id INTEGER PRIMARY KEY,
    username VARCHAR,
    email VARCHAR,
    registration_date DATE,
    last_login TIMESTAMP,
    address VARCHAR
    )
FROM postgres_source TABLE 'public.users';

同样,您可以创建一个名为 products 的表,其中包含产品详细信息列,如产品 ID、名称、品牌、类别、价格和库存数量,主键设为产品 ID。其数据也来自 RisingWave 中的 postgres_source 和名为 public.products 的 PostgreSQL 表:

CREATE TABLE products (
    product_id INTEGER,
    product_name VARCHAR,
    brand VARCHAR,
    category VARCHAR,
    price NUMERIC,
    stock_quantity INTEGER,
    PRIMARY KEY (product_id)
)
FROM postgres_source TABLE 'public.products';

最后,您可以创建一个名为 sales 的表,其中包含销售详细信息列,如销售 ID、用户 ID、产品 ID、销售日期、销售数量和总价,主键设为销售 ID。其数据将来自 RisingWave 中的 postgres_source 和名为 public.sales 的 PostgreSQL 表:

CREATE TABLE sales (
    sale_id INTEGER PRIMARY KEY,
    user_id INTEGER,
    product_id INTEGER,
    sale_date DATE,
    quantity INTEGER,
    total_price NUMERIC   
)
FROM postgres_source TABLE 'public.sales';

3. RisingWave 中的数据分析

成功创建 postgres_source Source 以及相关的 users 表、products 表和 sales 表后,就可以在 RisingWave 中分析 CDC 数据了。

要计算 1 分钟间隔内产品的销售数据合计,并从结果中选择特定列进行分析或报告,可以使用以下查询。该查询使用名为 product_sales 的通用表表达式(Common Table Expression,CTE)来执行计算并检索所需的列:

WITH product_sales AS (
    SELECT
        window_start,
        window_end,
        s.sale_id,
        p.product_id,
        SUM(s.total_price) AS total_sales,
        AVG(s.total_price) AS average_revenue_per_sale
    FROM
        TUMBLE (sales, sale_timestamp, INTERVAL '1 MINUTES') as s
    JOIN
        products p ON s.product_id = p.product_id
    GROUP BY
        s.sale_id,
        p.product_id,
        s.window_start,
        s.window_end
)
SELECT
    ps.sale_id,
    ps.product_id,
    ps.total_sales,
    ps.average_revenue_per_sale,
    ps.window_start,
    ps.window_end
FROM
    product_sales ps;

要根据最后登录时间计算 1 分钟间隔内不同活跃用户的数量,可以使用以下查询。该查询使用 TUMBLE 函数定义时间窗口,并按用户 ID、窗口开始时间和窗口结束时间对结果进行分组,以计算每个间隔内的活跃用户总数:

SELECT
    COUNT(DISTINCT user_id) AS total_active_users,
    window_start,
    window_end
FROM 
     TUMBLE (users, last_login, INTERVAL '1 MINUTES')
GROUP BY
    user_id,
    window_start,
    window_end;

要检索 1 分钟间隔内按销售总量排名前五的产品,可以使用以下查询。该查询使用 TUMBLE 函数,根据销售时间戳定义时间窗口,并连接 sales 表和 products 表以收集有关产品类别、名称和销售数量的数据。然后按类别、产品名称、窗口开始时间和窗口结束时间对结果进行分组。查询按总销售量降序排序,且仅限于前五种产品:

SELECT
    p.category,
    p.product_name,
    SUM(s.quantity) AS total_quantity_sold,
    window_start,
    window_end
FROM
    TUMBLE (sales, sale_timestamp, INTERVAL '1 MINUTES') as s
JOIN
    products p
ON
    s.product_id = p.product_id
GROUP BY
    p.category, p.product_name, 
    window_start,
    window_end
ORDER BY
    total_quantity_sold DESC
LIMIT
    5;

要根据 1 分钟间隔内的消费总额计算前 10 名用户,可以使用以下查询。该查询使用 TUMBLE 函数,根据用户登录时间定义时间窗口,并连接 users 表和 sales 表以收集用户信息和销售数据。然后按用户 ID、用户名、电子邮件、窗口开始时间和窗口结束时间对结果进行分组。查询按总消费金额降序排序,且仅限于前 10 名用户:

SELECT
        u.user_id,
        u.username,
        u.email,
        SUM(s.total_price) AS total_spent,
        window_start,
        window_end
    FROM
        TUMBLE (users, last_login, INTERVAL '1 MINUTES') as u
    JOIN
        sales s
    ON
        u.user_id = s.user_id
    GROUP BY
        u.user_id,
        u.username,
        u.email,
        window_start,
        window_end
    ORDER BY
    total_spent DESC
LIMIT
    10;

4. 结论

本文探讨了如何将 PostgreSQL CDC 和 RisingWave 结合起来,以建立强大的电子商务数据管道。PostgreSQL 可作为事务数据的存储,CDC 可有效捕获和跟踪变更,RisingWave 则用于实现实时分析,如销售监控和库存管理。此外,RisingWave 还可用于数据预处理,以支持下游个性化推荐系统。

这种架构使得电子商务系统能够灵活应对动态的市场条件,提高企业的即时洞察力,并提升其明智决策的能力。通过 PostgreSQL、CDC 和 RisingWave 的协同工作,企业可以建立数据管道,从而有效支持电子商务运营。

5. 关于 RisingWave