likes
comments
collection
share

实践|用 RisingWave、Upstash 和 Metabase 做一个实时航班跟踪系统

作者站长头像
站长
· 阅读数 23

在物流和航空领域,实时数据处理、数据可视化和综合看板至关重要。它们能够提升供应链性能、优化运输并提高效率。特别是对商务航空来说,实时分析可以改善决策并节约成本。它还帮助确保飞行安全、优化空域使用并能协助空中交通管制员。总体而言,实时数据处理可以帮助航空业提升客户体验并改善业务运营。

在本文中,我们使用 RisingWave、Upstash 和 Metabase 建立了一个实时航班跟踪系统。我们利用 Aviationstack API 实时获取航班数据,然后将这些数据传输到 Upstash 的一个 Kafka 主题中。随后,我们将这些数据流摄取到 RisingWave 中,继而创建物化视图 (MV) 以进行深入的航班数据分析。物化视图随时提供最新数据,并可以即时查询。我们还使用 Metabase 创建图表、表格和综合的数据看板,用于实时航班跟踪。

实践|用 RisingWave、Upstash 和 Metabase 做一个实时航班跟踪系统

1. 在 Upstash 中设置 Kafka

Upstash 是一个无服务器平台,提供 Redis、Kafka 和 Qstash 服务,具有可扩展性、高级安全选项和专门支持的优势。Upstash Kafka 使用 Apache Kafka 进行部署,并提供一个无服务器的 Kafka 平台,配备连接器、模式注册表和监控,为有高级需求的客户提供各种计划。

1.1 注册 Upstash 账户

注册免费的 Upstash 云账户,以访问 Kafka 服务。要创建账户,请访问 Upstash Cloud Account[1]。

实践|用 RisingWave、Upstash 和 Metabase 做一个实时航班跟踪系统

1.2 创建 Kafka 集群

登录后,使用以下信息创建 Kafka 集群:

  • Name: 给您的 Kafka 集群一个唯一名称以便识别。
  • Region: 选择您的 Kafka 集群托管的区域。
  • Type: 选择适合您需求的集群类型。

实践|用 RisingWave、Upstash 和 Metabase 做一个实时航班跟踪系统

1.3 设置 Kafka 主题

创建 Kafka 集群后,设置 Kafka 主题。Upstash Kafka 提供一些默认配置,包括分区数量和保留策略,简化了设置过程。

实践|用 RisingWave、Upstash 和 Metabase 做一个实时航班跟踪系统

创建 Kafka 集群和 Kafka 主题后,我们就可以利用 Upstash Kafka 和 RisingWave 的功能来构建流处理应用程序和管道。如果您还想了解更多有关以上流程的信息,请参阅 Upstash Kafka 文档[2]。

我们摄取到 Upstash Kafka 主题的示例数据包含来自航空 API 的实时数据,包括机场名称、航班状态、航班位置等信息。

{
  "flight_date": "2024-05-16",
  "flight_status": "scheduled",
  "departure_airport": "Auckland International",
  "departure_timezone": "Pacific/Auckland",
  "departure_iata": "AKL",
  "departure_icao": "NZAA",
  "departure_terminal": "D",
  "departure_gate": "28",
  "departure_delay": null,
  "departure_scheduled": "2024-05-16T06:30:00+00:00",
  "departure_estimated": "2024-05-16T06:30:00+00:00",
  "departure_actual": null,
  "departure_estimated_runway": null,
  "departure_actual_runway": null,
  "arrival_airport": "Wellington International",
  "arrival_timezone": "Pacific/Auckland",
  "arrival_iata": "WLG",
  "arrival_icao": "NZWN",
  "arrival_terminal": null,
  "arrival_gate": "15",
  "arrival_baggage": null,
  "arrival_delay": null,
  "arrival_scheduled": "2024-05-16T07:40:00+00:00",
  "arrival_estimated": "2024-05-16T07:40:00+00:00",
  "arrival_actual": null,
  "arrival_estimated_runway": null,
  "arrival_actual_runway": null,
  "airline_name": "Singapore Airlines",
  "airline_iata": "SQ",
  "airline_icao": "SIA",
  "flight_number": "SQ4438",
  "flight_iata": "SQ4438",
  "flight_icao": "SIA4438",
  "codeshared_airline_name": "air new zealand",
  "codeshared_airline_iata": "nz",
  "codeshared_airline_icao": "anz",
  "codeshared_flight_number": "401",
  "codeshared_flight_iata": "nz401",
  "flight_info": "Singapore Airlines flight SQ4438 is currently in the air, flying from Auckland International (AKL) to Wellington International (WLG)"
}

2. 将数据从 Upstash Kafka 摄取到 RisingWave

对于摄取和处理流数据,我们有两种可用方法:开源的 RisingWave 和托管服务 RisingWave Cloud。在本文中,我们将重点使用 RisingWave Cloud,它的用户体验更加便捷友好,简化了管理和利用 RisingWave 进行航班跟踪解决方案的操作。

2.1 创建 RisingWave 集群

