likes
comments
collection
share

技术内幕|做一个维基百科实时监控玩玩吧!

作者站长头像
站长
· 阅读数 2

在这篇博文中,我会介绍一种流处理解决方案,用于全面且动态地了解维基百科的编辑情况。大家也可以用于其他平台~

  1. 从维基百科 API 中提取数据,并将其传输到部署在 Instaclustr 云中的 Kafka Topic。
  2. RisingWave 创建了 Source 以摄取 Kafka 数据,并创建物化视图进行处理分析。
  3. 利用 Superset 对结果进行可视化,生成各类图表和综合看板。

RisingWave 是一个与 PostgreSQL 兼容的流数据库,具有真正的云原生架构,拥有低成本高效益、可扩展等特点。基于 RisingWave,用户仅使用 SQL 就能从流数据中获取目标信息。

Instaclustr 是一个集成了众多流行开源工具(如 Kafka、PostgreSQL、Cassandra 和 Redis)的完全托管平台。它提供了方便的 Kafka Connect 集成,包括专用的 ZooKeeper 服务。通过这种 100% 开源方案,Instaclustr  提供了无缝使用 Kafka 的体验。

1. 技术栈

我们将从 Wikipedia API 获取实时数据,捕获维基百科文章的编辑和贡献者信息,然后将它们发布到 Kafka Topic。

随后,数据将被导入 RisingWave。基于这些数据,我们将创建物化视图来执行一系列操作,如聚合、时间窗口操作、数据转换等,以从数据中提取有价值的信息。

最后,我们会把处理后的数据从 RisingWave 导出到 Apache Superset,从而将数据可视化,用更具体直观的方式查看贡献者们对维基百科的实时编辑。

技术内幕|做一个维基百科实时监控玩玩吧!

2. 在 Instaclustr 云上部署 Kafka

为开始生成事件,我们需要一个 Kafka 集群。在本文的演示中,我们将用 Instaclustr 云创建一个 Kafka 集群。

2.1 创建 Kafka 集群

首先,请注册免费的 Instaclustr 账号以获得访问 Kafka 服务的权限。您可以通过访问 Instaclustr 云平台来创建账户。

技术内幕|做一个维基百科实时监控玩玩吧!

接着,请参考 Instaclustr 云提供的 Apache Kafka 快速上手指南,在 Instaclustr 云上创建一个 Kafka 集群。

成功创建 Kafka 集群后,请添加您计算机的 IP 地址到集群中,以便产生和使用数据。

技术内幕|做一个维基百科实时监控玩玩吧!

2.2 将维基百科编辑数据传输到 Kafka

我们将首先使用维基百科 Python API 来获取各种信息,如用户贡献、用户详细信息和最近更改。

随后,我们会把这些数据传输到 Instaclustr 云上的 Kafka 集群中,以便后续将数据导入 RisingWave。

我们的 JSON 消息将遵循以下 schema:

"contributor": 维基百科贡献者的名字。
"title": 所编辑的维基百科文章的标题。
"edit_timestamp": 编辑的时间戳。
"registration": 该维基百科用户的注册日期。
"gender": 该维基百科用户的性别。
"edit_count": 该维基百科用户的编辑次数。

以下是一个发送到 Kafka Topic 的消息样本:

{
  "contributor": "Teatreez",
  "title": "Supreme Court of South Africa",
  "edit_timestamp": "2023-12-03 18:23:02",
  "registration": "2006-12-30 18:42:21",

  "gender": "unknown",
  "edit_count": "10752"
}

3. 连接 RisingWave 与 Kafka Topic

要使用 RisingWave,请参考快速上手指南创建一个 RisingWave 集群。

随后,为了 RisingWave 和 Instaclustr 能成功连接,请先前往 Instaclustr,将您的 RisingWave 集群的 NAT 网关 IP 地址添加到 Instaclustr 云中 Kafka 集群的防火墙规则(Firewall Rules)中。这一步有利于避免潜在的连接错误。

