流数据库——Zero-ETL或者Near-Zero-ETL在第七章中,我们介绍了新兴的混合数据库,这些数据库提供了支持实
在第七章中,我们介绍了新兴的混合数据库,这些数据库提供了支持实时分析的替代解决方案。这些系统减少了基础设施需求,使数据对分析工作负载更具可及性。由于混合系统融合了传统上分布式的系统,因此有一种假设认为混合系统趋向于单体系统。单体系统通常被认为在执行数据工作负载时缺乏模块化和可扩展性。
具有讽刺意味的是,将一个单体数据系统拆解会使我们重新将数据库分解并内部调节,以便特定地扩展单个组件。这并不一定是一种负面的解决方案。在本书中,我们一直在提议将这些系统重新放回数据库中,以减少传统上与大型分布式系统相关的复杂性和成本。
ETL(提取、转换、加载)是我们在系统间移动数据的方式,同时对数据进行转换。到目前为止,我们使用了一种称为流式SQL的ETL形式。在本章中,我们将讨论如何在ETL的实施中平衡复杂性和可扩展性,通过考察现有系统和今天用于分发和扩展数据工作负载的模式。
ETL模型
图8-1展示了现有的ETL解决方案,从顶部的HTAP数据库中的无ETL到底部的将数据库彻底转变的分布式解决方案。解决方案在三角形中的位置越低,其分布式和复杂性越高。同样,位于顶部的解决方案更加集中和单体,而向下移动时,它们变得更加分散和模块化。
在三角形的左侧是事务型数据库,而右侧是列式数据库。三角形中部则是零ETL(Zero-ETL)。
零ETL
零ETL是Amazon Web Services(AWS)首次定义的模式,用于简化从OLTP数据库到OLAP数据库的数据集成。在其提案中,零ETL被定义为:
[一个] 集成集合,旨在消除或最小化构建ETL数据管道的需求。ETL(抽取、转换、加载)是将来自不同来源的数据进行组合、清洗和标准化的过程,以准备好用于分析、人工智能(AI)和机器学习(ML)工作负载。——AWS, “什么是零ETL?”
零ETL指的是一种数据集成和分析的方法或概念,旨在最小化甚至消除对传统ETL流程的需求。传统的ETL流程涉及从源系统中提取数据,转换数据以满足目标系统的要求,然后将其加载到目标系统中。有关架构总结,请参见图8-2。
AWS 的零ETL解决方案相当于在Amazon的OLTP数据库Aurora和数据仓库Redshift之间进行的托管集成。近实时分析可以在几分钟内实施。这是一个完全托管的解决方案,使事务数据在写入Aurora之后可以在Redshift中使用。
AWS能够紧密集成Aurora和Redshift,因为它拥有这两个数据库产品,并且它们仅存在于其云平台上。AWS可以在其提供的任何两个系统之间建立本地集成。缺点是这些解决方案在AWS以外不可用。
虽然零ETL概念在敏捷性、降低延迟和节省成本方面可以提供好处,但它可能并不适合所有场景。大多数组织,特别是那些具有复杂数据集成需求或监管约束的组织,可能仍然需要传统ETL过程的某些元素。
在第2章中,我们指出,在数据仓库中转换数据(也称为ELT)会强制进行批处理语义,这将为任何实时分析用例增加延迟。在图8-2中,Aurora和Redshift之间的集成点不会转换数据。这意味着转换是在Redshift中完成的。HTAP数据库和零ETL解决方案都存在这个问题。它们都需要在数据到达数据仓库后触发批处理转换过程。
另外,数据分析师提交的分析查询可以包括所需的转换,但这会使查询变得非常缓慢。最终,如果没有流处理组件,你不能创建一个物化视图来在推送和拉取查询之间分配分析工作负载。
零ETL挑战了传统的集成方法,并寻求减少ETL相关的复杂性、延迟和资源需求。表8-1列出了零ETL的一些关键方面。
表8-1. 零ETL
关键方面 | 描述 |
---|---|
实时数据集成 | 最小化或消除批处理,以实现实时或近实时的数据集成。这在及时洞察至关重要的场景中特别相关。 |
按需模式 | 采用按需模式,其中数据在ETL过程中不会转换为预定义模式,而是在分析时解释。这允许在处理多样化和变化的数据时具有更多灵活性。 |
数据虚拟化 | 利用数据虚拟化技术,提供跨多个源的统一和虚拟的数据视图,而不实际移动或转换数据。这可以减少创建和维护单独数据仓库的需求。 |
内数据库处理 | 在数据所在的数据库系统中直接执行转换和分析,避免提取和移动大型数据集以进行处理的需要。 |
事件驱动架构 | 采用事件驱动架构,其中数据变化触发即时更新,减少对定期批处理过程的依赖。 |
现代数据架构 | 采用现代数据架构,如数据湖和基于云的解决方案,提供可扩展和经济高效的数据管理和分析选项,而不受传统ETL瓶颈的限制。 |
最终,决定是否采用零ETL方法取决于数据的性质、业务需求和可用的技术环境。重要的是要仔细评估权衡利弊,选择符合组织目标和优先级的方法。
如果你需要更多传统ETL管道提供的灵活性,可以考虑使用近零ETL(near-zero-ETL)。
近零ETL
近零ETL(Near-Zero-ETL)仍试图在限制ETL组件基础设施的同时,不失去支持复杂数据集成需求所需的灵活性。这涉及使用采用混合方法的数据系统。
一种解决方案是利用具有嵌入式功能的OLTP数据库,将数据发送到其他系统,而不需要在单独的基础设施上运行自管理的连接器。
图8-3展示了两个具有嵌入式功能、提供复杂数据集成所需灵活性的数据库。这包括一个OLTP数据库和一个OLAP数据库。PeerDB使Postgres能够将数据流发送到流平台上的主题。Timeplus/Proton在数据被服务之前,提供了所需的转换。Timeplus/Proton提供了物化视图,允许推送和拉取查询的区分。这种方法为快速实时分析查询提供了更多灵活性。
PeerDB
PeerDB 是一个开源解决方案,用于将数据从 Postgres 流式传输到数据仓库、队列/主题和其他存储引擎。它的目标是通过在构建分析系统集成时提供数据库体验来简化 ETL。
在 PeerDB 中,PEER 是指 PeerDB 可以查询的数据库连接。PEER 是通过 CREATE PEER 命令创建的。参见示例 8-1。
示例 8-1. 在 PeerDB 中设置来自另一个 Postgres 数据库的对等连接
CREATE PEER source FROM POSTGRES WITH
(
host = 'catalog',
port = '5432',
user = 'postgres',
password = 'postgres',
database = 'source'
);
CREATE PEER sf_peer FROM SNOWFLAKE WITH
(
account_id = '<snowflake_account_identifier>',
username = '<user_name>',
private_key = '<private_key>',
password = '<password>', -- 仅在私钥加密时提供
database = '<database_name>',
schema = '<schema>',
warehouse = '<warehouse>',
role = '<role>',
query_timeout = '<query_timeout_in_seconds>'
);
-- 查询 Snowflake 中的表
SELECT * FROM sf_peer.MY_SCHEMA.MY_TABLE;
创建一个连接到另一个 Postgres 数据库的对等连接。
创建一个连接到 Snowflake 数据仓库的对等连接。
您可以从对等连接中选择任何数量的表。
对等连接允许您查询对等数据库中的任何表,并将其与事务数据连接起来。系统间的无缝集成是有效的,但当对等数据库存在于远程区域时,会变得复杂。PEER 作为拉取查询,从分析系统中同步地拉取数据。这要求两个系统存在于同一区域或数据中心。
在其他数据库中,包括流式 OLTP 数据库,您必须构建数据管道以从数据仓库或 OLAP 数据库中提取数据,并将其转移到 OLTP 数据库中。在 Postgres 中使用 PeerDB 构建对等连接,使从分析平面提取数据变得更容易。但也有一些限制。
Postgres 无法存储大量数据,也不适合进行分析查询。因此,使用 PeerDB 在 Postgres 上构建物化视图并不是最佳选择。分析数据需要减少到适合 OLTP 数据库容量的大小。这意味着执行重型转换的推送查询需要在流处理器上外部执行。我们将在本章稍后部分讨论这一点。
示例 8-2. 使用 PeerDB 的 MIRROR 创建 ETL 到分析系统
CREATE MIRROR <mirror_name> [IF NOT EXISTS] FROM
<source_peer> TO <target_peer> FOR
$$
SELECT * FROM <source_table_name> WHERE
<watermark_column> BETWEEN {{.start}} AND {{.end}}
$$
WITH (
destination_table_name = '<schema_qualified_destination_table_name>',
watermark_column = '<watermark_column>',
watermark_table_name =
'<table_on_source_on_which_watermark_filter_should_be_applied>',
mode = '<mode>',
unique_key_columns = '<unique_key_columns>',
parallelism = <parallelism>,
refresh_interval = <refresh_interval_in_seconds>,
sync_data_format = '<sync_data_format>',
num_rows_per_partition = 100000,
initial_copy_only = <true|false>,
setup_watermark_table_on_destination = <true|false>
);
不幸的是,镜像不支持转换。转换需要在镜像之前或之后执行。在镜像之前执行转换可能会导致 OLTP 数据库资源的广泛使用,因为它的目的是处理操作工作负载。此外,这个过程将作为批处理过程执行,而不是实时执行。在镜像之后执行转换也会需要批处理语义,除非使用像 Proton 这样的流式 OLAP 数据库。
Proton
第7章中的 Venn 图展示了重叠的混合系统。其中之一是流式 OLAP 数据库。Proton 是一种下一代 RTOLAP 数据库,允许状态流式摄取,并提供两个用于消费实时分析的 API:异步变化流和同步拉取查询。可以在摄取时实现复杂的转换,以构建物化视图。
在前一节的图8-3中,PeerDB 将数据写入 Proton 订阅的流平台中的一个主题。Proton 可以在数据被物化为拉取查询之前执行复杂的推送查询。
首先,我们为流平台(在这种情况下是 Kafka)创建一个 PEER。参见示例 8-3。
示例 8-3. 在 PeerDB 中设置对 Kafka 的对等连接
CREATE PEER <eh_peer_name> FROM KAFKA WITH (
bootstrap_server = '<bootstrap-servers>'
);
CREATE MIRROR <mirror_name> [IF NOT EXISTS] FROM
<source_peer> TO <target_peer> FOR
$$
SELECT * FROM <source_table_name> WHERE
<watermark_column> BETWEEN {{.start}} AND {{.end}}
$$
WITH (
destination_table_name = '<topic>'
);
在 Proton 中,创建一个从 Kafka 读取的流。参见示例 8-4。
示例 8-4. 从 Kafka 到 Proton 创建一个流
CREATE EXTERNAL STREAM frontend_events(raw string)
SETTINGS type='kafka',
brokers='<bootstrap-servers>',
topic='<topic>'
所有流式数据库提供两种输出模式:同步拉取查询和异步推送到主题。这为开发者提供了两种将实时分析带回操作平面的方式,通过在其应用程序中嵌入 OLAP 数据库。
嵌入式 OLAP
有一种趋势是将较小的分析工作负载更接近操作平面。例如,HTAP 数据库如 Hydra 和 SingleStore 提供了用于分析工作负载的列式数据库。然而,由于其容量有限,这些数据库无法容纳像 Snowflake、Databricks、ClickHouse 和 Pinot 等分析系统可以处理的数据量。
相反,将庞大的分析系统引入操作平面以更快地提供实时分析,使得分析系统更难获取历史数据。这些限制造成了操作数据平面和分析数据平面之间的数据鸿沟。
另一种解决方案是将分析数据减少到适合业务领域范围和操作平面容量的大小。
在图 8-4 中,实时操作数据可以通过 PeerDB 发送到一个主题。Proton 可以在飞行中摄取并转换数据,使用推送查询生成一个物化视图。
Proton 中物化视图的变化可以写入一个主题,供原始应用程序消费,并在嵌入式 OLAP 数据库如 DuckDB 或 chDB 中构建本地副本。
DuckDB
DuckDB 是一个嵌入式 OLAP 数据库,旨在支持分析查询工作负载。嵌入式意味着它可以在应用程序内运行,例如在微服务中。DuckDB 使应用程序用户能够对数据进行切片和切块操作。
通过来自 Proton 的变更数据并由微服务消费,可以在 DuckDB 中创建物化视图的本地副本,如示例 8-5 所示。
示例 8-5. 在 Python 微服务中安装 DuckDB
pip install duckdb
示例 8-6. 从 Kafka 读取并写入 DuckDB 的微服务伪代码
import duckdb
from threading import Thread, current_thread
from fastapi import BackgroundTasks, FastAPI
from confluent_kafka import Consumer
app = FastAPI()
duckdb_con = duckdb.connect('my_peristent_db.duckdb')
def upsert(msg):
# 实现 UPSERT 逻辑
def kafka2olap(conf):
consumer = Consumer(conf)
try:
consumer.subscribe("my_data")
while running:
msg = consumer.poll(timeout=1.0)
if msg is None: continue
if msg.error():
# 处理错误
else:
upsert(msg)
finally:
# 关闭消费者以提交最终偏移量
consumer.close()
@app.on_event("startup")
async def initialize():
conf = # Kafka 配置
thread = Thread(
target = kafka2olap,
args = (conf,)
)
thread.start()
@app.get("/my_data/")
async def read_item(id:int):
results = local_con.execute("""
SELECT
id,
count(*) AS row_counter,
current_timestamp
FROM my_data
where id = ?
""", (id,)).fetchall()
- 创建 DuckDB 连接。
- 定义一个函数来插入/更新 DuckDB。
- 定义一个函数从 Kafka 读取并写入 DuckDB。
- 创建一个异步线程来从 Kafka 读取并写入 DuckDB。
- 使用 FastAPI 通过 REST API 读取 DuckDB。
DuckDB 有一个功能可以检查插入记录时是否存在冲突。可以使用语句 INSERT OR REPLACE
,如果记录已存在,则会执行更新操作。
示例 8-7. 在 Python 中为 DuckDB 安装 UPSERT
def upsert(msg):
# 反序列化 msg 以获取列值
primary_key, col1_value, col2_value = deserialize_message(msg)
duckdb_con.execute("""
INSERT OR REPLACE INTO t1(id, col1, col2) VALUES(1, ?, ?)
""",
[primary_key, col1_value, col2_value]
);
INSERT OR REPLACE
是在 DuckDB 中处理 UPSERT 的方式。
chDB
类似于 DuckDB,chDB 也是一个可嵌入的 OLAP 数据库。chDB 基于 ClickHouse。ClickHouse 通过利用引擎支持 UPSERT。一个特定的引擎是 ReplacingMergeTree 表引擎。该引擎在合并过程中去除重复记录。ReplacingMergeTree 是模拟 UPSERT 行为的良好选项(即你希望查询返回最后插入的行)。详见 示例 8-8。
示例 8-8. 支持 UPSERT 的 ClickHouse 引擎
CREATE TABLE hackernews_rmt (
id UInt32,
author String,
comment String,
views UInt64
)
ENGINE = ReplacingMergeTree
PRIMARY KEY (author, id);
SELECT *
FROM hackernews_rmt
FINAL;
- ReplacingMergeTree 引擎模拟 UPSERT 行为。
- FINAL 关键字返回最新记录。
示例 8-9 展示了如何使用 Flask 和 chDB 创建微服务。Flask 是用于构建微服务的 FastAPI Python 模型的替代方案。
示例 8-9. 使用 Flask 和 chDB 的微服务包装
from flask import Flask, request
import chdb
import os
# chdb API 服务器示例,支持 GET/POST,与 play 应用程序兼容
# 完整的服务器示例见 https://github.com/metrico/chdb-server
app = Flask(__name__, static_folder="", static_url_path="")
@app.route('/', methods=["GET"])
def clickhouse():
query = request.args.get('query', default="", type=str)
format = request.args.get('default_format', default="JSONCompact", type=str)
if not query:
return "Query not found", 400
res = chdb.query(query, format)
return res.bytes()
@app.route('/', methods=["POST"])
def play():
query = request.data
format = request.args.get('default_format', default="JSONCompact", type=str)
if not query:
return "Query not found", 400
res = chdb.query(query, format)
return res.bytes()
@app.errorhandler(404)
def handle_404(e):
return "Not found", 404
host = os.getenv('HOST', '0.0.0.0')
port = os.getenv('PORT', 8123)
app.run(host=host, port=port)
- 创建一个 DuckDB 连接。
- 定义一个插入/更新函数。
- 定义一个从 Kafka 读取并写入 DuckDB 的函数。
- 创建一个异步线程来处理 Kafka 到 DuckDB 的数据流。
- 使用 Flask 创建支持 GET/POST 的 REST API。
数据引力与复制
通常,分析系统只存在于单一区域或数据中心,因为像 Snowflake 这样的分析基础设施往往成本高昂。这迫使所有操作系统将数据发送到单一区域,这种情况被称为数据引力。
数据引力是指数据具有质量,随着数据量和重要性的增加,移动或复制数据变得困难。这种引力影响数据的创建和交换,并进一步影响应用程序、服务器和其他数据。典型的解决方案是只复制减少后的分析数据。
通过将物化视图中的变更提供给操作平面系统,你可以将实时分析的副本分布到所有用户应用程序部署的区域。
分析数据减少
如何减少表示 PB 级历史数据的分析数据?将分析数据减少到适合操作平面的规模听起来困难,但实际上很简单,因为我们以前做过类似的事情。我们可以使用推送和拉取模式来处理物化视图。
通过在位于分析平面的分析系统中创建物化视图,我们可以将物化视图的变更流式传输到操作平面。Proton 正是可以做到这一点。它可以将物化视图的变更写入一个主题。该主题随后可以被具有嵌入式 OLAP 的应用程序消费,用于分析工作负载。同样,OLTP 流数据库也可以从相同的主题中消费数据,并从其行存储中服务。然而,行存储可能会增加分析查询的延迟。
近零 ETL 方法可以在复杂性和可扩展性之间找到合适的平衡。为了全面了解,让我们看看在 lambda 架构中使用单独的流处理器和 OLAP 数据库来处理分析数据所需的内容。
Lambda 架构
Lambda 架构是一种数据处理架构,旨在结合批处理和实时/流数据处理。它由 Nathan Marz 在其 2011 年的书籍《大数据:可扩展实时数据系统的原则与最佳实践》(Manning)中介绍,用于解决为大数据应用提供强大且可扩展的数据处理的挑战。术语“lambda”源自希腊字母,形状类似于倒置的“y”,代表批处理和实时数据的双重处理路径。
Lambda 架构包括三个主要层级:
批处理层 该层负责以批处理方式处理大量数据。它对整个数据集进行预计算,并将结果存储在批处理服务层中,使其适用于复杂的分析和历史查询。批处理通常使用像 Apache Hadoop 这样的技术,这些技术能够处理大规模分布式数据处理。
速度层 速度层处理实时数据。它专注于低延迟处理,处理尚未被批处理层处理的最新数据。速度层的结果与批处理层的结果相结合,提供数据的完整、最新视图。技术如 Apache Storm 或 Apache Flink 常用于速度层的实时处理。
服务层 服务层将批处理层和速度层的结果结合起来,提供统一的数据视图。它处理来自用户或应用程序的查询和分析请求。服务层通常使用可扩展的 NoSQL 数据库(如 Apache HBase 或 Apache Cassandra)构建,以高效处理读密集型工作负载。
Lambda 架构的优势在于能够处理批处理和实时处理,提供了一个全面的大数据分析解决方案。然而,管理和维护两个独立的处理路径可能引入复杂性,确保批处理和实时视图之间的一致性也可能具有挑战性。一些替代架构,如 Kappa 架构,提出了统一流处理和批处理的方法,旨在简化整体系统设计。
使用单独的流处理器和 OLAP 数据库也是一种选择。例如,你可以使用 Apache Pinot 来处理 PB 级的历史数据,并与使用 Flink(或 Pathway,如果你更喜欢 Python)转化的流数据一起使用。
在图 8-5 中,Lambda 架构被展示出来。图的左侧是批处理层,右侧是速度层,服务层是 Apache Pinot。过去,使用自定义编码将流数据与数据仓库中的历史数据合并是困难的。RTOLAP 系统如 Pinot 更加方便地解决了提供所有数据单一视图的难题。
Apache Pinot 混合表
Pinot 混合表是由两个内部表组成的表,一个是离线表,另一个是实时表,它们共享相同的名称。这使得 Pinot 能够合并流数据和历史数据。
示例 8-10 和 8-11 分别是名为 airlineStats
的表的 REALTIME 和 OFFLINE 表定义。
示例 8-10. Pinot REALTIME 表
{
"tableName": "airlineStats",
"tableType": "REALTIME",
"tenants": {},
"segmentsConfig": {
"timeColumnName": "DaysSinceEpoch",
"retentionTimeUnit": "DAYS",
"retentionTimeValue": "5",
"replication": "1"
},
"tableIndexConfig": {},
"routing": {
"segmentPrunerTypes": [
"time"
]
},
"ingestionConfig": {
"streamIngestionConfig": {
"streamConfigMaps": [
{
"streamType": "kafka",
"stream.kafka.topic.name": "flights-realtime",
"stream.kafka.decoder.class.name": "org.apache.pinot.plugin.stream.kafka.KafkaJSONMessageDecoder",
"stream.kafka.consumer.factory.class.name": "org.apache.pinot.plugin.stream.kafka20.KafkaConsumerFactory",
"stream.kafka.consumer.prop.auto.offset.reset": "smallest",
"stream.kafka.zk.broker.url": "localhost:2191/kafka",
"stream.kafka.broker.list": "localhost:19092",
"realtime.segment.flush.threshold.time": "3600000",
"realtime.segment.flush.threshold.size": "50000"
}
]
},
"transformConfigs": [
{
"columnName": "ts",
"transformFunction": "fromEpochDays(DaysSinceEpoch)"
},
{
"columnName": "tsRaw",
"transformFunction": "fromEpochDays(DaysSinceEpoch)"
}
]
},
"fieldConfigList": [
{
"name": "ts",
"encodingType": "DICTIONARY",
"indexTypes": [
"TIMESTAMP"
],
"timestampConfig": {
"granularities": [
"DAY",
"WEEK",
"MONTH"
]
}
}
],
"metadata": {
"customConfigs": {}
}
}
Kafka 的摄取配置
流式摄取转换
在第 7 章中,我们提到 RTOLAP 系统如 Pinot 正在增加更多有状态的流处理,并且下一代 OLAP 系统将会像流数据库一样运作。Pinot 的星形树索引就是这种趋势的一个例子,我们在第 3 章中简要介绍过。
示例 8-10 显示了从发布-订阅(pub-sub)系统如 Kafka 中获取数据时通常需要的摄取转换。发布到 pub-sub 系统的数据可能需要支持多个订阅者,因此需要提供满足许多订阅者的通用版本数据。每个消费者将需要处理其特定分析工作负载所需的额外转换。
示例 8-11. Pinot OFFLINE 表
{
"tableName": "airlineStats",
"tableType": "OFFLINE",
"segmentsConfig": {
"timeColumnName": "DaysSinceEpoch",
"timeType": "DAYS",
"segmentPushType": "APPEND",
"segmentAssignmentStrategy": "BalanceNumSegmentAssignmentStrategy",
"replication": "1"
},
"tenants": {},
"fieldConfigList": [
{
"name": "ts",
"encodingType": "DICTIONARY",
"indexTypes": [
"TIMESTAMP"
],
"timestampConfig": {
"granularities": [
"DAY",
"WEEK",
"MONTH"
]
}
},
{
"name": "ArrTimeBlk",
"encodingType": "DICTIONARY",
"indexes": {
"inverted": {
"enabled": "true"
}
},
"tierOverwrites": {
"hotTier": {
"encodingType": "DICTIONARY",
"indexes": {
"bloom": {
"enabled": "true"
}
}
},
"coldTier": {
"encodingType": "RAW",
"indexes": {
"text": {
"enabled": "true"
}
}
}
}
}
],
"tableIndexConfig": {
"starTreeIndexConfigs": [
{
"dimensionsSplitOrder": [
"AirlineID",
"Origin",
"Dest"
],
"skipStarNodeCreationForDimensions": [],
"functionColumnPairs": [
"COUNT__*",
"MAX__ArrDelay"
],
"maxLeafRecords": 10
}
],
"enableDynamicStarTreeCreation": true,
"loadMode": "MMAP",
"tierOverwrites": {
"hotTier": {
"starTreeIndexConfigs": [
{
"dimensionsSplitOrder": [
"Carrier",
"CancellationCode",
"Origin",
"Dest"
],
"skipStarNodeCreationForDimensions": [],
"functionColumnPairs": [
"MAX__CarrierDelay",
"AVG__CarrierDelay"
],
"maxLeafRecords": 10
}
]
},
"coldTier": {
"starTreeIndexConfigs": []
}
}
},
"metadata": {
"customConfigs": {}
},
"ingestionConfig": {
"transformConfigs": [
{
"columnName": "ts",
"transformFunction": "fromEpochDays(DaysSinceEpoch)"
},
{
"columnName": "tsRaw",
"transformFunction": "fromEpochDays(DaysSinceEpoch)"
}
]
},
"tierConfigs": [
{
"name": "hotTier",
"segmentSelectorType": "time",
"segmentAge": "3130d",
"storageType": "pinot_server",
"serverTag": "DefaultTenant_OFFLINE"
},
{
"name": "coldTier",
"segmentSelectorType": "time",
"segmentAge": "3140d",
"storageType": "pinot_server",
"serverTag": "DefaultTenant_OFFLINE"
}
]
}
星形树索引:用于预聚合历史数据以构建物化视图。星形树索引可在实时和离线 Pinot 表中使用。
Pinot 具有分层存储功能,允许随着数据老化将其移动到较低的层级,以释放更多容量(见图 8-6)。
与实时表定义不同的是,OFFLINE 表没有摄取配置,因为实时表包括从流平台中的主题读取的配置。为了在数据发布后的毫秒内进行查询,实时表被分布在多个服务器上,这些服务器将流数据存储在易失性内存中。实时表和离线表是分开的,只有在查询引擎中才会将它们结合在一起。
实时表有一个保留期,之后实时服务器中的数据会被卸载到离线服务器上,以防止实时服务器的容量超载。
使用 OLAP 数据库的一个优势是,你不需要将分析数据减少到适合 OLTP 或 HTAP 数据库的容量内。减少分析数据会减少用户访问所有历史数据的灵活性。然而,灵活性—像所有事物一样—会带来更多的复杂性和基础设施成本。
借助 Pinot 的分层存储,你可以将较旧的数据卸载到较低的层级,以释放 Pinot 中的更多容量。如果你需要更多的历史数据来执行临时查询,最好利用能够将历史数据与实时数据合并的 OLAP 系统。
管道配置
Pinot 提供了合并历史数据和实时数据的解决方案。在到达 Pinot 之前,lambda 架构的数据流是通过使用 Debezium CDC 连接器从像 Postgres 这样的 OLTP 数据库中获取数据的。如你所记得,Debezium 捕获了来自许多类型事务数据库的变更事务。示例 8-12 展示了一个 Debezium Postgres 配置。
示例 8-12. 在 Flink 中创建 Kafka 源连接器
{
"name": "postgres",
"config": {
"connector.class": "io.debezium.connector.postgresql.PostgresConnector",
"tasks.max": "1",
"database.hostname": "0.0.0.0",
"database.port": "5432",
"database.user": "postgres",
"database.password": "postgres",
"database.dbname" : "postgres",
"topic.prefix": "dbserver1",
"schema.include.list": "inventory"
}
}
这个连接器将变更数据写入一个主题。我们可以使用 Flink SQL 作为流处理器来转换数据。示例 8-13 展示了 Flink SQL 从 Kafka 中获取流数据。
示例 8-13. 在 Flink 中创建 Kafka 源连接器
CREATE TABLE KafkaSource (
`id` BIGINT,
`col1` STRING,
`col2` STRING,
`ts` TIMESTAMP(3) METADATA FROM 'timestamp'
) WITH (
'connector' = 'kafka',
'topic' = 'my_data',
'properties.bootstrap.servers' = 'localhost:9092',
'properties.group.id' = 'abc',
'scan.startup.mode' = 'earliest-offset',
'format' = 'json'
)
一旦你从一个主题中获取了数据,你可以使用 SQL 执行转换。在 Flink 中,这些被称为管道。转换完成后,数据可以使用示例 8-14 中的代码写回到一个主题。
示例 8-14. 在 Flink SQL 中创建 Kafka 汇
CREATE TABLE KafkaSink (
`user_id` BIGINT,
`col1` STRING,
`col2` STRING,
`ts` TIMESTAMP(3) METADATA FROM 'timestamp'
) WITH (
'connector' = 'kafka',
'topic' = 'my_data_transformed',
'properties.bootstrap.servers' = 'localhost:9092',
'properties.group.id' = 'testGroup',
'scan.startup.mode' = 'earliest-offset',
'format' = 'json'
)
Pinot 使你能够对来自主题的实时数据以及来自数据仓库或对象存储(如 Amazon S3)的历史数据执行临时查询。需要注意的是,涉及实时和历史数据的临时查询的用例并不适合面向用户的应用程序。临时查询是为了那些在业务内部并从数据中发现洞察的数据分析师。这样的能力不应暴露给外部用户,因为外部用户的数量可能会达到数万,这会对 OLAP 系统施加过大的资源压力。
总结
零 ETL 的局限性突显了其无法实现实时分析的缺陷,以及在推送查询时强加批处理语义的问题,缺乏物化视图。为了解决这些问题,我们引入了支持近零 ETL 的替代方案,这些方案也支持实时分析,例如,当使用 Proton 将实时数据和历史数据减少为可以写入 Kafka 主题的物化视图时。我们需要在操作平面上构建 Kafka 消费者,以在嵌入式 OLAP 数据库中构建本地副本。这种方法也可以用 HTAP 数据库而不是嵌入式 OLAP 实现。
本章提出的模式在很大程度上依赖于第 7 章介绍的 Venn 图中的流平面。在第 9 章中,我们将深入探讨流平面及其如何在数据网格等架构模式中得到利用。
转载自:https://juejin.cn/post/7402517513933078578