要在 RisingWave Cloud[3] 中创建 RisingWave 集群并探索各种功能,您可以注册 Free Plan 来免费测试 RisingWave 的功能。有关如何创建 RisingWave 集群并开始使用的详细说明,请参阅官方 RisingWave 文档[4]。它将为您提供设置和探索 RisingWave 功能的分步指导。如果您需要额外的帮助,请加入我们的 Slack 社区[5]。

实践|用 RisingWave、Upstash 和 Metabase 做一个实时航班跟踪系统

2.2 将数据流摄取到 RisingWave

现在我们已经在 Upstash 中(以 JSON 格式)设置了 Kafka 数据流,我们可以使用以下 SQL 语句连接到这些数据流。有关更多信息,请参阅在 RisingWave 中从 Upstash Kafka 摄取数据[6]。

CREATE SOURCE flight_tracking_source(
    flight_date VARCHAR,
    flight_status VARCHAR,

    departure_airport VARCHAR,
    departure_timezone VARCHAR,
    departure_iata VARCHAR,
    departure_icao VARCHAR,
    departure_terminal VARCHAR,
    departure_gate VARCHAR,
    departure_delay INTERVAL,
    departure_scheduled TIMESTAMP WITH TIME ZONE,
    departure_estimated TIMESTAMP WITH TIME ZONE,
    departure_actual TIMESTAMP WITH TIME ZONE,
    departure_estimated_runway TIMESTAMP WITH TIME ZONE,
    departure_actual_runway TIMESTAMP WITH TIME ZONE,

    arrival_airport VARCHAR,
    arrival_timezone VARCHAR,
    arrival_iata VARCHAR,
    arrival_icao VARCHAR,
    arrival_terminal VARCHAR,
    arrival_gate VARCHAR,
    arrival_baggage VARCHAR,
    arrival_delay INTERVAL,
    arrival_scheduled TIMESTAMP WITH TIME ZONE,
    arrival_estimated TIMESTAMP WITH TIME ZONE,
    arrival_actual TIMESTAMP WITH TIME ZONE,
    arrival_estimated_runway TIMESTAMP WITH TIME ZONE,
    arrival_actual_runway TIMESTAMP WITH TIME ZONE,

    airline_name VARCHAR,
    airline_iata VARCHAR,
    airline_icao VARCHAR,

    flight_number VARCHAR,
    flight_iata VARCHAR,
    flight_icao VARCHAR,

    codeshared_airline_name VARCHAR,
    codeshared_airline_i

ata VARCHAR,
    codeshared_airline_icao VARCHAR,
    codeshared_flight_number VARCHAR,
    codeshared_flight_iata VARCHAR,
    flight_info VARCHAR
)
WITH(
connector='kafka',
topic ='flights_tracking',
properties.bootstrap.server ='delicate-herring-9260-us1-kafka.upstash.io:9092',
properties.sasl.mechanism = 'SCRAM-SHA-256',
properties.security.protocol = 'SASL_SSL',
properties.sasl.username = 'xxxxxx',
properties.sasl.password = 'xxxxxx',
scan.startup.mode ='earliest'
)FORMAT PLAIN ENCODE JSON;

通过 CREATE SOURCE 语句,RisingWave 已连接到数据流,但尚未开始消费数据。为了增量处理和存储数据,我们需要创建物化视图。创建物化视图后,RisingWave 将从指定的偏移量开始消费数据。

2.3 设置物化视图以分析航班数据

我们将创建不同的物化视图,这些视图跟踪并提取与 flight_tracking_source 相关的各种航班信息。这些信息包括航班日期、状态、出发和到达详细信息(机场、时区、IATA 码、ICAO 码、计划和估计时间)、航空公司信息(名称、IATA 码、ICAO 码)、航班号和标识符(IATA 和 ICAO 码)以及一般航班信息。

使用物化视图是因为它们始终提供最新数据。例如,以下查询创建了一个名为 Airline_Flight_Counts 的物化视图,该视图按小时间隔计算每个航空公司的航班数量。它使用了上文创建的 flight_tracking_source ,并按航空公司名称和一小时的时间窗口对数据进行分组。

CREATE MATERIALIZED VIEW Airline_Flight_Counts
SELECT airline_name,
COUNT(airline_name) AS total_flights,
window_start, window_end
FROM TUMBLE (flight_tracking_source, arrival_scheduled, INTERVAL '1 hour')
GROUP BY airline_name,window_start, window_end
ORDER BY total_flights desc;

以下查询创建了一个名为 Airport_Summary 的物化视图,该视图按小时间隔计算每个机场的到达和出发航班总数。计算结果按机场和一小时的时间窗口进行分组,并按航班总数降序排列。

CREATE MATERIALIZED VIEW Airport_Summary
WITH ArrivalCounts AS (
    SELECT
        arrival_airport,
        COUNT(arrival_airport) AS total_flights_arrival,
        window_start,
        window_end
    FROM
        TUMBLE (flight_tracking_source, arrival_scheduled, INTERVAL '1 hour')
    GROUP BY
        arrival_airport,
        window_start,
        window_end
),
DepartureCounts AS (
    SELECT
        departure_airport,
        COUNT(departure_airport) AS total_flights_departure,
        window_start,
        window_end
    FROM
        TUMBLE (flight_tracking_source, arrival_scheduled, INTERVAL '1 hour')
    GROUP BY
        departure_airport,
        window_start,
        window_end
)
SELECT
    ArrivalCounts.arrival_airport,
    ArrivalCounts.total_flights_arrival,
    DepartureCounts.departure_airport,
    DepartureCounts.total_flights_departure,
    ArrivalCounts.window_start,
    ArrivalCounts.window_end