成功创建 RisingWave 集群后,我们在 RisingWave 中创建一个 Source,用于从 Instaclustr 云中的 Kafka Topic 导入数据到RisingWave。

请使用以下查询创建一个连接到 Instaclustr 云中 Kafka Topic 的 Source,注意,各认证参数需要准确填写对应的值。

CREATE SOURCE wiki_source (
  contributor VARCHAR,
  title VARCHAR,
  edit_timestamp TIMESTAMPTZ,
  registration TIMESTAMPTZ,
  gender VARCHAR,
  edit_count VARCHAR
) WITH (
  connector = 'kafka',
  topic='Insta-topic',
  properties.bootstrap.server = 'x.x.x.x:9092',
  scan.startup.mode = 'earliest',
  properties.sasl.mechanism = 'SCRAM-SHA-256',
  properties.security.protocol = 'SASL_PLAINTEXT',
  properties.sasl.username = 'ickafka',
  properties.sasl.password = 'xxxxxx'
) FORMAT PLAIN ENCODE JSON;

然后,我们基于 Source  wiki_source 创建一个名为 wiki_mv 的物化视图。注意,以下代码中,我们过滤掉了带有空值的行。

CREATE MATERIALIZED VIEW wiki_mv AS
SELECT
  contributor,
  title,
  CAST(edit_timestamp AS TIMESTAMPAS edit_timestamp,
  CAST(registration AS TIMESTAMPAS registration,
  gender,
  CAST(edit_count AS INTAS edit_count
FROM wiki_source
WHERE timestamp IS NOT NULL
  AND registration IS NOT NULL
  AND edit_count IS NOT NULL;

现在,我们可以查询物化视图,获取 Source 中的最新数据:

SELECT * FROM wiki_mv LIMIT 5;

返回结果将类似如下:

contributor    |   title                     |     edit_timestamp        |       registration        | gender  | edit_count
---------------+-----------------------------+---------------------------+---------------------------+---------+-----------

Omnipaedista   | Template:Good and evil      | 2023-12-03 10:22:02+00:00 | 2008-12-14 06:02:32+00:00 | male    | 222563
PepeBonus      | Moshi mo Inochi ga Egaketara| 2023-12-03 10:22:16+00:00 | 2012-06-02 13:39:53+00:00 | unknown | 20731
Koulog         | Ionikos F.C.                | 2023-12-03 10:23:00+00:00 | 2023-10-28 05:52:35+00:00 | unknown | 691
Fau Tzy        | 2023 Liga 3 Maluku          | 2023-12-03 10:23:17+00:00 | 2022-07-23 09:53:11+00:00 | unknown | 4697
Cavarrone      | Cheers (season 8)           | 2023-12-03 10:23:40+00:00 | 2008-08-23 11:13:14+00:00 | male    | 83643

(5 rows)

接下来,我们再创建几个查询:

以下查询创建了一个新的物化视图 gender_mv,将物化视图 wiki_mv 中的贡献按一分钟间隔进行聚合。该物化视图计算了多个数据,包括每个时间窗口内的总贡献数、未知性别者的贡献数,以及已知性别者的贡献数。基于此物化视图,我们可以更方便地基于性别对贡献模式进行分析和监控。

CREATE MATERIALIZED VIEW gender_mv AS
SELECT COUNT(*AS total_contributions,
COUNT(CASE WHEN gender = 'unknown' THEN 1 ENDAS contributions_by_unknown,
COUNT(CASE WHEN gender != 'unknown' THEN 1 ENDAS contributions_by_male_or_female,
window_start, window_end
FROM TUMBLE (wiki_mv, edit_timestamp, INTERVAL '1 MINUTES')
GROUP BY window_start, window_end;

以下查询创建了物化视图 registration_mv ,它同样将物化视图 wiki_mv 中的贡献按一分钟间隔进行聚合,计算的信息包括:总贡献数、2020年1月1日之前注册账户的贡献数,以及2020年1月1日之后注册账户的贡献数。

CREATE MATERIALIZED VIEW registration_mv AS
SELECT COUNT(*AS total_contributions,
COUNT(CASE WHEN registration < '2020-01-01 01:00:00'::timestamp THEN 1 ENDAS contributions_by_someone_registered_before_2020,
COUNT(CASE WHEN registration > '2020-01-01 01:00:00'::timestamp THEN 1 ENDAS contributions_by_someone_registered_after_2020,
 window_start, window_end
FROM TUMBLE (wiki_mv, edit_timestamp, INTERVAL '1 MINUTES')
GROUP BY window_start, window_end;

以下查询创建了物化视图 count_mv,将 wiki_mv 物化视图中的贡献按一分钟间隔进行聚合,然后计算:总贡献数、编辑次数少于 1000 次的贡献者的贡献数,以及编辑次数大于等于 1000 次的贡献者的贡献数。

CREATE MATERIALIZED VIEW count_mv AS
SELECT
    COUNT(*AS total_contributions,
    COUNT(CASE WHEN edit_count < 1000 THEN 1 ENDAS contributions_less_than_1000,
    COUNT(CASE WHEN edit_count >= 1000 THEN 1 ENDAS contributions_1000_or_more,
    window_start, window_end
FROM TUMBLE(wiki_mv, edit_timestamp, INTERVAL '1 MINUTES')
GROUP BY window_start, window_end;

4. 将数据从 RisingWave 导出到 Apache Superset 进行可视化

Superset 是一个用于创建看板和可视化内容的开源工具。接下来我们将配置 Superset,从 RisingWave 读取数据并导出到 Superset 进行可视化。

4.1 连接 RisingWave 和 Superset

请按照 RisingWave 文档中的指南,配置 Superset 从 RisingWave 读取数据。在此过程中,我们会把 RisingWave 作为数据源添加到 Apache Superset 中,并使用其中的表和物化视图创建可视化和看板。

成功将 RisingWave 连接到 Apache Superset 后,您可以按照该指南剩下部分的指导,将 RisingWave 中的物化视图作为数据集进行添加,创建表、图表和整合后的看板。

4.2 可视化结果展示:表、图表和看板

下表使用 wiki_mv 数据集生成,显示了 Wikipedia 贡献者的名字、注册日期、性别、编辑次数以及贡献者编辑过的 Wikipedia 文章等信息。

技术内幕|做一个维基百科实时监控玩玩吧!

以下面积图使用 count_mv 数据集创建,展示了指定时间窗口内的:总贡献数、编辑次数少于 1000 次的贡献者的贡献数,以及编辑次数大于等于 1000 次的贡献者的贡献数。技术内幕|做一个维基百科实时监控玩玩吧! 以下折线图基于 gender_mv 数据集生成,展示了指定时间窗口内的:总贡献数、未知性别者的贡献,以及已知性别者的贡献。

技术内幕|做一个维基百科实时监控玩玩吧!

下图则使用 registration_mv 数据集创建,在 1 分钟的时间窗口内可视化了各种类型的贡献计数,包括:总贡献数、2020年1月1日之前注册的用户的贡献数,以及2020年1月1日之后注册的用户的贡献数。

技术内幕|做一个维基百科实时监控玩玩吧!

最后,以下是一个整合了以上各项图表的看板,让您可以全面、实时地监控 Wikipedia 编辑信息,以全面地挖掘贡献者及其所编辑文章相关的重要信息。

技术内幕|做一个维基百科实时监控玩玩吧!

5. 关于 RisingWave

RisingWave 是一款分布式 SQL 流处理数据库,旨在帮助用户降低实时应用的的开发成本。作为专为云上分布式流处理而设计的系统,RisingWave 为用户提供了与 PostgreSQL 类似的使用体验,并且具备比 Flink 高出 10 倍的性能以及更低的成本。