流数据库——实时数据的未来状态你害怕进入的洞穴藏有你寻求的宝藏。 —— 约瑟夫·坎贝尔 在深入探讨流数据处理数据库的部署
你害怕进入的洞穴藏有你寻求的宝藏。
—— 约瑟夫·坎贝尔
在深入探讨流数据处理数据库的部署选项后,本章将稍作停顿,展望实时数据的未来状态,这一状态由本书的核心主题之一塑造:流数据与数据库的加速融合。流数据处理数据库正是这一趋势的表现之一。但这里还有许多值得至少触及的证据。
我们首先从图数据库及其不断向流处理领域迈进的旅程开始(例如,Memgraph 和 thatDot),接着是现今在 ChatGPT 生成式 AI 突破之后极其流行的向量数据库(例如,Milvus 和 Weaviate)。接下来,我们继续探讨流数据和数据库融合的领域,重点介绍将流数据数据库的一个核心方面——物化视图(即 IVM)——带入传统数据库的工具(如 Feldera、PeerDB 和 Epsio)。在本章的最后,我们研究了 MongoDB 等成熟数据库供应商的流处理功能,并逐渐将焦点转向分析领域,探讨如 BigQuery、Redshift 和 Snowflake 等数据仓库如何扩展其流处理功能。我们以对 Apache Iceberg、Apache Hudi、Delta Lake 和 Apache Paimon 驱动的数据湖房架构融合的调查来结束本章——这一趋势不仅在流数据领域,而且在大数据领域整体上都是最具前景的宏观趋势之一。
数据平面融合
在我们开始这一旅程之前,让我们通过再次查看第7章的维恩图(图11-1)来设定背景。
该图展示了三个数据平面的融合:
- 运营数据平面
- 分析数据平面
- 流数据平面
在本章的前几节中,我们将重点关注运营数据平面,即那些更常用于运营而非分析环境中的数据库。我们将详细说明各种类型的运营数据库如何逐步向流数据平面靠拢,也就是运营数据平面和流数据平面重叠的阴影区域。
接下来,我们将看看来自分析数据平面的供应商(包括数据仓库如 Snowflake 和湖仓如 Databricks)如何通过增加流处理功能来扩展他们的产品组合,从而接近分析数据平面和流数据平面相交的阴影区域。尤其在这一背景下,湖仓和流处理架构的持续融合是非常值得关注的——这注定将成为未来几年大数据领域的主要话题之一,因此也将成为本章的高潮部分。
图数据库
图数据库使用底层的图结构来支持在图上的查询,这些查询在关系数据库中通常难以高效实现。它们是2000年代和2010年代NoSQL运动的重要组成部分,显著拓宽了数据库的应用范围。常见的图数据库包括Neo4J、ArangoDB和TigerGraph。图数据库显然属于运营数据平面;它们通常用于运营应用,这使得它们与在分析平面上使用的图计算引擎有所区别。
图数据库的典型用例包括:
社交网络 图数据库在建模和查询社交网络方面表现出色,被广泛使用,例如在Meta、X或LinkedIn中。典型的设置是,节点代表用户,边代表它们之间的关系,如“用户A关注用户B”或“用户A是用户B的朋友”。
推荐系统 在这里,用户与产品/内容之间的关系被建模和查询,以根据用户的偏好提取个性化推荐。
知识图谱 在这种用例中,图被用来建模知识片段之间的关系,以促进更准确的语义搜索。
欺诈检测 如果与AI/ML结合,图数据库可以显著提高欺诈检测机制的准确性,通过分析实体之间的连接模式来改进分析。
供应链 在此背景下,节点可以代表位置、产品和实体,边代表供应链中的关系和流动。在这种背景下,图数据库可以显著优化供应链。
虽然像Neo4J和ArangoDB这样已建立的图数据库可以通过例如Kafka Connect连接到流平台,但一些新兴供应商如Memgraph和thatDot已经开始提供更紧密集成的功能。
Memgraph
Memgraph提供了可以直接连接到流平台(如Kafka和Pulsar)以摄取数据的功能,并且还支持所谓的转换模块,以正确处理传入的消息。转换模块可以使用C API或Python API。
示例11-1展示了Memgraph如何直接从Kafka中摄取数据。
示例11-1. 在Memgraph中直接从Kafka摄取数据流
CREATE KAFKA STREAM <stream name>
TOPICS <topic1> [, <topic2>, ...]
TRANSFORM <transform procedure>
[CONSUMER_GROUP <consumer group>]
[BATCH_INTERVAL <batch interval duration>]
[BATCH_SIZE <batch size>]
[BOOTSTRAP_SERVERS <bootstrap servers>]
[CONFIGS { <key1>: <value1> [, <key2>: <value2>, ...]}]
[CREDENTIALS { <key1>: <value1> [, <key2>: <value2>, ...]}];
START STREAM <stream name> [BATCH_LIMIT <count>] [TIMEOUT <milliseconds>];
示例11-2展示了使用Python API转换传入的消息的转换模块。
示例11-2. 使用Memgraph Python API转换来自流平台的传入消息
import mgp
@mgp.transformation
def transformation(context: mgp.TransCtx,
messages: mgp.Messages
) -> mgp.Record(query=str, parameters=mgp.Nullable[mgp.Map]):
result_queries = []
for i in range(messages.total_messages()):
message = messages.message_at(i)
payload_as_str = message.payload().decode("utf-8")
result_queries.append(mgp.Record(
query=f"CREATE (n:MESSAGE {{timestamp: '{message.timestamp()}', payload:
'{payload_as_str}', topic: '{message.topic_name()}'}})",
parameters=None))
return result_queries
thatDot/Quine
Quine是thatDot公司推出的开源图(数据库)。Quine允许将多个事件流合并到一个图中,查询复杂的事件关系,并实时对其采取行动。Quine的命名方式没有将“数据库”包括在产品名称中:公司实际上称其技术为“数据管道的流式图”。从本书的角度来看,Quine完全符合(基于图的)流式数据库的定义:
- 它提供了(基于图的)流处理(持久化查询),从流中读取数据,并将处理结果写回到流中。
- 它提供了物化视图(图)。
- Quine使得图处理的规模远大于现有的图数据库,如Neo4J或TigerGraph,因为它是一个完全分布式的系统,基于actor的概念(使用Scala库Pekko实现,是Akka 2.6.1的分支)。它还提供了可插拔的存储层,支持RocksDB、MapDB(本地)和Cassandra(远程)。
在示例11-3中,我们展示了如何使用“服务器端事件”摄取流连接到Wikipedia页面修订的实时流。
示例11-3. 使用Quine摄取Wikipedia页面修订的“服务器端事件”流
curl -X "POST" "http://127.0.0.1:8080/api/v1/ingest/wikipedia-revision-create" \
-H 'Content-Type: application/json' \
-d $'{
"format": {
"query": "CREATE ($that)",
"parameter": "that",
"type": "CypherJson"
},
"type": "ServerSentEventsIngest",
"url": "https://stream.wikimedia.org/v2/stream/mediawiki.revision-create"
}'
在下一步(示例11-4)中,我们使用Neo4J的Cypher图数据库语言创建摄取查询,将单个事件加载到流式图数据库中的节点中。
示例11-4. 使用Cypher编写的摄取查询将单个事件加载到Quine图数据库中的节点
MATCH (revNode),(pageNode),(dbNode),(userNode),(parentNode)
WHERE id(revNode) = idFrom('revision', $that.rev_id)
AND id(pageNode) = idFrom('page', $that.page_id)
AND id(dbNode) = idFrom('db', $that.database)
AND id(userNode) = idFrom('id', $that.performer.user_id)
AND id(parentNode) = idFrom('revision', $that.rev_parent_id)
SET revNode = $that,
revNode.bot = $that.performer.user_is_bot,
revNode:revision
SET parentNode.rev_id = $that.rev_parent_id
SET pageNode.id = $that.page_id,
pageNode.namespace = $that.page_namespace,
pageNode.title = $that.page_title,
pageNode.comment = $that.comment,
pageNode.is_redirect = $that.page_is_redirect,
pageNode:page
SET dbNode.database = $that.database,
dbNode:db
SET userNode = $that.performer,
userNode.name = $that.performer.user_text,
userNode:user
CREATE (revNode)-[:TO]->(pageNode),
(pageNode)-[:IN]->(dbNode),
(userNode)-[:RESPONSIBLE_FOR]->(revNode),
(parentNode)-[:NEXT]->(revNode)
在从流源建立图之后,Quine允许设置持久化查询来监控流中指定的模式。根据这些模式的识别,可以采取行动,例如通过创建新节点或边来更新图,将结果写入Kafka或Amazon Kinesis,或将结果发布到Webhook。Quine中持久化查询的一个关键特性是,它们不需要指定时间窗口,这与例如Flink不同。持久化查询本质上对应于基于图的流处理。
我们在示例11-5中展示了一个持久化查询。持久化查询由模式和输出组成。模式定义要匹配的内容,输出定义要采取的行动。持久化查询同样使用Cypher编写。
示例11-5. 使用Cypher编写的持久化查询,匹配模式并对其采取行动(使用文件接收器)
{
"pattern": {
"query": "MATCH (n)-[:has_father]->(m) WHERE exists(n.name) AND exists(m.name)
RETURN DISTINCT strId(n) AS kidWithDad",
"type": "Cypher"
},
"outputs": {
"file-of-results": {
"path": "kidsWithDads.jsonl",
"type": "WriteToFile"
}
}
}
要获取最后10个修订创建事件节点,现在可以像示例11-6那样查询Quine。
示例11-6. 从Quine获取最后10个修订创建事件节点
MATCH (userNode:user {user_is_bot: false})-[:RESPONSIBLE_FOR]->(revNode:revision
{database: 'enwiki'})
RETURN DISTINCT strid(userNode) as NodeID,
revNode.page_title as Title,
revNode.performer.user_text as User
LIMIT 10
我们在图11-2中展示了该查询的结果。
Memgraph——尤其是thatDot/Quine——处于图数据库和流处理不断融合的最前沿,因此,它们是操作层面和流处理层面融合的一个完美例子。
向量数据库
简而言之,向量数据库是为存储和查询向量而优化的,也就是固定长度的数字列表(通常每个向量包含50到1500个数字)。向量数据库中的查询通常相当于由近似最近邻(ANN)算法实现的相似性搜索。存储在向量数据库中的向量是高维的,代表着非结构化数据(如文本、声音和视频)的特征。流行的向量数据库包括Milvus、Weaviate、Pinecone、Vespa和Qdrant。有趣的是,越来越多的现有数据库也开始采用向量数据库功能,例如ClickHouse、Rockset、PostgreSQL、Cassandra、Elastic、Redis和SingleStore。
将非结构化数据(如文本、声音和视频)转化为向量嵌入的想法可以追溯到路德维希·维特根斯坦的“语言游戏”理论。到了2010年代,Google Research的Word2vec和斯坦福大学的GloVe这两种成功的方法推动了向量嵌入的普及——并间接促成了向量数据库这一新类别的出现。大型语言模型(LLMs),如OpenAI的GPT模型,也可以生成向量嵌入。
以下是向量数据库的一些应用场景:
推荐系统 相似性搜索可以在推荐引擎中使用,例如在线商店中,用于找到与用户正在寻找的产品类似的商品。相似性搜索甚至可以是多模态的(例如,文本相似性和图像相似性)。
欺诈检测 在这种情况下,相似性搜索用于标记例如欺诈性交易。
聊天机器人/生成式AI 向量数据库对于意图分类、检索增强生成(RAG)等非常有用。
与图数据库类似,向量数据库最常用于操作性应用,因此与操作层面紧密相连。现在,我们的问题是,回顾图11-1,向量数据库是如何向操作层面和流处理层面重叠的方向发展的?
在向量数据库中实现流式数据的摄取变得越来越普遍,例如,使用Kafka Connect将数据导入像Pinecone和Milvus这样的向量数据库中。但向量数据库与流处理之间还有其他接触点。例如,Milvus 2.x相比1.x版本进行了完全的架构重构,甚至在其底层加入了一个流处理平台。另一方面,一些实时OLAP数据库(如ClickHouse和Rockset)也越来越多地支持向量搜索。
Milvus 2.x:以流处理为核心
与早期较为单体化的“单二进制”版本相比,Milvus 2.x是一个完全重新设计的架构,具有水平扩展性和云就绪的特性。新架构中一个非常有趣的方面是增加了一个“消息存储”或“日志代理”,负责流数据持久化、可靠的异步查询执行、事件通知和查询结果返回。它还确保在工作节点从系统崩溃中恢复时,增量数据的完整性。Milvus甚至遵循“日志即数据”原则:它不维护物理表,而是通过日志持久化和快照日志来保证数据可靠性(见图11-3)。
Milvus 2.x还通过实现统一的批处理和流处理,采用集成增量和历史数据处理的lambda架构。为了将无界(流)数据分解为有界窗口,Milvus引入了一种新的水印机制,该机制根据写入时间或事件时间将流数据切分为多个消息包,并维护一个时间线,供用户按时间进行查询。
总的来说,Milvus 2.x以一种令人兴奋的方式展示了新一代数据库的设计如何能够在流处理领域深入发展。在Milvus 2.x架构中,流处理/发布-订阅是整个系统的主干,保证了向量数据库的可扩展性和弹性。未来,看到其他向量数据库也遵循这一趋势,将会是一件令人期待的事情。
RTOLAP数据库:增加向量搜索功能
越来越多的数据库供应商正在为其现有的数据库产品添加向量搜索功能。MongoDB、Elastic和SingleStore就是一些例子。这个趋势同样也被实时OLAP数据库(如ClickHouse和Rockset)所遵循。
ClickHouse
ClickHouse使用数组列类型(Array<Float32>
)来建模向量嵌入,并提供了计算搜索向量与列值之间距离的函数,例如cosineDistance和L2Distance。ClickHouse还提供了专门优化的ANN搜索算法,如Annoy索引。
与专门的向量数据库不同,ClickHouse和其他RTOLAP数据库允许将向量搜索与额外的元数据过滤或元数据聚合相结合。一个示例用例是对非版权保护的图像进行向量搜索——结合向量搜索与基于版权元数据的过滤。
我们在示例11-7中展示了一个结合向量搜索和元数据过滤的ClickHouse SQL查询示例。
示例11-7. 在ClickHouse中结合向量搜索和元数据过滤
SELECT
url,
caption,
L2Distance(image_embedding, [<embedding>]) AS score
FROM laion_100m
WHERE (width >= 300) AND (height >= 500) AND (copyright = '') AND similarity > 0.3
ORDER BY score ASC
LIMIT 10
FORMAT Vertical
正如你所看到的,ClickHouse允许无缝结合元数据过滤(这里是搜索图像的宽度和高度加上版权元数据)与向量搜索(使用L2Distance)。
Rockset
Rockset是另一款支持向量搜索的实时OLAP数据库,支持更简单的KNN(K-最近邻)和ANN搜索。在Rockset中,可以像示例11-8那样设置一个ANN索引。
示例11-8. 在Rockset中设置ANN索引
CREATE SIMILARITY INDEX book_catalog_embeddings_ann_index
ON FIELD commons.book_dataset:book_embedding DIMENSION 1536 as 'faiss::IVF256,Flat';
这里我们创建了一个名为book_catalog_embeddings_ann_index的索引。我们指定了输入向量的维度以及要创建的索引类型(在本例中是Faiss的IVF [倒排文件]索引)。
使用这个索引在Rockset中查询的方式如示例11-9所示。
示例11-9. 在Rockset中使用向量搜索进行查询
SELECT
book_dataset.title,
book_dataset.author
FROM
book_dataset ds
JOIN book_metadata m ON ds.isbn = m.isbn
WHERE
m.publish_date > DATE(2010, 12, 26)
and m.rating >= 4
and m.price < 50
ORDER BY
APPROX_DOT_PRODUCT(:target_embedding, ds.book_embedding) DESC
LIMIT
30
这里有趣的部分在于ORDER BY子句,其中相似性搜索用于根据内容的相似性对查询结果进行排序。
增量视图维护
增量视图维护(IVM)是一种在关系数据库中以增量方式保持物化视图最新的方法,其中变化是不断计算并应用到视图上的,而不是从头重新计算视图的内容。IVM比重新计算更高效地更新物化视图,因为每次只会改变视图的小部分。
在视图维护的时间上有两种方法:即时维护和延迟维护。在即时维护中,视图会在基础表被修改的同一个事务中被更新。而在延迟维护中,视图会在事务提交后更新,例如,响应用户显式命令(如REFRESH MATERIALIZED VIEW)、定期在后台更新,或仅在访问视图时更新。
IVM的即时维护形式是操作层面与流处理层面交汇的另一个途径。在过去的几年中,许多解决方案已经出现在这个领域,例如pg_ivm(PostgreSQL的插件)、Hydra、PeerDB、Epsio,以及在某种程度上也包括Feldera/DBSP。
pg_ivm
pg_ivm为PostgreSQL添加了即时维护的物化视图。当基础表被修改时,物化视图会在AFTER触发器中立即更新。示例11-10提供了一个例子。
示例11-10. 在PostgreSQL中使用pg_ivm创建即时维护的物化视图
SELECT create_immv('myview', 'SELECT * FROM mytab');
pg_ivm的优势在于它可以完美地与底层数据库集成,因此无需设置和维护额外的外部系统。
另一方面,pg_ivm与操作数据库(PostgreSQL)的紧密耦合也意味着它必须与数据库本身共享资源(计算、内存)。由于增量物化视图维护可能是昂贵的,将其外部化到一个独立系统中可能更为明智。对于需要列式表格式的用例,PostgreSQL基于行的表设置还可能导致性能问题。
Hydra
Hydra支持使用行和列表的传统物化视图,以及由pg_ivm驱动的增量物化视图。示例11-11显示了如何在Hydra中设置基于行和列的物化视图。
示例11-11. 在Hydra中创建即时维护的物化视图
CREATE TABLE heap_table (...) USING heap;
CREATE TABLE columnar_table (...) USING columnar;
Hydra的额外灵活性意味着它可以满足需要行表结构和需要列存储的用例。还可以在PostgreSQL中创建一个外部化表,并以模拟零ETL的方式同步回Hydra。
Epsio
Epsio是一个用于增量物化视图维护的工具,可以插入现有的PostgreSQL数据库,并在底层数据发生变化时不断地、增量地更新查询结果。通过其增量操作模式,Epsio无需重新计算整个数据集,能够为复杂查询提供即时且始终最新的结果。Epsio支持SQL语法的很大一部分,包括大多数类型的JOIN、CTE、子查询、GROUP BY等。
Epsio在实践中是如何工作的呢?在示例11-12中,我们定义了一个简单的Epsio物化视图。
示例11-12. 在Epsio中定义一个物化视图
CALL epsio.create_view('epsio_view',
'SELECT SUM(SALARY), d.name FROM employee_salaries e
JOIN departments d on e.department_id = d.id
GROUP BY d.name');
在示例11-13中,你可以看到如何查询物化视图(与直接在PostgreSQL数据库中查询完全相同)。
示例11-13. 在Epsio中查询物化视图
SELECT * FROM epsio_view;
使用像Epsio这样独立系统的优点是多方面的:
- Epsio中的物化视图不断以增量方式更新,无需重新计算整个数据集。
- 源数据库的性能不会因为其自身的物化视图维护或连续查询而受到影响。
Feldera
Feldera是一个“连续分析平台”,允许用户直接在流数据上运行连续查询。本质上,它持续处理查询并产生输出:每当有变化到达时,Feldera会以增量方式重新计算查询结果,然后将更改后的查询结果发送到其输出。
从某种意义上说,它是一个基于流处理的实时ETL平台,由DBSP(数据库流处理器)引擎驱动。在许多方面,Feldera与流数据库(如Materialize或RisingWave)有重叠,但更侧重于将数据从源头带到目标,并且不提供物化视图支持。它也可以被比作一个纯粹的流处理系统,如Flink,但提供了更多的一致性保证,例如Materialize(DBSP在许多方面类似于Materialize的底层流处理引擎Differential Dataflow)。
让我们看看Feldera在实践中是如何工作的。首先,在示例11-14中,我们声明了一些输入表。
示例11-14. 在Feldera中声明输入表
create table VENDOR (
id bigint not null primary key,
name varchar,
address varchar
);
create table PART (
id bigint not null primary key,
name varchar
);
create table PRICE (
part bigint not null,
vendor bigint not null,
price decimal
);
这些声明并未指定具体的数据源。VENDOR、PART和PRICE表的记录可能来自Kafka流、数据库或HTTP请求。可以在Feldera UI中实例化“SQL程序”并关联数据源(甚至多个数据源)。
在第二步中,我们现在可以在输入数据上编写连续查询。我们在示例11-15中展示了这一点。
示例11-15. 在Feldera中编写连续查询
-- 所有供应商中每个零件的最低可用价格。
create view LOW_PRICE (
part,
price
) as
select part, MIN(price) as price from PRICE group by part;
-- 每个零件的最低可用价格以及零件和供应商的详细信息。
create view PREFERRED_VENDOR (
part_id,
part_name,
vendor_id,
vendor_name,
price
) as
select
PART.id as part_id,
PART.name as part_name,
VENDOR.id as vendor_id,
VENDOR.name as vendor_name,
PRICE.price
from
PRICE,
PART,
VENDOR,
LOW_PRICE
where
PRICE.price = LOW_PRICE.price AND
PRICE.part = LOW_PRICE.part AND
PART.id = PRICE.part AND
VENDOR.id = PRICE.vendor;
在Feldera中,查询被写为SQL视图,可以基于表或其他视图定义。在示例11-15中,PREFERRED_VENDOR视图是基于LOW_PRICE视图定义的。这种灵活性使Feldera能够表达诸如深度嵌套查询等复杂查询。
PeerDB
PeerDB是一个开源工具,用于将数据从PostgreSQL流式传输到数据仓库(如Snowflake、BigQuery)、队列(如Azure Event Hubs)和存储引擎(如S3、GCS)。与Feldera类似,PeerDB不直接提供物化视图维护功能,但可以通过将PostgreSQL数据库实时同步到例如Snowflake中,实现类似的效果,而“物化视图”则存在于Snowflake中。
PeerDB有四种复制模式:
- 基于日志的
- 基于游标的(时间戳或整数)
- 基于XMIN的
- 流式查询
在基于日志、游标和XMIN的模式下,PeerDB从PostgreSQL(“源对等体”)表中获取表的变化,并将其传送到“目标对等体”。与类似的CDC工具相比,PeerDB声称性能更高(快10倍)。PeerDB还支持“流式查询”复制,以满足更复杂的复制需求。在这种模式下,你可以在将源对等体的数据传送到目标对等体之前,对源数据进行复杂的转换。
让我们看看PeerDB的工作方式。第一步是创建源和目标对等体(示例11-16)。
示例11-16. 在PeerDB中创建源对等体和目标对等体
CREATE PEER source FROM POSTGRES WITH
(
host = 'catalog',
port = '5432',
user = 'postgres',
password = 'postgres',
database = 'source'
);
CREATE PEER target FROM POSTGRES WITH
(
host = 'catalog',
port = '5432',
user = 'postgres',
password = 'postgres',
database = 'target'
);
在示例11-17中,我们启动了基于日志的CDC,通过创建一个“镜像”。
示例11-17. 在PeerDB中设置基于日志的CDC
CREATE MIRROR cdc_mirror FROM source TO target
WITH TABLE MAPPING (public.test:public.test);
这个“镜像”现在负责将所有DML命令(INSERT、UPDATE、DELETE)从源复制到目标,并利用表映射。
设置流式查询会稍微复杂一些(示例11-18)。
示例11-18. 在PeerDB中设置流式查询
CREATE MIRROR qrep_mirror FROM source TO target
FOR $$
SELECT id, hashint4(c1) hash_c1, hashint4(c2) hash_c2, md5(t) AS hash_t
FROM test WHERE id BETWEEN {{.start}} AND {{.end}}
$$ WITH (
watermark_table_name='public.test',
watermark_column='id',
num_rows_per_partition = 10000,
destination_table_name='public.test_transformed',
mode='append'
);
这个流式查询在将数据发送到目标之前,通过对c1、c2和t列进行哈希处理来掩盖它们。
数据包装和Postgres Multicorn
在第7章中,我们讨论了PostgreSQL普及的原因,其中我们归因于多个因素,如其开源性质、开发者社区和可扩展性(如pg_ivm)。此外,PostgreSQL普及的一个关键因素是其出色的可扩展性。这意味着它可以被定制和扩展到其核心功能之外,包括以下特性:
外部数据包装器(FDW) 允许与外部数据源的无缝集成,使其成为各种数据的中央枢纽。
Multicorn扩展 简化了用Python构建自定义FDW的过程,进一步降低了数据集成的入门门槛。这些功能,结合一个强大的社区推动扩展开发,使用户能够根据他们的特定需求定制PostgreSQL,使其成为许多用例中的一个有价值的工具。
PostgreSQL的FDW允许你访问驻留在PostgreSQL之外的数据,就像它是数据库中的一个常规表一样。FDW使你能够结合来自各种来源的数据,如其他数据库、文件、API、流处理平台和基于列式的数据库,所有这些都可以使用熟悉的SQL查询。
Multicorn是一个用于创建自定义FDW的Postgres扩展,它使用Python提供了一个用户友好的框架,抽象了FDW实现的复杂底层细节。使用Multicorn,你可以专注于与外部数据源交互所需的特定逻辑,而与Postgres的通信则交给框架来处理。FDW和Multicorn的结合解锁了灵活的数据集成,并简化了在PostgreSQL环境中处理各种数据源的开发。
以下是如何使用PostgreSQL、Multicorn扩展和OLAP数据库来实现实时分析的分步骤说明:
设置PostgreSQL和Multicorn:
-
安装PostgreSQL。确保你的系统上已经安装并运行了PostgreSQL。你可以在PostgreSQL官方网站上找到安装说明。
-
启用Multicorn扩展。一旦PostgreSQL设置完成,使用以下命令启用Multicorn扩展:
CREATE EXTENSION IF NOT EXISTS multicorn;
实施数据摄取管道:
-
建立连接。使用合适的库,如Python的psycopg2或Node.js的pg,创建应用程序与PostgreSQL数据库之间的连接。
-
实时捕获数据。在应用程序中实现逻辑以实时捕获数据。这可能涉及:
- 流式数据。使用支持从各种来源(如Apache Kafka或消息队列RabbitMQ)进行实时数据摄取的库/框架。
- WebSockets/服务器发送事件(SSE)。与客户端建立实时连接,并通过WebSockets或SSE接收数据更新。
- API调用。如果数据来自外部API,请定期调用API以检索和更新数据库。
队列、处理和存储数据:
- 队列数据。考虑在PostgreSQL数据库中使用临时表或队列来缓冲传入的数据,然后再进行处理。
- 内存中处理。为了更快地处理实时数据,可以探索内存数据库或在PostgreSQL中使用物化视图等技术。然而,这可能不适用于大数据集。
- 转换和存储数据。在应用程序中或使用PostgreSQL中的触发器,将接收的数据转换为所需的格式并存储在适当的表中。这可能涉及用于存储静态属性的维度表和用于存储度量/指标的事实表。
连接到OLAP数据库:
- 选择一个OLAP解决方案。选择一个符合你特定需求和数据量的OLAP数据库,如Apache Druid、Apache Kylin或ClickHouse。
- 建立连接。使用你选择的OLAP解决方案提供的库/连接器从应用程序连接到数据库。
- 定期数据传输。设置定时任务或在PostgreSQL中使用触发器,定期将处理后的数据从PostgreSQL数据库传输到OLAP数据库。这确保了OLAP数据库通过最新数据进行高效的分析查询。
你还需要考虑以下几点:
-
错误处理和日志记录
- 实现适当的错误处理机制,以确保整个管道中的数据一致性和数据质量。
-
监控和扩展
- 随着数据量的增加,监控系统和OLAP数据库的性能。根据需要扩展你的基础设施,以处理实时数据流。
-
安全性
- 通过实施身份验证、授权和加密机制,确保数据管道的安全性。
请记住,这是一个通用的指南,具体的实施细节将取决于你选择的工具、技术以及你的实时分析用例的具体需求。
经典数据库
本章——以及整本书——的反复主题是流处理与数据库的不断融合,换句话说,就是动态数据与静态数据的统一。这一融合的最新指标之一是MongoDB推出的新功能Atlas Stream Processing,这是一款已经成为“经典”的NoSQL数据库。Atlas Stream Processing扩展了MongoDB的流处理能力,使MongoDB越来越接近成为一个流处理数据库。在我们的维恩图(图11-1)中,这种融合的位置正是操作层面与流处理层面的重叠——正是流处理数据库如Materialize和RisingWave的切入点。
Atlas Stream Processing为MongoDB添加了流处理功能,它可以从像Kafka这样的流处理平台读取数据,并将处理后的数据写回流处理平台。此外,Atlas Stream Processing还允许将MongoDB集合用作处理数据的物化视图。我们在图11-4中展示了一个粗略的架构图。
通过嵌入在MongoDB中,Atlas Stream Processing允许在使用动态数据(通过流处理)和静态数据(像以前一样使用MongoDB集合)之间实现无缝集成。消息(通常为JSON格式)与MongoDB文档(也为类似JSON格式)之间的相似性使得这种集成更加顺畅——相比于将流处理与关系模型集成时更加顺畅,如第5章介绍的基于SQL的流处理数据库。然而,Atlas Stream Processing仍处于早期阶段,这可以从MongoDB官网上的功能矩阵中看出。例如,各类JOIN操作目前尚不支持。它能否在拥有Flink等成熟解决方案的流处理市场中占据一席之地,以及如何成为Materialize和RisingWave等流处理数据库的直接竞争对手,还有待观察。
让我们看看Atlas Stream Processing在实践中的一些实例。示例11-19中的JavaScript代码使用MongoDB查询API,展示了如何查询一个Kafka主题并将结果存入MongoDB集合中。
示例11-19. 使用Atlas Stream Processing查询Kafka主题并将结果存入MongoDB集合
// 从连接注册表中定义一个源
var source = { $source: {
connectionName: 'kafkaprod',
topic: 'stocks'
} }
// 创建其他一些阶段
var match = { $match: { 'exchange':'NYSE'} }
// 创建一个目标
var sink = { $merge: {
into: {
connectionName: 'mongoprod',
db: 'StockDB',
col: 'TransactionHistory'
}
} }
// 尝试执行!
var myProcessor = [source, match, sink]
sp.process(myProcessor)
在这里,source阶段连接到Kafka主题“stocks”,match阶段仅匹配那些其exchange字段与字符串“NYSE”匹配的Kafka消息,而sink阶段将匹配的消息存入一个物化视图,即名为“TransactionHistory”的MongoDB集合。最后一行以sp.process
开头的代码实际上启动了流处理器。
另一个示例展示了如何使用Atlas Stream Processing进行窗口聚合(见示例11-20)。
示例11-20. 在Atlas Stream Processing中执行窗口聚合
// 定义一个滚动窗口
{
$tumblingWindow: {
interval: {
size: NumberInt(60), unit: 'second'},
pipeline: [{
$group: {
_id: '$ip_source',
count_connection_reset: { $sum: 1 }
}
}]
}
},
// 输出有投影以便于使用
{
_id: '127.0.0.1',
count_connection_reset: 60,
_stream_meta: {
sourceType: 'kafka',
windowStartTimestamp: 2023-05-18T17:07:00.000Z,
windowEndTimestamp: 2023-05-18T17:08:00.000Z
}
}
在示例11-20的代码片段中,我们首先定义了一个60秒的滚动窗口,并使用它来对Kafka主题中的消息进行计数。
数据仓库
流处理不仅在数据库领域取得了重大进展,也在数据仓库领域占据了重要位置。在本节中,我们将探讨三个主要玩家:BigQuery、Redshift和Snowflake——它们位于我们的维恩图(图11-1)中分析层面和流处理层面重叠的部分。
BigQuery
谷歌的BigQuery通过其SDK也支持流式数据摄取。类似于Redshift,这使得BigQuery能够覆盖近实时分析的用例,在这些用例中,低延迟至关重要。通过流式摄取进入BigQuery的数据在第一次流式插入到表中的几秒内即可用于实时分析。
与Redshift不同的是,BigQuery中的流式摄取不是通过SQL实现的,而是通过Java和Python的SDK实现的。在示例11-21中,我们展示了使用Python代码以这种方式插入表数据。
示例11-21. 使用Python SDK进行BigQuery的流式摄取(插入行)
def stream_data(dataset_name, table_name, json_data):
bigquery_client = bigquery.Client()
dataset = bigquery_client.dataset(dataset_name)
table = dataset.table(table_name)
data = json.loads(json_data)
# 重新加载表以获取架构。
table.reload()
rows = [data]
errors = table.insert_data(rows)
if not errors:
print('Loaded 1 row into {}:{}'.format(dataset_name, table_name))
else:
print('Errors:')
pprint(errors)
Google Cloud Platform(GCP)还提供了BigQuery与其基于Apache Beam的产品Cloud Dataflow的集成。数据可以通过Cloud Dataflow写入BigQuery,也可以将BigQuery中的数据输出到Cloud Dataflow。此外,谷歌最近宣布推出Apache Kafka for BigQuery,方便以无服务器方式将Apache Kafka与BigQuery结合部署。
Redshift
Redshift支持从Kinesis或Kafka(Amazon Managed Service for Apache Kafka,或MSK)向物化视图进行低延迟的流式数据摄取。物化视图可以使用SQL语句进行配置,构成从输入流中接收数据的着陆区。数据在到达时即被处理——例如,来自Kinesis数据流或Kafka主题的JSON值可以被消费并映射到物化视图的数据列。
这一功能甚至使Redshift能够覆盖近实时分析的用例,这些用例需要在数据连续流式生成后短时间内处理数据。示例来源包括物联网(IoT)设备、系统遥测数据或点击流。与仅通过Kinesis Data Firehose将数据暂存到S3中间接获取数据相比,直接的流式摄取减少了复杂性并降低了延迟。
以下是Redshift中流式摄取的实际工作方式。关键部分是设置一个物化视图,以便从例如Kafka主题中消费数据,如示例11-22所示。
示例11-22. 在Redshift中创建物化视图以从Kafka主题中消费数据
CREATE MATERIALIZED VIEW MyView AUTO REFRESH YES AS
SELECT
kafka_partition,
kafka_offset,
kafka_timestamp_type,
kafka_timestamp,
kafka_key,
JSON_PARSE(kafka_value) as Data,
kafka_headers
FROM
MySchema."mytopic"
WHERE
CAN_JSON_PARSE(kafka_value);
在Redshift中,物化视图和流式摄取类似于流处理数据库中的物化视图,如Materialize和RisingWave,尤其是在使用AUTO REFRESH而不是MANUAL REFRESH时。然而,由于Redshift的底层架构并不是基于流处理的,这与流处理数据库仍然不完全相同——与流处理数据库完全增量处理不同,Redshift的刷新仍然是定期进行的,而非连续的,这使得Redshift的延迟比完全增量处理的流处理数据库更高。
Snowflake
Snowflake最近在构建流处理相关功能方面投入了大量精力。因此,Snowflake现在提供以下基于流处理的功能:
- 连续数据加载 这涉及流式摄取,类似于Redshift和BigQuery。为此,Snowflake提供了Snowpipe Streaming和用于Kafka Connect的Snowflake Connector。
- 连续数据转换 Snowflake引入了一个名为动态表的功能,用于以声明方式实现自动化数据管道,简化了数据摄取与处理的结合。
- 变更数据跟踪 这是为普通Snowflake表和动态表实现CDC(变更数据捕获)的功能。
首先让我们更深入地了解Snowpipe Streaming的流式摄取功能。类似于BigQuery,Snowflake提供了一个SDK(用于Java)来实现这一功能。因此,为了实现流式摄取,用户需要编写一个Java应用程序。示例11-23展示了一个简化的实践示例。
示例11-23. 使用基于Java的SDK进行Snowflake的流式摄取(插入行)
[package, imports]
public class SnowflakeStreamingIngestExample {
[setup]
// 使用insertRows API插入行
final int totalRowsInTable = 1000;
for (int val = 0; val < totalRowsInTable; val++) {
Map<String, Object> row = new HashMap<>();
// c1对应于表中的列名
row.put("c1", val);
// 使用当前的offset_token插入行
InsertValidationResponse response = channel1.insertRow(row, String.valueOf(val));
if (response.hasErrors()) {
// 如果有异常,可以抛出异常,或者你可以处理有错误的行
throw response.getInsertErrors().get(0).getException();
}
}
// 如果需要,你可以检查Snowflake中注册的offset_token
// 以确保所有内容都已提交
final int expectedOffsetTokenInSnowflake = totalRowsInTable - 1;
// 基于0的offset_token
final int maxRetries = 10;
int retryCount = 0;
do {
String offsetTokenFromSnowflake = channel1.getLatestCommittedOffsetToken();
if (offsetTokenFromSnowflake != null
&& offsetTokenFromSnowflake.equals(String.valueOf
(expectedOffsetTokenInSnowflake))) {
System.out.println("SUCCESSFULLY inserted " + totalRowsInTable + " rows");
break;
}
retryCount++;
} while (retryCount < maxRetries);
[close]
}
}
}
Snowflake引入了动态表,以声明方式构建数据摄取管道,使用SQL实现。动态表在许多方面类似于Redshift的流式摄取的物化视图,也类似于流处理数据库如Materialize和RisingWave中的物化视图。然而,正如Redshift一样,其底层架构是批处理而非流处理。因此,即使是动态表的自动刷新也是定期进行的,而不是像流处理数据库中那样严格的连续刷新,导致延迟大大增加。
Lakehouse
流处理引擎和流处理系统为数据湖/Lakehouse提供更无缝支持的趋势正迅速增长。像Confluent、Redpanda和WarpStream这样的流处理引擎正逐步提供直接从其“冷”对象存储层中流式处理数据的功能,使用Apache Iceberg等开放表格式。开源Kafka也在朝这个方向发展,而Apache Paimon是Apache Flink的一个分支,它将基于Flink的流处理引入了数据湖,即所谓的“Streamhouse”。Confluent已经在其专有的Confluent Cloud/Kora流处理平台中直接提供了Iceberg支持,并创造了“多模式流”的术语,以描述其在流和表格式中提供数据的能力。通过这一功能,Flink也可以被用来访问流和表/批数据。Delta Lake,另一种开放表格式中的数据,也可以使用流处理进行处理(如Spark Structured Streaming)。
在本节中,我们将进一步探讨分析层面和流处理层面的这种融合形式(见图11-1中的维恩图),并最终讨论流处理技术与Lakehouse的当前及未来关系。
Delta Lake
与另外两种最流行的开放表格式(Apache Iceberg和Apache Hudi)一样,Delta Lake基于列式Parquet文件格式。除了Parquet文件,它还提供了一个基于文件的事务日志,用于实现ACID事务和可扩展的元数据处理。由于它与Apache Spark API兼容(Spark和Delta Lake主要由Databricks开发),Delta Lake也与Spark Structured Streaming紧密集成,用于大规模的数据流和批处理。请注意,Spark Structured Streaming在底层使用的是微批处理,因此严格来说,它并不是纯粹的流处理。
Delta Lake的基础开放表格式称为Delta Table。在Delta Lake中,Delta Tables既可以作为数据源,也可以作为数据汇——结合Spark Structured Streaming,可以在Delta Lake上实现流处理。示例11-24展示了一个简单的示例。
示例11-24. 使用Spark Structured Streaming将Delta Lake(Delta Tables)作为数据源
spark.readStream.format("delta")
.load("/tmp/delta/events")
import io.delta.implicits._
spark.readStream.delta("/tmp/delta/events")
在这个示例中,Delta表/tmp/delta/events
使用Spark Structured Streaming被读取为一个流。你现在可以添加任意流处理逻辑的查询,查询将处理表中现有的所有数据以及将来到达的新数据。
将处理后的数据从Spark Structured Streaming写回到Delta Tables的过程类似(见示例11-25)。
示例11-25. 使用Spark Structured Streaming将Delta Lake(Delta Tables)作为数据汇
events.writeStream
.outputMode("append")
.option("checkpointLocation", "/tmp/delta/events/_checkpoints/")
.toTable("events")
Apache Paimon
Apache Paimon的目标类似于Delta Lake,但不同的是,Paimon并不像Delta Lake/Spark Structured Streaming那样使用微批处理,而是基于Apache Flink构建的“纯”流处理。Paimon的主要供应商Ververica也将Paimon称为Streamhouse(与Lakehouse相对)的解决方案。
与Delta Lake、Iceberg和Hudi不同,Paimon并不使用Parquet作为底层的列式数据格式——它的文件基于其自有的基于LSM(日志结构合并)树结构的格式。除了Flink,Paimon还支持通过其他计算引擎(如Apache Hive、Apache Spark和Trino)读取这些文件。虽然Paimon基于流优先的架构,但它也支持批处理模式来读取和写入数据。
在示例11-26中,我们展示了如何创建Paimon表customers
和Orders
,后者基于Kafka主题(我们为简洁起见省略了目录设置)。
示例11-26. 在Apache Paimon中基于Kafka主题创建表customers
和临时表Orders
CREATE TABLE customers (
id INT PRIMARY KEY NOT ENFORCED,
name STRING,
country STRING,
zip STRING
);
INSERT INTO customers ...
CREATE TEMPORARY TABLE Orders (
order_id INT,
total INT,
customer_id INT,
proc_time AS PROCTIME()
) WITH (
'connector' = 'kafka',
'topic' = '...',
'properties.bootstrap.servers' = '...',
'format' = 'csv'
...
);
现在,可以使用示例11-27中的查找JOIN查询来查询此表。
示例11-27. 在Apache Paimon中使用查找JOIN查询表Orders
SELECT o.order_id, o.total, c.country, c.zip
FROM Orders AS o
JOIN customers
FOR SYSTEM_TIME AS OF o.proc_time AS c
ON o.customer_id = c.id;
Apache Iceberg
Apache Iceberg是一种非常流行的开放表格式,最初由Netflix推出,并得到许多供应商的支持,如Spark、Flink、Presto、Trino、Hive、Impala、StarRocks、Doris、Pig和Snowflake。像Redpanda、WarpStream和开源Kafka这样的流处理平台也计划包括Iceberg/Parquet支持,以便通过Iceberg API直接访问它们的冷数据。Confluent Cloud/Kora已经开始通过Tableflow将Kafka主题暴露为Iceberg表。
流处理数据库也在跟随这一趋势。RisingWave已经支持将Iceberg作为数据汇,如我们在示例11-28中展示的,将数据汇入Iceberg表(RisingWave还支持仅追加模式)。
示例11-28. 在RisingWave中将表s1_table
汇入Apache Iceberg表
CREATE SINK s1_sink FROM s1_table
WITH (
connector = 'iceberg',
warehouse.path = 's3a://my-iceberg-bucket/path/to/warehouse',
s3.endpoint = 'https://s3.ap-southeast-1.amazonaws.com',
s3.access.key = '${ACCESS_KEY}',
s3.secret.key = '${SECRET_KEY},
database.name='dev',
table.name='table',
primary_key='seq_id'
);
由于Iceberg也受到Spark Structured Streaming的支持,在示例11-29中,我们展示了如何使用Python和Spark将数据导入Iceberg表。
示例11-29. 使用Spark Structured Streaming将数据摄入Apache Iceberg表
df.writeStream \
.format("iceberg") \
.outputMode("append") \
.trigger(processingTime=WINDOW_SIZE) \
.option("path", table_id) \
.option("fanout-enabled", "true") \
.option("checkpointLocation", checkpointPath) \
.start()
Iceberg表也可以作为Spark Structured Streaming的数据源(见示例11-30)。
示例11-30. 使用Spark Structured Streaming从Apache Iceberg表读取数据
spark.readStream \
.format("iceberg") \
.load(basePath) \
.start()
Apache Hudi
Apache Hudi是另一种流行的开放表格式,最初由Uber开发。Hudi也支持多个查询引擎,如Apache Spark、Apache Impala、Apache Hive、Presto和Trino。与Iceberg不同,Hudi具有基于记录级变更流的增量查询功能,可用于将数据从Hudi导入例如Spark Structured Streaming(见示例11-31)。
示例11-31. 使用Spark Structured Streaming从Apache Hudi表读取数据
spark.readStream \
.format("hudi") \
.load(basePath) \
.start()
例如,来自Spark Structured Streaming的数据也可以像示例11-30中的Iceberg示例一样被导入Hudi。Hudi的示例见示例11-32。
示例11-32. 使用Spark Structured Streaming将数据写入Apache Hudi表
df.writeStream
.format("hudi") \
.options(**hudi_streaming_options) \
.outputMode("append") \
.option("path", baseStreamingPath) \
.option("checkpointLocation", checkpointLocation) \
.trigger(once=True) \
.start()
OneTable或XTable
OneTable或XTable是一种新的开放表格式,旨在使不同的开放表格式互操作。从这个意义上说,它是一个元开放表格式。目前,OneTable或XTable支持Apache Iceberg、Apache Hudi和Delta Lake。
OneTable或XTable对于同时使用多种开放表格式的公司特别有用,例如Iceberg和Delta Lake——它提供了一个抽象层,避免在不同Lakehouse之间来回复制数据。随着OneTable或XTable的开源,可能也会增加对Apache Paimon表格式的支持。
流处理与Lakehouse的关系
Redpanda、WarpStream和开源Kafka正通过即将推出的功能铺平流处理平台、流处理和Lakehouse之间日益紧密集成的道路,这些功能提供了直接使用列式表格式(如Iceberg/Parquet)从分层存储层(或对WarpStream来说,是其唯一的存储层)获取冷数据的能力,以及Apache Paimon的“Streamhouse”理念。
在我们写这本书的时候,Lakehouse和流处理的集成仍然主要局限于纯粹的流式摄取。然而,流处理平台和开放表格式的更紧密集成已经近在眼前,这将显著改变现状。一个可能的结果是,目前仅存储在Lakehouse中的大量数据可能会转移到流处理平台——避免了从流处理平台到实际Lakehouse的额外复制/摄取。这也意味着流处理平台将逐渐成为数据的“单一真实来源”,而不是Lakehouse。或者,像Confluent的Tableflow这样的技术可以提供两个API——一个是流和事件,另一个是批处理和表——直接在流处理平台上访问相同的非重复数据。
总体而言,许多SaaS数据服务正朝着使用云对象存储作为低层存储(或分层存储)的方向发展。使用分层存储帮助企业卸载较冷的数据,从而节省存储成本。许多人通过建议使用云对象存储作为数据服务和存储的接口(而不是SaaS接口本身)来扩展这一想法。例如,如果你的SaaS提供数据库服务,并将数据卸载到对象存储以进行廉价的冷存储,为什么它不能直接从该对象存储中读取数据?同样,你的SaaS数据库应该能够从云对象存储中读取数据。那么,为什么不使用我已经在云对象存储中的数据来为你的SaaS数据库提供服务呢?
这对本书主要讨论的技术——流处理数据库——意味着什么?我们认为这是它们发展的一个巨大机会。如果像Kafka这样的流处理平台越来越多地成为单一的真实数据来源,那么目前只在Lakehouse上执行的数据处理管道也可能会转移到流处理平台。而且,尤其是在端到端低延迟至关重要的操作性应用中——但实际上,在所有需要处理大量数据的用例中——基于流的流处理系统和流处理数据库可以比基于批处理架构的数据仓库(如Snowflake)或Lakehouse(如Databricks)提供更好的性能和巨大的成本节省。
结论
在本书中,我们已经走了很长的一段路。你了解到流处理技术如何受到数据库中已经实现的功能的影响。你了解到,通过将数据库“翻转”过来,我们能够将各个部分的扩展超越单个数据库的容量。最重要的是,你了解到将这些技术重新引入数据库(换句话说,将数据库技术外置)有助于整合基础设施,并简化工程师的界面,而不丧失我们从数据库分解中学到的可扩展性。
我们看到了一系列关于流处理和批处理(数据库/数据湖/Lakehouse)不断融合的迹象,首先是操作层面和流处理层面的日益重叠。不仅越来越多支持图数据库和向量数据库的供应商提供了与流处理平台直接集成的功能,而且一些供应商还支持增量物化视图的维护。像MongoDB这样的老牌数据库供应商甚至在某种程度上发展成为功能齐全的流处理数据库。
更有趣的是分析层面与流处理层面的持续融合。在这里,数据仓库解决方案如BigQuery、Redshift和Snowflake提供了越来越多直接支持流处理的有趣功能。然而,目前最有趣的融合是流处理与Lakehouse的融合。在这里,我们观察到双方的强劲推动——从流处理到Lakehouse,以及反过来——无缝结合流处理与Lakehouse架构,催生了如“多模式流”这样的新概念,这些概念由Apache Iceberg等开放表格式驱动。我们还观察到,这种融合也可能成为纯流处理的流处理解决方案和流处理数据库普及的强大推动力。
在撰写本书时,我们仅处于起步阶段。我们期待着看到操作、分析和流处理层面的融合故事将如何展开。
转载自:https://juejin.cn/post/7402773345710129161