流数据库——一致性如果你对数据库非常熟悉,你可能会认为一致性是理所当然的。你知道查询的结果将与输入数据一致。然而,假如你
如果你对数据库非常熟悉,你可能会认为一致性是理所当然的。你知道查询的结果将与输入数据一致。然而,假如你敢于从数据库的世界跨越到流处理的世界,你是否可以依靠类似的一致性保证,即使在数据迟到和乱序到达的额外复杂性下,以及对低延迟和高吞吐量的强调下?
对于传统的流处理器,答案是否定的。它们保证的是一种较弱的一致性形式,称为最终一致性。在传统的流处理用例中,通常涉及窗口化数据的聚合,最终一致性非常合适,同时也使得数据管道能够实现超低延迟、极高吞吐量和极大规模。然而,对于来自数据库世界的人来说,最终一致性可能会变成一种令人困惑和直觉不符的体验——尤其是结合非窗口化数据时。
在本章中,我们将使用一个来自银行领域的玩具示例,来展示如果仅仅按照数据库世界的直觉来操作,在像Flink、ksqlDB和Proton这样的最终一致性流处理器中可能会出现什么问题。
有趣的是,一些较新的流处理系统支持更强的一致性形式,其中每个输出都是输入子集的正确输出:内部一致性。我们将RisingWave、Materialize和Pathway放在同一工作任务中,看看它们是否允许我们以更接近典型数据库工程师直觉的方式解决这个玩具示例。
本章继续详细说明在遵循最终一致性路径时,具体会出现什么问题,以及提供更强一致性保证的流处理系统如何能够表现得更好。
最后,我们将探讨对于像Flink这样的传统流处理器,支持更强一致性保证是否有意义。换句话说,它们实际需要放弃多少低延迟和高吞吐量,是否值得?
玩具示例
本章通过一个从 Jamie Brandon 的博客中改编的玩具示例来生动地展示这一概念。这个示例故意不是一个经典的流处理用例。它没有使用窗口化技术,并且需要一种在经典流处理系统中无法完全实现的同步形式。选择这个示例的理由是,本书关注的是流数据库在流处理和数据库世界的融合。我们认为,为了真正实现这种融合,流处理系统应该能够以类似于数据库世界的方式处理非典型的、非窗口化的用例,特别是在一致性方面。
假设一个银行有10个账户,这些账户不断地将1美元转账到其他银行账户。这就像表 6-1 所示,其中每一列“Transaction”表示一个进行借记和贷记的交易。左侧的“Account”和“Starting value”列显示了三个账户,即1、2和3,初始值为0。接下来的列“Transaction 1”借记账户1并贷记账户2,以此类推。对于每一列表示的借记和贷记交易,所有行的总和应该为零。
表 6-1. 借记和贷记交易
Account | Starting value | Transaction 1 | Transaction 2 | Transaction 3 | Transaction 4 |
---|---|---|---|---|---|
1 | $0 | –$1 | –$2 | –$3 | –$2 |
2 | $0 | $1 | $1 | $2 | $2 |
3 | $0 | $0 | $1 | $1 | $0 |
Sum | $0 | $0 | $0 | $0 | $0 |
我们通过查看当所有账户的余额相加时是否能得到零来测试一致性。这是我们的不变式。任何时刻如果总和不是零,则表明存在一致性问题。
交易
我们使用Python代码来设置我们的玩具银行示例,如示例 6-1 所示。
示例 6-1. 设置玩具银行示例的 Python 代码
import datetime, json, random, time
from kafi.kafi import Cluster
c = Cluster("local")
c.create("transactions", partitions=1)
p = c.producer("transactions")
random.seed(42)
for id_int in range(0, 10000):
row_str = json.dumps({
"id": id_int,
"from_account": random.randint(0, 9),
"to_account": random.randint(0, 9),
"amount": 1,
"ts": datetime.datetime.now().isoformat(sep=" ", timespec="milliseconds")
})
print(row_str)
p.produce(row_str, key=str(id_int))
time.sleep(0.01)
p.close()
每隔10毫秒,代码会向名为“transactions”的Kafka主题(只有一个分区)生产一个新的消息,其中一个银行账户 from_account
向另一个银行账户 to_account
转账1美元。我们在示例 6-2 中展示了一个示例消息。
示例 6-2. 示例消息/交易
{
"id": 42,
"from_account": 3,
"to_account": 0,
"amount": 1,
"ts": "2023-10-24 23:27:57.603"
}
分析交易
接下来,我们使用 SQL 对交易进行进一步分析。我们首先设置两个视图来聚合账户的贷记和借记:贷记是转入账户的金额之和,借记是从账户转出的金额之和(示例 6-3)。
示例 6-3. 设置贷记和借记视图
CREATE VIEW credits AS
SELECT
to_account as account,
SUM(amount) as credits
FROM transactions
GROUP BY to_account;
CREATE VIEW debits(account, debits) AS
SELECT
from_account as account,
SUM(amount) as debits
FROM transactions
GROUP BY from_account;
接下来,我们计算账户的余额,即贷记减去借记:
CREATE VIEW balance AS
SELECT
credits.account AS account,
credits - debits AS balance
FROM credits
INNER JOIN debits ON credits.account = debits.account;
最后,我们创建一个视图来汇总所有账户的余额。由于没有钱能凭空出现,也没有钱会丢失,所以这个总和应该始终为0,并为我们提供不变式来测试流处理器和流数据库的一致性行为:
CREATE VIEW total(total) AS
SELECT SUM(balance) FROM balance;
跨流处理系统的一致性比较
在面对玩具银行示例时,一些现有的流处理系统表现如何?我们将对以下六个系统进行比较:
- Flink SQL
- ksqlDB
- Proton(Timeplus)
- RisingWave
- Materialize
- Pathway
Flink SQL
我们从 Flink 开始,这是最受欢迎的流处理系统之一。由于本书主要关注基于 SQL 的流数据库,我们使用 Flink 的 SQL 层/API,即 Flink SQL(我们使用的是 1.19.0 版本)。
在 Flink SQL 中,我们首先设置与源主题 transactions 的连接:
CREATE TABLE transactions (
id BIGINT,
from_account INT,
to_account INT,
amount DOUBLE,
ts TIMESTAMP(3)
) WITH (
'connector' = 'kafka',
'topic' = 'transactions',
'properties.bootstrap.servers' = 'localhost:9092',
'properties.group.id' = 'transactions_flink',
'scan.startup.mode' = 'earliest-offset',
'format' = 'json',
'json.fail-on-missing-field' = 'true',
'json.ignore-parse-errors' = 'false'
);
接下来,我们创建视图 credits、debits、balance 和 total:
CREATE VIEW credits(account, credits) AS
SELECT
to_account as account,
SUM(amount) as credits
FROM
transactions
GROUP BY
to_account;
CREATE VIEW debits(account, debits) AS
SELECT
from_account as account,
SUM(amount) as debits
FROM
transactions
GROUP BY
from_account;
CREATE VIEW balance(account, balance) AS
SELECT
credits.account,
credits - debits as balance
FROM
credits,
debits
WHERE
credits.account = debits.account;
CREATE VIEW total(total) AS
SELECT
SUM(balance)
FROM
balance;
最后一步,我们将视图 total 的结果写回到一个名为 total_flinksql 的 Kafka sink 主题:
CREATE TABLE total_sink (
total DOUBLE,
PRIMARY KEY (total) NOT ENFORCED
) WITH (
'connector' = 'upsert-kafka',
'property-version' = 'universal',
'properties.bootstrap.servers' = 'localhost:9092',
'topic' = 'total_flink',
'key.format' = 'json',
'value.format' = 'json',
'properties.group.id' = 'total_flink'
);
INSERT INTO total_sink SELECT * FROM total;
现在让我们看看 Flink SQL 代码的数据流图,如图 6-1 所示。左侧是 transactions 源表,然后将其拆分为两个聚合,分别用于 credits 和 debits。接着,这两个聚合被 JOIN 到 balance 中,最后计算所有账户余额的总和并存储在 total 中。
注意事项
所有在此比较的流处理系统,包括 ksqlDB、Proton、RisingWave、Materialize 和 Pathway,都使用了类似于图 6-1 的数据流图/拓扑结构。
要在 Flink SQL 中运行示例,我们首先设置表和视图,然后运行 Python 代码在我们的玩具银行中设置 10,000 个交易记录,并查看结果。如果我们来自数据库领域,我们会期望不会发生严重问题——结果主题应该像视图 total 一样,总是返回总和为 0。
让我们在图 6-2 中可视化 Flink SQL 从 Kafka 主题 total_flinksql 计算出的结果。
在图 6-2 中,我们看到总余额在 +400 和 –600 之间剧烈波动,总共近 80,000 条消息。尽管如此,当输入流在 10,000 条消息后最终停止时,Flink SQL 还是会收敛到正确的一致性总和,即 0,如表 6-2 所示。
表 6-2. total_flinksql sink Kafka 主题中的最后几条消息
Offset | Total |
---|---|
79936 | –3.0 |
79937 | 40.0 |
79938 | 40.0 |
79939 | –2.0 |
79940 | –2.0 |
79941 | –103.0 |
79942 | –103.0 |
79943 | –1.0 |
79944 | –1.0 |
79945 | –83.0 |
79946 | –83.0 |
79947 | 0.0 |
这种行为的原因是 Flink SQL 的设计选择——即在高层次上支持最终一致性而非内部一致性,这与选择低延迟而非一致性相一致。在最终一致性模型中,只要流处理的结果不是始终一致的,而是“在某个稍后的时间一致”,就足够了。在我们的案例中,“稍后的时间”正是 Python 代码停止向源主题 transactions 推送新消息的时间点。
最终一致性模型具有将延迟保持在最低水平的明显好处,是窗口流处理的理想模型,这也是 Flink 和 Flink SQL 最常用的场景。然而,在我们的案例中,这导致了一个困境。我们有一个无限制的、非窗口的用例,除非我们停止输入流,否则我们无法确保 Flink SQL 返回的中间结果是正确的。但我们不能在任何时候停止输入流,至少在生产环境中不能,因为我们无法禁止我们的虚拟客户进行交易……
在版本 1.19 中,Flink 引入了所谓的 MiniBatch,用于 JOIN 操作符。如果为手头的用例正确配置,MiniBatch 不仅显著提高了性能,还有助于实现更高程度的一致性。我们将在本章后面详细讨论这一点。
ksqlDB
我们的下一个流处理器/流数据库是 Confluent 的 ksqlDB(我们使用的是 7.6.0 版本)。许多人认为 ksqlDB 是第一个流数据库(其第一个版本于 2017 年发布)。本章主要关注基于 SQL 的流处理系统,因此只包含 ksqlDB,而不涉及其底层库 Kafka Streams。
我们首先设置与源主题 transactions 的连接,如示例 6-4 所示。
示例 6-4. 在 ksqlDB 中设置 transactions 表
CREATE TABLE transactions (
id VARCHAR PRIMARY KEY,
from_account INT,
to_account INT,
amount DOUBLE,
ts VARCHAR
) WITH (
kafka_topic = 'transactions',
value_format = 'json',
partitions = 1,
timestamp = 'ts',
timestamp_format = 'yyyy-MM-dd HH:mm:ss.SSS'
);
接下来,我们创建了视图 credits、debits、balance 和 total,每个视图对应一个 ksqlDB 表:
CREATE TABLE credits WITH (
kafka_topic = 'credits',
value_format = 'json'
) AS
SELECT
to_account AS account,
SUM(amount) AS credits
FROM
transactions
GROUP BY
to_account EMIT CHANGES;
CREATE TABLE debits WITH (
kafka_topic = 'debits',
value_format = 'json'
) AS
SELECT
from_account AS account,
SUM(amount) AS debits
FROM
transactions
GROUP BY
from_account EMIT CHANGES;
CREATE TABLE balance WITH (
kafka_topic = 'balance',
value_format = 'json'
) AS
SELECT
credits.account AS account,
credits - debits AS balance
FROM
credits
INNER JOIN debits ON credits.account = debits.account EMIT CHANGES;
CREATE TABLE total WITH (
kafka_topic = 'total_ksqldb',
value_format = 'json'
) AS
SELECT
'foo',
SUM(balance)
FROM
balance
GROUP BY
'foo' EMIT CHANGES;
EMIT CHANGES
告诉 ksqlDB 查询是一个推送查询。
当我们设置表 total 时,我们将所有账户余额的总和结果写入一个 sink Kafka 主题 total_ksqldb
。
现在,正如我们对 Flink SQL 所做的那样,我们首先在 ksqlDB 中设置了这些表,然后运行 Python 代码以创建 10,000 条交易。接着,我们可视化了从 Kafka 主题 total_ksqldb
中计算的 ksqlDB 结果(见图 6-3)。
与图 6-2 类似,我们看到很多消息(几乎 40,000 条),仍然在 0 附近剧烈波动(在 +100 和 –100 之间)。同样,当输入流在 10,000 条消息后停止时,ksqlDB 最终也会收敛到正确的一致性总和 0,如 sink Kafka 主题 total_ksqldb
中最后几条 Kafka 消息所示:
表格:Kafka 主题 total_ksqldb 中的最后几条消息
Offset | Total |
---|---|
39905 | –6.0 |
39906 | –78.0 |
39907 | –5.0 |
39908 | 68.0 |
39909 | –4.0 |
39910 | –3.0 |
39911 | –3.0 |
39912 | 7.0 |
39913 | –2.0 |
39914 | –10.0 |
39915 | –1.0 |
39916 | –26.0 |
39917 | 0.0 |
ksqlDB 的这种行为原因与 Flink SQL 相同——ksqlDB 采用了最终一致性模型(或在 Kafka Streams/ksqlDB 的上下文中称为连续优化)。在我们的例子中,ksqlDB 给我们带来了与 Flink SQL 相同的困境:除非我们停止输入流,否则我们永远无法确定哪些中间结果实际上是正确的。
Proton (Timeplus)
在本节中,我们将探讨 Proton,这是一种开源流数据库,底层支持 Timeplus。Proton 的设置从创建一个连接到输入 Kafka 主题 transactions
的 STREAM 开始,如示例 6-5 所示。
示例 6-5. 在 Proton 中设置输入 STREAM
CREATE EXTERNAL STREAM transactions(
id int,
from_account int,
to_account int,
amount int,
ts datetime64
) SETTINGS
type = 'kafka',
brokers = 'broker:29092',
topic = 'transactions',
data_format = 'JSONEachRow';
接下来,我们在示例 6-6 中设置视图 credits
、debits
和 balance
。
示例 6-6. 在 Proton 中设置视图 credits、debits 和 balance
CREATE EXTERNAL STREAM transactions(
id int,
from_account int,
to_account int,
amount int,
ts datetime64
) SETTINGS type = 'kafka',
brokers = 'broker:29092',
topic = 'transactions',
data_format = 'JSONEachRow';
CREATE VIEW credits AS
SELECT
now64() as ts,
to_account as account,
sum(amount) as credits
FROM
transactions
GROUP BY
to_account EMIT PERIODIC 100ms;
CREATE VIEW debits AS
SELECT
now64() as ts,
from_account as account,
sum(amount) as debits
FROM
transactions
GROUP BY
from_account EMIT PERIODIC 100ms;
CREATE VIEW balance AS
SELECT
c.account,
credits - debits as balance
FROM
changelog(credits, account, ts, true) AS c
JOIN changelog(debits, account, ts, true) AS d ON c.account = d.account;
现在,我们已经设置了这些视图,接下来在示例 6-7 中,我们只需创建输出 STREAM,将结果汇总到 Kafka 主题 total_proton
中,并设置物化视图 total
。
示例 6-7. 在 Proton 中设置输出 STREAM 和 MATERIALIZED VIEW total
CREATE EXTERNAL STREAM total_s(total int) SETTINGS type = 'kafka',
brokers = 'broker:29092',
topic = 'total_proton',
data_format = 'JSONEachRow';
CREATE MATERIALIZED VIEW total INTO total_s AS
SELECT
sum(balance) as total
FROM
balance;
我们在结果主题中只看到 56 条消息,这些消息在 –10 和 9 之间波动。当输入流在 10,000 条消息后停止时,Proton 收敛到正确的总和:0。这可以从 sink Kafka 主题 total_proton
中的最后几条消息中观察到,如表 6-3 所示。
表 6-3. Kafka 主题 total_proton 中的最后几条消息
Offset | Total |
---|---|
48 | 9 |
49 | 9 |
50 | 9 |
51 | 9 |
52 | 9 |
53 | –9 |
54 | 9 |
55 | 0 |
在图 6-4 中,我们展示了 Proton 的可视化结果,您可以轻松看到最后的正确总和 0,因为结果 sink 主题相对较小。
再次,正如 Flink 和 ksqlDB 一样,底层模型是最终一致性,我们面临的困境也类似——尽管我们在中间结果中得到的错误不如之前极端,但我们仍然必须停止输入流,以确保最终结果是正确的。
RisingWave
接下来是 RisingWave,一种与 PostgreSQL 协议兼容的流数据库。我们为 RisingWave 设置示例如下。首先,我们创建一个由输入 Kafka 主题 transactions
提供数据的表:
CREATE TABLE IF NOT EXISTS transactions (
id INT,
from_account INT,
to_account INT,
amount INT,
ts TIMESTAMP
) WITH (
connector = 'kafka',
topic = 'transactions',
properties.bootstrap.server = 'broker:29092',
scan.startup.mode = 'earliest',
scan.startup.timestamp_millis = '140000000'
) ROW FORMAT JSON;
接下来,我们设置视图,如示例 6-8 所示。
示例 6-8. 在 RisingWave 中设置视图 credits、debits、balance 和 total
CREATE VIEW accounts AS
SELECT
from_account AS account
FROM
transactions
UNION
SELECT
to_account
FROM
transactions;
CREATE VIEW credits AS
SELECT
transactions.to_account AS account,
SUM(transactions.amount) AS credits
FROM
transactions
LEFT JOIN accounts ON transactions.to_account = accounts.account
GROUP BY
to_account;
CREATE VIEW debits AS
SELECT
transactions.from_account AS account,
SUM(transactions.amount) AS debits
FROM
transactions
LEFT JOIN accounts ON transactions.from_account = accounts.account
GROUP BY
from_account;
CREATE VIEW balance AS
SELECT
credits.account AS account,
credits - debits AS balance
FROM
credits
INNER JOIN debits ON credits.account = debits.account;
CREATE VIEW total AS
SELECT
sum(balance)
FROM
balance;
最后,我们将视图 total
的结果汇总到 Kafka 主题 total_risingwave
中:
CREATE SINK total_sink
FROM
total WITH (
connector = 'kafka',
properties.bootstrap.server = 'broker:29092',
topic = 'total_risingwave',
type = 'append-only',
force_append_only = 'true'
);
我们在 RisingWave 中设置了表和视图,然后运行 Python 代码以创建 10,000 条交易。RisingWave 的可视化结果如图 6-5 所示。
我们在 sink 主题中得到了 105 条消息,这比 Proton 稍多,但远少于 Flink SQL 和 ksqlDB 生成的消息数。更重要的是,每条消息都给出了正确的结果:0。
Materialize
我们的第五个流处理系统是 Materialize,这也是一个提供 PostgreSQL 兼容 API 的流数据库,类似于 RisingWave。我们首先创建一个由输入 Kafka 主题 transactions
提供数据的表。
CREATE CONNECTION kafka_connection TO kafka (broker 'broker:29092');
CREATE SOURCE transactions_source
FROM
kafka connection kafka_connection (TOPIC 'transactions', START OFFSET (0)) KEY FORMAT
TEXT VALUE FORMAT TEXT INCLUDE KEY ENVELOPE UPSERT WITH (SIZE = '1');
CREATE VIEW transactions AS
SELECT
((text :: jsonb) ->> 'id') :: string AS id,
((text :: jsonb) ->> 'from_account') :: int AS from_account,
((text :: jsonb) ->> 'to_account') :: int AS to_account,
((text :: jsonb) ->> 'amount') :: int AS amount,
((text :: jsonb) ->> 'ts') :: timestamp AS ts,
key
FROM
transactions_source;
接下来,我们在 Materialize 中设置视图,如示例 6-9 所示。
示例 6-9. 在 Materialize 中设置视图 credits、debits、balance 和 total
CREATE VIEW accounts AS
SELECT
from_account AS account
FROM
transactions
UNION
SELECT
to_account
FROM
transactions;
CREATE VIEW credits AS
SELECT
transactions.to_account AS account,
SUM(transactions.amount) AS credits
FROM
transactions
LEFT JOIN accounts ON transactions.to_account = accounts.account
GROUP BY
to_account;
CREATE VIEW debits AS
SELECT
transactions.from_account AS account,
SUM(transactions.amount) AS debits
FROM
transactions
LEFT JOIN accounts ON transactions.from_account = accounts.account
GROUP BY
from_account;
CREATE VIEW balance AS
SELECT
credits.account AS account,
credits - debits AS balance
FROM
credits
INNER JOIN debits ON credits.account = debits.account;
CREATE VIEW total AS
SELECT
SUM(balance)
FROM
balance;
最后,我们将视图 total
的结果汇总到 Kafka 主题 total_materialize
中:
CREATE SINK total_sink
FROM
total INTO kafka connection kafka_connection (TOPIC 'total_materialize')
FORMAT JSON ENVELOPE DEBEZIUM WITH (SIZE = '1');
我们现在可以通过首先在 Materialize 中设置数据源和视图,然后执行 Python 代码以创建 10,000 条交易来运行这个示例。Materialize 的可视化结果如图 6-6 所示。
Materialize 仅向 Kafka 主题输出一条消息,并且每次运行时这条消息都包含正确的结果 0。我们将在下面详细讨论 Materialize 是如何实现这一点的。
Pathway
Pathway 实际上更多是一个 Python 流处理库,而不是一个完整的流数据库。我们将其添加到本章中,以展示使用流处理器也可以实现一致性,并且流数据库并不是必要条件。
此外,它是一个使用 Python 作为命令式语言来编写流处理管道的流处理器。示例 6-10 中的 Python 代码包含类似于 Flink SQL、ksqlDB、Proton、RisingWave 和 Materialize 的 SQL 语句,只不过结果的源和目标在 Python + SQL 中定义,而不仅仅是 SQL。
示例 6-10. 使用 Pathway 在 Python 中设置示例
#!/bin/python
import pathway as pw
rdkafka_settings = {
"bootstrap.servers": "localhost:56512",
"group.id": "pw",
"session.timeout.ms": "6000"
}
class InputSchema(pw.Schema):
id: int
from_account: int
to_account: int
amount: int
ts: str
t = pw.io.kafka.read(
rdkafka_settings,
topic="transactions",
schema=InputSchema,
format="json",
autocommit_duration_ms=1000
)
credits = pw.sql(
"""
SELECT to_account, sum(amount) as credits
FROM T GROUP BY to_account
""", T=t)
debits = pw.sql(
"""
SELECT from_account, sum(amount) as debits
FROM T GROUP BY from_account
""", T=t)
balance = pw.sql(
"""
SELECT CC.to_account, credits - debits as balance
FROM CC
join DD on CC.to_account = DD.from_account
""", CC=credits, DD=debits)
total = pw.sql(
"""
SELECT sum(balance) as total FROM BB
""", BB=balance)
pw.io.kafka.write(
total,
rdkafka_settings=rdkafka_settings,
topic_name='total_pathway',
format='json')
pw.run()
- 连接 Kafka 的信息。
- 消费的 transactions 的 schema。
t
表示来自 Kafka 的流式交易数据。credits
表。debits
表。- 从
credits
中减去debits
后的余额。 - 将用于发送到 Kafka 的总计。
- Kafka 主题
total_pathway
中的结果。 - 异步运行数据流。
运行该应用程序的结果是写入 Kafka 中 total_pathway
主题的单个记录(见示例 6-11)。
示例 6-11. 写入结果主题 total_pathway
的单条记录
{
"total": 0,
"diff": 1,
"time": 1698960910176
}
这是总值为 0 的记录的值。
total_pathway
主题的可视化结果(见图 6-7)显示的行为与 Materialize 完全一致。
乱序消息
为了模拟更现实的条件,我们修改了示例 6-1 中的 Python 代码,制造了大约 1/10 的乱序消息。在 10,000 条消息中,大约 1,000 条是乱序的。对于 Flink SQL、ksqlDB 和 Proton,我们得到的结果与没有任何迟到或乱序消息的简单案例几乎相同,而 RisingWave、Materialize 和 Pathway 的结果完全一致。
超越最终一致性
在我们了解了这五种流处理系统的表现后,还有一些未解答的问题:
- 为什么最终一致性的 Flink SQL、ksqlDB 和 Proton 在我们的玩具示例中失败了?
- 内部一致性的 RisingWave、Materialize 和 Pathway 为什么能通过这个示例?
- 我们能从中学到什么?我们是否能为 Flink SQL、ksqlDB 和 Proton 找到解决方法?
为什么最终一致性的流处理器会失败?
让我们回顾一下在我们的玩具示例中做了什么。我们首先创建了两个视图:credits 和 debits。到此为止,一切正常。但是,一旦我们在视图 balance 中进行了连接操作,Flink SQL、ksqlDB 和 Proton 的 JOIN 操作符未能正确匹配来自视图 credits 和 debits 的数据。
为了理解这一点,我们限制在四个交易场景:
- 从账户 0 向账户 1 转账 1 美元。
- 从账户 0 向账户 2 转账 1 美元。
- 从账户 1 向账户 2 转账 1 美元。
- 从账户 2 向账户 0 转账 1 美元。
现在,让我们看看在像 Flink SQL、ksqlDB 和 Proton 这样的最终一致性流处理系统中可能发生什么情况。
由于 balance 视图的两个 JOIN 操作数没有同步,一个可能的场景是 credits 视图比 debits 视图更早地发出其结果,这导致 balance 视图以如下方式组合其输入:
- balance 将 credits 视图的第一个结果与 debits 视图的第一个结果进行组合。
- balance 将 credits 视图的第二个结果与 debits 视图的第一个结果进行组合。
- balance 将 credits 视图的第三个结果与 debits 视图的第一个结果进行组合。
- balance 将 credits 视图的第四个结果与 debits 视图的第一个结果进行组合。
这种情况可以比作图 6-8 中展示的竞态条件,其中左侧显示了四个交易,中间显示了 credits 和 debits 视图的结果。右侧可以看到 balance 视图中的结果以及 total 视图中的相应总和。在图表的 credits、debits 和 balance 部分,“0:1”代表“账户 0,值 1”,“2:1”代表“账户 2,值 1”,以此类推(“值”根据图表中的位置可以是 credits、debits 或 balance)。
credits 和 debits 的结果组合到 balance 视图中的四种情况由虚线表示。右侧的三条虚线表示不正确的组合,而最右侧的三个结果表示 balance 和 total 中的不正确结果。如图所示,尽管 balance 视图中的第一个结果(以粗体显示,表示 credits 的第一个结果与 debits 的第一个结果正确匹配)是正确的,但接下来的三个结果是不正确的,这打破了我们对余额总和为 0 的不变性。由于 credits 视图比 debits 视图更早发出结果,导致最终的错误总和计算得出了正值结果(1、2 和 3)。
为了进一步说明,我们来看另一个可能的场景。在这个场景中,debits 视图比 credits 视图更快发出结果:
- balance 将 credits 视图的第一个结果与 debits 视图的第一个结果进行组合。
- balance 将 credits 视图的第一个结果与 debits 视图的第二个结果进行组合。
- balance 将 credits 视图的第一个结果与 debits 视图的第三个结果进行组合。
- balance 将 credits 视图的第一个结果与 debits 视图的第四个结果进行组合。
这里,debits 视图的结果比 credits 视图早发出,因此导致了错误的总计计算结果为负值(–1、–2 和 –3)。让我们更深入地探讨一下可能导致这种情况的原因。
早期发射的影响
在最终一致性流处理系统中,主要目标是低延迟。为了实现这一目标,Flink SQL 和 ksqlDB 的设计决策之一是尽可能早地发出结果。这也导致这些系统向 Kafka 接收主题中发出了大量消息(Flink SQL 约 80,000 条,ksqlDB 约 40,000 条)。虽然 Proton 也采用了最终一致性模型,但它发出的消息数量要少得多(56 条)。对于大多数经典流处理用例,这种行为是可以接受的,但对于非窗口数据,这可能成为一种失败模式,Brandon 的博客称之为“非单调操作的早期发射”。
结果的早期发射对单调操作(如过滤器)不会造成问题。但对非单调操作(如 JOIN 和 UNION)来说,则可能会有问题。非单调操作不能像单调操作那样“尽早结合它们得到的内容”并发出这些中间结果。它们必须确保只结合对齐、同步的输入。
MiniBatch(在 Proton 中使用,并在 Flink 1.19+ 中可选)可能是这种失败模式的解决方案,后面我们将讨论,而 ksqlDB 的缓存机制也是一种可能的解决方案。
无同步的流组合
实际上,使最终一致性流处理系统在我们的玩具示例中失败的原因是“非单调操作的早期发射”和这些操作不进行输入同步(Brandon 的博客称之为“无同步的流组合”)的结合。
问题在于,一旦我们在 balance 视图中对 credits 和 debits 视图进行 JOIN,实际对应于数据库事务的玩具银行交易就会丢失。Essentially, Flink SQL、ksqlDB 和在一定程度上 Proton 的 JOIN 操作符只是将来自两个输入视图的内容进行 JOIN,如果出现竞争条件并且任何一个输入视图提供的输入比另一个更快,我们就会得到不正确/不一致的结果(如图 6-8 和 6-9 所示)。
对同步需求的常见回应是,这种同步形式需要全局锁,就像在数据库中一样,这样的全局锁不能扩展。事实上,使用全局锁进行同步只是一个选项(最简单且扩展性最差)。有许多方法可以实现并发和可扩展的同步,我们将在以下章节中看到。
如何通过内部一致性的流处理系统解决玩具示例
正如我们所见,正确处理玩具示例的关键是能够同步地组合流。直观上,我们必须确保只有那些来自 credits 和 debits 的结果属于同一交易的(换句话说,对应于 transactions 源 Kafka 主题中的相同事务)才能进行组合。
在总主题中显示的结果图表表明 RisingWave(图 6-5)、Materialize(图 6-6)和 Pathway(图 6-7)可能找到了一种实现这一目标的方法。但如何实现呢?
RisingWave
RisingWave 使用了灵感来自 Flink 检查点屏障的屏障概念。屏障本质上是包含纪元(时间戳)的控制记录,定期(例如每秒)自动注入到所有源中。在 RisingWave 中,屏障被用作数据的版本号。操作符仅在从所有输入接收到相同版本/屏障后,才允许发出与特定版本相关的结果。
Flink 使用屏障进行一致的检查点,而 RisingWave 则完全利用了它作为流数据库的特点,不仅是流处理器,因此对其持久层(包括检查点)有完全控制。因此,RisingWave 在这方面可以超越 Flink,使用检查点屏障不仅进行检查点,还可以用于版本控制,这是一种数据库世界中的快照隔离概念的适应。
让我们可视化一下我们的四个交易以及 RisingWave 如何处理它们。在图 6-10 中,我们在每个交易后注入屏障,用垂直线表示。每个屏障都有其自己的版本,由下标(1、2、3、4)表示。在 RisingWave 中处理时,交易屏障被转发到下一个操作符。现在,当计算 balance 视图时,屏障被用来确保只有当 credits 和 debits 视图的输入都位于相同屏障之前(即具有相同版本)时,这些输入才会被组合。
原则上,障碍(barriers)也可以更少频繁地注入。考虑图 6-11,其中我们仅在每两条消息中注入一个障碍。
注入这些纪元障碍(epoch barriers)的频率在 RisingWave 中直接影响延迟和内存消耗。增加障碍注入的频率可以减少端到端的延迟,但代价是更高的内存消耗,因为维护更多版本的数据会消耗更多的内存。
注意:
乍一看,障碍(barriers)类似于流处理器如 Flink 中的水印(watermarks),因为它们都是数据流图中的控制记录。然而,障碍和水印具有略微不同的语义。使用障碍时,操作符只有在所有输入到达相同的障碍时才可以发出结果。而使用水印时,操作符只能在遇到水印后继续处理。水印表示所有到达特定时间戳的事件应该已经到达。
Materialize
Materialize 基于差分数据流(Differential Dataflow,DD)。在 DD 中,数据总是有版本的,DD 的所有操作符都会尊重这些版本。因此,Materialize 的图示(图 6-12)与图 6-10 非常相似,唯一的区别在于数据默认是版本化的——DD 不需要像障碍这样的额外概念。
在差分数据流(DD)中,操作符同步是通过以下方式实现的:
- 每个数据项都附带一个版本(在图 6-12 中,这些版本称为“v1”、“v2”、“v3”和“v4”)。
- 操作符只能组合相同版本的数据。
通过这种方式,DD 实现了一种快照隔离(snapshot isolation)形式,从而能够通过我们的玩具示例(toy example)。
Pathway
与 Materialize 类似,Pathway 也基于差分数据流(DD),因此也使用版本控制来实现操作符输入同步,从而通过我们的玩具示例挑战。我们包括 Pathway 是为了说明这种一致性不仅可以通过流处理数据库实现,还可以通过具有内部一致性底层引擎(如 DD)的流处理库实现。
如何修复最终一致的流处理系统以通过玩具示例?
我们已经看到,内部一致性的流处理系统的一个关键特性是它们能够同步其二元非单调操作符(如 UNION 和 JOIN)的输入,方法是使用屏障(RisingWave)或版本(DD、Materialize、Pathway)。更抽象地说,“无全球锁同步王国”的关键在于有效的语义上有意义的时间戳系统,它允许在流处理拓扑中实现解耦进展。
我们能否利用这一见解来为 Flink SQL 提供一个修复方案,使其也能通过我们的玩具示例?
如何使 Flink SQL 通过玩具示例
对于 Flink SQL,确实有办法通过我们的玩具示例。一个方法是显式地使用时间戳字段 ts
在 balance
视图的 WHERE
子句中,只有当 credits
和 debits
的 ts
字段匹配时才进行 JOIN
。我们在 Example 6-12 中展示了对 Flink SQL 代码的修改。
Example 6-12. 在 Flink SQL 中使用显式的 ts
字段进行操作符输入同步,设置 balance
和 total
视图
CREATE VIEW credits(account, credits, ts) AS
SELECT
to_account AS account,
SUM(amount) AS credits,
ts
FROM
transactions
GROUP BY
to_account,
ts;
CREATE VIEW debits(account, debits, ts) AS
SELECT
from_account AS account,
SUM(amount) AS debits,
ts
FROM
transactions
GROUP BY
from_account,
ts;
CREATE VIEW balance(account, balance) AS
SELECT
credits.account,
credits - debits AS balance
FROM
credits,
debits
WHERE
credits.account = debits.account
AND credits.ts = debits.ts;
使用此修复后,Flink SQL 的 sink 主题 total_flinksql_ts
的大小从大约 80,000 减少到 1,没有任何中间结果。现在,它只包含一个消息,其正确的总和为 0,就像 Materialize 和 Pathway 一样,正如图 6-13 所示。
为什么这个修复可能会有问题
虽然我们已经识别出内部一致性的流处理系统帮助解决我们玩具示例的关键特性,并且通过在 Flink SQL 中添加显式的操作符输入同步(通过时间戳)也可以解决它,但这种修复在多个方面可能会带来问题,这些问题并不立即显现。虽然它在玩具示例中工作得很好,但:
- 内部状态存储问题
通过在 Example 6-12 中将时间戳
ts
添加到GROUP BY
子句中,我们为聚合创建了一个无限增长的内部状态存储,这可能导致 Flink 在后期耗尽内存。 - 数据库工程师的理解难题 正如我们在本章介绍中所述,从数据库领域过来的工程师可能难以理解为什么他们直观的解决方案在 Flink SQL 中会产生 80,000 个大部分不正确的结果,以及为什么他们需要修复任何看似完美的直观 SQL 代码。
- 一致性修复的局限性 显式地在最终一致系统上“附加”一致性只能逐案处理。每个新的用例可能需要不同的修复。
- 潜在的微妙不一致
假设你有两个输入主题:
transactions1
和transactions2
,一个包含账户 0 到 4,另一个包含账户 5 到 9。那么,如果transactions1
中的一个事务和transactions2
中的一个事务具有相同的时间戳,你将无法阻止 Flink SQL 继续组合错误的输入。你将不得不寻找对你修复的修复(例如,包含事务 ID 在JOIN
子句中等)。
综合来看,最终一致的流处理系统通常可以修复以表现得更一致,但你能够达到的一致性水平完全取决于你,工程师——并且可能会随着你在现有 SQL 代码上实现的下一个 JOIN
而改变。在非窗口化数据上的一致流处理可以在最终一致的流处理器中完成,但几乎可以肯定会更加耗时、容易出错,而且对流处理专家以外的人几乎不可及。即使 SQL 代码在初看时看起来完全正确,也很容易得到不一致的结果。
Flink 1.19+ 的 MiniBatch
Flink 1.19 引入了 MiniBatch 语义用于 JOIN
。如果激活并配置得当,MiniBatch 不仅可以显著提高 Flink 的性能,还可以提高一致性水平。
MiniBatch 是一种优化,用于缓冲输入记录以减少状态访问。我们实验了各种 MiniBatch 配置,发现对于本章的玩具示例,它是解决之前遇到的“不一致”问题的非常有效的办法。
要激活 MiniBatch,需要配置三个参数:
SET 'table.exec.mini-batch.enabled' = 'true';
SET 'table.exec.mini-batch.allow-latency' = '5S';
SET 'table.exec.mini-batch.size' = '5000';
配置项 table.exec.mini-batch.allow-latency
设置 MiniBatch 优化的最大延迟时间,以缓冲输入记录到 JOIN
操作符。table.exec.mini-batch.size
设置要缓冲的最大输入记录数。MiniBatch 将在允许的延迟间隔内以及达到最大记录数时触发。
我们发现,如果将 table.exec.mini-batch.size
设置为 1,Flink 仍然输出相同数量的记录(接近 80,000),其中大部分是错误的。这并不令人惊讶,因为这个设置实际上禁用了 MiniBatch,Flink 返回到其默认行为。如果我们将批大小增加到 10,我们已经大幅减少了输出消息(403),当我们将批大小增加到 50 或更多时,只写出一条消息到 sink 主题——给出正确的结果 0。这种一致的结果在我们将事务乱序发送时仍然有效。
因此,MiniBatch 不仅是性能优化,还影响 Flink 的一致性水平。然而,请记住,它不会将 Flink 从最终一致转变为内部一致的流处理器。公平地说,在 Flink 文档中,MiniBatch 被描述为仅仅是性能优化。要获得一致性上的优势,MiniBatch 需要调整到合适的配置,即便如此,它也不能在所有情况下保证真正更高的一致性水平。
一致性与延迟
我们作为读者可能已经提出了一个显而易见的问题,那就是延迟。为了实现更强的一致性,我们需要牺牲多少最终一致流处理系统的低延迟?
在回答这个问题之前,我们需要区分两种不同的“延迟”:
- 处理时间延迟 流处理系统生成查询结果所需的时间。
- 端到端延迟 流处理系统生成一致性结果所需的时间。
在纯处理时间方面,内部一致的流处理系统的延迟通常高于最终一致的系统。然而,对于大多数非经典的流处理用例,比如涉及非窗口化数据的 JOIN
,用户更关注的是端到端延迟。在我们玩具示例的极端情况下,我们观察到所有三种内部一致的流处理系统能够在不到一秒的时间内得出一致的结果——相比之下,最终一致的系统则永远无法达到这一点(除非你停止输入流或以某种方式修复你的 SQL)。
总之,在一个理想的世界中,Flink SQL、ksqlDB 和 Proton 的更新版本将包括一个开关来启用或禁用内部一致性——以便用户可以轻松选择他们想要的权衡:
- 对于涉及非窗口化/无限数据且不需要超低延迟的用例,启用内部一致性。
- 对于涉及窗口化数据且需要超低延迟的经典流处理用例,禁用内部一致性并恢复到最终一致性。
这将允许它们保持当前的处理模型以应对超低延迟用例,同时又能以更一致的方式处理非窗口化数据,使从数据库世界迁移到流处理世界的从业者能够无缝过渡,而无需将一致性作为事后的附加功能。
总结
你已经看到,虽然经典的最终一致流处理系统非常适合于处理大规模的窗口化数据的低延迟、高吞吐量的经典用例,但它们也有其缺点:
- 对于涉及非窗口化数据的用例,它们可能难以应用。
- 从数据库领域过来的工程师无法坚持他们经过验证的 SQL 直觉和公式。
我们认为,这两个问题是流处理系统以及流处理本身更广泛应用的主要障碍。如果你来自数据库领域,你是否愿意使用一个将看似完美的 SQL 代码转变为不一致混乱的系统?当然,一致性可以通过增加额外的条件、定义水印等来“附加”,但这通常只能由少数昂贵且难以找到的流处理专家可靠地完成。
内部一致的流处理系统,如 RisingWave、Materialize 和 Pathway,提供了更强的一致性保证。它们通过提供更高水平(且更不漏的抽象)的处理时间的复杂概念,能够开箱即用地解决我们的玩具示例。因此,这些系统有潜力让流处理变得更加普及,使那些敢于从数据库世界转向流处理世界的人能够更容易过渡,并显著扩展整个流处理市场。
至于处理时间的延迟,内部一致的流处理系统无法超越最终一致的系统。然而,当你关注通常更重要的端到端延迟时,它们可以做到这一点。
在下一章中,我们将进入与流处理数据库高度相关的混合数据系统领域。
转载自:https://juejin.cn/post/7402548056283824139