FROM
    ArrivalCounts
INNER JOIN
    DepartureCounts ON ArrivalCounts.window_start = DepartureCounts.window_start
    AND ArrivalCounts.window_end = DepartureCounts.window_end
    AND ArrivalCounts.arrival_airport = DepartureCounts.departure_airport
ORDER BY
    ArrivalCounts.total_flights_arrival DESC,
    DepartureCounts.total_flights_departure DESC;

以下查询创建了一个名为 Timezone_Summary 的物化视图,该视图按小时间隔计算每个时区的到达和出发航班总数。它使用数据源 flight_tracking_source,按时区和一小时的时间窗口对数据进行分组,然后按航班总数降序排列结果。

CREATE MATERIALIZED VIEW Timezone_Summary
WITH ArrivalCounts AS (
    SELECT
        arrival_timezone,
        COUNT(arrival_timezone) AS total_flights_arrival,
        window_start,
        window_end
    FROM
        TUMBLE (flight_tracking_source, arrival_scheduled, INTERVAL '1 hour')
    GROUP BY
        arrival_timezone,
        window_start,
        window_end
),
DepartureCounts AS (
    SELECT
        departure_timezone,
        COUNT(departure_timezone) AS total_flights_departure,
        window_start,
        window_end
    FROM
        TUMBLE (flight_tracking_source, arrival_scheduled, INTERVAL '1 hour')
    GROUP BY
        departure_timezone,
        window_start,
        window_end
)
SELECT
    ArrivalCounts.arrival_timezone,
    ArrivalCounts.total_flights_arrival,
    DepartureCounts.departure_timezone,
    DepartureCounts.total_flights_departure,
    ArrivalCounts.window_start,
    ArrivalCounts.window_end
FROM
    ArrivalCounts
INNER JOIN
    DepartureCounts ON ArrivalCounts.window_start = DepartureCounts.window_start
    AND ArrivalCounts.window_end = DepartureCounts.window_end
    AND ArrivalCounts.arrival_timezone = DepartureCounts.departure_timezone
ORDER BY
    ArrivalCounts.total_flights_arrival DESC,
    DepartureCounts.total_flights_departure DESC;

3. 使用 Metabase 可视化

Metabase 是一个开源的商业智能工具,可以将数据可视化并共享数据见解。它让您能用简单的方法,基于数据库数据,创建各种图表、看板和指标。

3.1 将 RisingWave 连接到 Metabase

由于 RisingWave 兼容 PostgreSQL,您可以将 Metabase 连接到 RisingWave 作为数据源,并在流数据上构建分析。

您可以在 Metabase 中使用 RisingWave 作为数据源,使用 RisingWave 中的表和物化视图创建可视化图表和综合看板。要了解具体步骤,请参阅配置 Metabase 以读取 RisingWave 数据[7]。

成功将 RisingWave 连接到 Metabase 后,我们将 RisingWave 中的物化视图作为数据源添加,以创建表格、各种图表和综合看板。

3.2 使用 Metabase 可视化数据

我们使用在 RisingWave 中的物化视图和源(如 flight_tracking_sourceAirline_Flight_CountsAirport_Summary 和 Timezone_Summary)创建这些表格、图表和综合看板。

实践|用 RisingWave、Upstash 和 Metabase 做一个实时航班跟踪系统

实践|用 RisingWave、Upstash 和 Metabase 做一个实时航班跟踪系统

实践|用 RisingWave、Upstash 和 Metabase 做一个实时航班跟踪系统

实践|用 RisingWave、Upstash 和 Metabase 做一个实时航班跟踪系统

以下是一个综合看板,展示了一系列用于实时航班跟踪的图表。它提供了航班操作的整体视图,提供按航空公司、机场和时区分类的航班总数的洞察。此外,它还提供当前航班的详细信息,让用户得以全面监控并提出见解。

实践|用 RisingWave、Upstash 和 Metabase 做一个实时航班跟踪系统

4. 总结

在本文中,我们使用 Upstash、RisingWave 和 Metabase 开发了一个实时航班跟踪系统。因为 RisingWave 提供了广泛的源和目标连接器,配置和连接变得非常简单。我们将实时航班数据摄取到 Upstash 的 Kafka 主题中,然后将其发送到 RisingWave,并创建物化视图以进行深入分析。最后,我们使用 Metabase 创建了可视化图表和实时看板,使用户能够监控航班运营并做出明智的决策。

5. 关于 RisingWave

👨‍🔬加入 RW 社区,欢迎关注公众号:RisingWave 中文开源社区

转载自:https://juejin.cn/post/7375351908896636947
评论
请登录