深入理解分布式系统的 7 种数据分片策略
数据分片是将全量数据按照某种规则分散存储在多个数据存储上的一种技术,用于平衡系统的存储压力和读写压力,并实现数据存储和系统性能的线性扩展能力。
常见的数据中间件如MySQL、Redis、HBase、ElasticSearch、Kafka、Hive等,都会使用一种或多种数据分片方式。以下是七种常见的数据分片方式:
- Hash取余分片:根据数据的哈希值进行取余操作,将数据分配到不同的存储节点上。
- 一致性Hash分片:根据数据的哈希值在哈希环上进行映射,将数据分配到与其哈希值最近的节点上,以实现负载均衡。
- Range分片:根据数据的范围进行划分,按照一定规则将数据分配到不同的存储节点上。
- 时间分片:根据数据的时间戳将数据按时间段进行划分,将不同时间段的数据存放在不同的存储节点上。
- 固定行数分片:按照固定行数将数据分割成多个片段,每个片段存放在不同的存储节点上。
- 固定文件大小分片:按照固定文件大小将数据分割成多个片段,每个片段存放在不同的存储节点上。
- 随机机分片:随机将数据分配到不同的存储节点上,实现简单的负载均衡。
Range 范围分片
Range 范围分片是使用索引表维护每个节点负责的起始范围,数据读写时查询索引表,将请求路由到对应的存储节点。
HBase 范围分片是怎么回事?
为了更好地理解,我以HBase为例进行说明。HBase是一种分布式数据库,它将每张表切分成多个Region,并将这些Region分布在多个节点上进行存储。每个Region负责存储表中的一部分数据,而分片key即为rowKey,也就是每条记录的主键。rowKey要求为ascii码,通常会由英文和数字组成。那么如何将rowKey路由到对应的Region呢?每个Region都有自己负责的startKey和endKey。我们可以在创建表时指定每个Region的范围,也可以指定排序方式。
举个例子,我们创建了一个包含50个分区的表,并且指定rowKey的排序方式为16进制字符串方式分割。在HBase中,会为每个分区生成对应的索引表,用来快速查询该分区对应的Region。
create'test',{NAME => 'f1',COMPRESSION => 'snappy' }, { NUMREGIONS => 5, SPLITALGO => 'HexStringSplit' }
- HexStringSplit:适用于以十六进制的字符串作为前缀的Rowkey。
- DecimalStringSplit:适用于以十进制的数字字符串作为前缀的Rowkey。
- UniformSplit:适用于Rowkey的前缀是完全随机的。
也可以指定每个分区的范围,创建6个分区,每个Region负责以下范围。
create 'datamanroad:Employee', 'info', 'partition1', SPLITS => ['10000','20000','30000','40000','50000']
例如第一个分区负责[, 10000] 起始到10000,最后一个负责[50000,] 50000到结束。每个 Region 负责一个范围。
HBase在数据读写时,通过索引表将请求映射到对应的Region。而且HBase会自动进行Region管理,默认情况下,当Region数量过大时会自动进行分裂,分裂后每个Region负责不同的RowKey范围。每个Region内部的rowKey也是有序存储的。不同于MySQL使用B+树索引,HBase使用LSM(Log-Structured Merge Tree日志结构合并树)树,特别适合于频繁插入和更新的索引场景。
为什么 Hbase 使用范围分片
为什么HBase需要支持Region范围分片呢?其实,这是因为HBase不仅支持随机查找,还支持范围查找。为了支持范围查找,存储在HBase中的数据必须是有序的,不仅每个Region内部有序,每个Region之间也要能排序。如果没有按照Region范围进行分片,那么在进行范围查找时,HBase将不得不查找所有的Region,这势必会显著降低查询效率。
所以不同的使用场景,决定了使用不同的分片策略。
RedisCluster如果使用范围分片呢?
如果RedisCluster能够支持范围分片,将会带来很多好处。例如,我们可以通过指定路由表,使得每个Redis节点负责某一个范围。这样,相同批次的Key就可以被路由到同一个Redis节点上。
因为RedisCluster会将Key哈希到不同的节点,所以像Lua脚本和mget这样涉及多个Key的请求会失败。但如果RedisCluster支持范围分片,例如Redis节点1负责A-G范围,那么A0001_type1和A0001_type2这两个Key就可以被路由到同一个Redis节点。这样一来,在RedisCluster中就可以使用Lua脚本和mget等涉及多个Key操作。
举个例子,假设文章的总阅读量和每天阅读量数据保存在Redis中,我们使用incrBy来实时更新。如果这两个Key在同一个节点,就可以通过Lua脚本同时增加这两个Key,而不需要调用两次。同时,也可以使用mget一次性查询多个Key。然而,现实情况是RedisCluster使用哈希分片,上述操作都无法同时执行,只能逐个操作。
虽然RedisCluster不支持范围分片,但是Redis客户端可以自行分片。例如ShardedJedis支持keyTagPattern模式,这意味着我们可以通过命名Key的格式,将一组相关联的Key放入同一个Redis节点,从而实现批量操作。
不同的使用场景决定了使用不同的分片策略。
Hash 取模适合哪些场景?
Hash是最常见的分片方式,通过Hash算法把Key均匀打散生成一个数值,
例如订单数据的数据库分库分表大多数使用userId 取模,路由到对应的库和表。
- 使用userId 倒数第三位进行分库, 共分5个库。 index = (userId/100) % 5;
- 使用userId 倒数后两位进行分表,共分100个表。 index = userId % 100;
由于userId的生成基本上是均匀的,后两位随机排列,所以直接使用userId取模即可得到分片值。然而,如果要哈希的是不均匀的数字键,就会遇到问题。以后两位为偶数的概率非常大为例,按照100取模,数据分布将会非常倾斜。
MurmurHash是什么?
目前最常用的hash算法是 murmurhash。
murmurhash 广泛应用于各开源产品Java 界中 Redis,Memcached,Cassandra,Hadoop,HBase,Lucene,spark,nginx等。
我进行了一个对比测试,测试了对偶数使用 Long.hashCode 和 MurmurHash 进行哈希的效果。
我生成了一组偶数数字,范围从0到100万,总共有50万个数字。我分别使用了 Java 的 Long.hashCode 和 MurmurHash 进行哈希,并将哈希值对10取模,最终得到了一些有趣的测试结果。
可以看到,使用 MurmurHash 对这50万个偶数进行哈希,它们基本均匀地散落在了10个桶中,每个桶中大约有5万个数字,差异量不超过200。然而,Java 的 Long.hashCode 所生成的哈希值却很奇特。
使用 Java 的 Long.hashCode,哈希值与原始值相同。我本希望将这些偶数均匀地分散到10个桶中,但由于 Long 的哈希值与原始值相同,所以只有5个桶中有数据。
究其原因是因为 Java 的 Long.hashCode 是对 Long 类型的高32位和低32位按位异或运算。由于我选取的值都小于2的32次方,所以高32位都是0。这就相当于一个数与0进行异或运算,结果自然等于自己(按位异或:相同则为0,不同则为1)。于是得到的哈希值自然都是偶数。这些偶数被分散到了10个桶中,数据分布一定非常倾斜。
由此可见,如果需要对数据进行哈希分片,就一定要确保哈希值是随机分散的,如果不是,极有可能导致数据分布极度倾斜。最好亲自测试一下所使用的哈希方法,以避免得到严重倾斜的哈希分布。
以下为本次测试代码
- 引入pom, guava中 Hashing类提供了Murmurhash 工具方法。
Hashing.murmur3_128()
<dependency>
<groupId>com.google.guava</groupId>
<artifactId>guava</artifactId>
<version>31.1-jre</version>
</dependency>
- 示例
long max = 1000000;
Map<Long, Long> map = Maps.newHashMap();
int mo = 10;
for (long i = 0; i < mo; i++) {
map.put(i, 0L);
}
Map<Long, Long> map2 = Maps.newHashMap();
for (long i = 0; i < mo; i++) {
map2.put(i, 0L);
}
for (long i1 = 0; i1 < max; i1 += 2) {
HashFunction hashFunction = Hashing.murmur3_128();
HashCode code = hashFunction.hashLong(i1);
long v = code.asLong();
if (v < 0) {
v = 0 - v;
}
Long count = map.get(v % mo);
count++;
map.put(v % mo, count);
long v2 = Long.valueOf(i1).hashCode();
if (v2 < 0) {
v2 = 0 - v2;
}
count = map2.get(v2 % 10);
count++;
map2.put(v2 % 10, count);
}
System.out.println("murmurhash:" + map);
System.out.println("Long hashCode:" + map2);
如果要对字符串进行分片,则无法直接进行取模操作,因此必须使用hash算法来计算键的hash值,以获得更加均匀和随机的结果。
在测试中,我尝试了使用String.hashCode和MurmurHash两种方法进行计算,发现它们的hash效果几乎相似。
我生成了100万个随机字符串,并分别使用以上两种方法计算它们的hash值。结果显示,它们之间的差异并不大,散列结果非常好。总共有10个桶,每个桶分到的字符串数量在10万左右,差异值不超过300。
总结:如果key本身是散列均匀的数字,无需hash直接取模即可,否则最好使用MurmurHash hash后再取模,这样数据会比较均匀。
一致性Hash
Hash取模有什么痛点?
Hash取模算法是一种简单实用的方法,适用于节点数量长期不变的情况。然而,在数据扩缩容的情况下,这种算法表现较差。当存储负载过大或过小时,需要进行扩缩容操作,例如将10个节点扩容为15个。按照之前的取模算法,即hashValue%15,会导致大量数据分片与之前的情况不同,甚至需要迁移几乎全部的数据到新节点。因此,系统在扩缩容时面临巨大的数据迁移难度和稳定性挑战。那么,在扩缩容时是否有方法可以减少数据迁移的量呢?
什么是一致性Hash?
一致性哈希算法是为了解决哈希取模算法的问题而提出的,可以有效地减少在扩容或缩容时数据的迁移情况。一致性哈希算法引入了虚拟节点的概念,通过增加哈希取模的数值范围,将取模后的结果作为虚拟节点的值,并提供映射表将虚拟节点的值再映射为实际节点的值。当需要新增或减少实际节点时,只需要修改部分虚拟节点的映射表即可。
例如,当前系统将1万个虚拟节点划分到了4台机器上,也就是4个实际节点。当前系统的映射表如下所示:
节点 | 范围 |
---|---|
节点 1 | 0 - 2499 |
节点 2 | 2500 - 4999 |
节点 3 | 5000 - 7499 |
节点4 | 7500 - 9999 |
此时新增了节点5,需要修改一部分虚拟节点的映射。
- 节点1负责的 2000-2499
- 节点2 负责的 4500-4999
- 节点3 负责的 7000-7499
- 节点4 负责的 9500-9999
通过在以上4个节点中组合成2000个节点,并将其分配给节点5,实现了数据的均衡分布。 对于每个节点来说,它们各自负责2000个虚拟节点。或许有人会好奇,如何实现这个映射表呢?其实很简单,由于虚拟节点的数量是可以确定的,所以我们可以维护一个包含1万个键值对(KV)的HashMap。每当实际节点数发生变化时,我们需要一个调度模块来重新分配每个实际节点负责的虚拟节点,并生成相应的迁移计划。在迁移完成后,我们只需修改映射表即可。
这个思想就是一致性哈希。在网上有很多一致性哈希的资料都提到了哈希环。
在我看来,Hash环的思想是将虚拟节点均匀映射到一个环上,并确保每个实际节点负责其前一个节点到当前节点之间的所有虚拟节点。当节点加入时,只需分配部分虚拟节点,而无需重新计算整个Hash环。
然而,我认为这种Hash环的解释并不易于理解,也与实际情况不太一致。由于变更节点数量会导致虚拟节点到实际节点的映射表的修改,为了尽可能减少迁移量,又要平均分配所有虚拟节点。这必然导致每个实际节点负责的虚拟节点值不一定是连续的。因此,Hash环的思路并不完全符合实际情况。
RedisCluster中的一致性Hash
RedisCluster使用了一致性哈希的思想。它将多个Redis节点组合为一个集群,并且需要将缓存Key分片到每个节点。为了实现分片,缓存Key会使用CRC16进行哈希运算,然后被分配到16384个槽中,即CRC16(key) & 16384。这16384个槽实际上是16384个虚拟节点,运维人员可以指定每个Redis节点负责的槽范围,或者交由redis-trib.rb运维工具进行管理。
使用redis-cli管理每个节点的槽范围
redis-cli -h 192.168.0.1 –p 6379 cluster addslots 0,4095
redis-cli -h 192.168.0.2 –p 6379 cluster addslots 4096,8191
也可以 使用redis-trib.rb运维工具创建集群,运维工具会自动帮你均匀的分配好每个节点的虚拟槽数量
redis-trib.rb create --replicas 1 127.0.0.1:6379 127.0.0.1:6380 127.0.0.1:6381 127.0.0.1:6382 127.0.0.1:6383 127.0.0.1:6384
16384个槽分配成功,集群创建完成。
使用redis-trib.rb运维工具还再平衡 槽的数量,可以指定每个节点的权重,分别分配不同的槽数量。平衡过程中,完成槽的数据迁移。
rebalance host:port
--weight <arg>
--auto-weights
--use-empty-masters
--timeout <arg>
--simulate
--pipeline <arg>
--threshold <arg>
# redis-trib.rb rebalance --weight a8b3d0f9b12d63dab3b7337d602245d96dd55844=3 --weight f413fb7e6460308b17cdb71442798e1341b56cbc=2 --use-empty-masters 127.0.0.1:6379
当需要新增节点时,可以在执行 redis-trib.rb 命令时指定要迁移的虚拟槽数量,以达到平衡的目的。举例来说,如果要分配100个槽,这些槽并不是连续划分的,而是从其他节点中获取的一部分槽,这样做是为了确保每个节点都有尽可能均匀的槽数量。
一致性 Hash 还有哪些应用?
一致性Hash是一种经常被使用的算法,在许多地方都有应用。例如,Dubbo框架提供了一致性Hash的负载均衡策略,该策略能够将请求尽可能地路由到固定的节点上。当服务节点具有大量的本地缓存时,但每个节点的容量不足以保存全部缓存时,可以使用一致性Hash的方式。这种方式将缓存分成多部分,并将每部分的缓存负责权路由到相应的节点上,这样就能够尽可能地使缓存和用户的请求对应起来。这种方法可以显著提高缓存命中率,同时减轻节点上本地缓存的内存压力。
总结来说,一致性Hash算法将键(Key)映射为虚拟节点,并维护一个虚拟节点到实际节点的映射表。当需要扩容或缩容节点时,只需修改映射表,迁移一部分数据给新节点,避免了全量数据的迁移。
除了Hash取模、一致性Hash、Range范围这三种常用的分片策略之外,还存在其他四种分片策略。
基于时间分片
如果数据的时间属性非常重要,并且查询的时候通常需要指定时间范围,那么可以采用时间分片的方法。例如,在查询指定时间范围内的日志时,我们可以直接查找对应日期的分片数据。
Hive如何基于时间分片
最典型的当属Hive,hive在创建表时可以指定时间作为分区键,一般为日期。把数据按照每一天组织起来,在SQL查询时 指定时间范围,这样可以避免查询全量的数据。
CREATE TABLE IF NOT EXISTS `$target.table`(
user_id bigint COMMENT '用户ID'
)COMMENT '备注'
PARTITIONED BY (dt string COMMENT 'ctime, 日期分区字段,格式为datekey(yyyymmdd)')
STORED AS ORC
;
INSERT OVERWRITE TABLE `${target.table}` PARTITION (dt) SELECT * FROM XXX;
以上Hive建表SQL指定了日期作为分区键,要求SQL必须指定日期范围查询数据。
日志可以按时间分片吗?
除此之外还有很多时间分片的场景,例如应用日志,一般通过小时、天来切分日志,每个小时的日志分到不同的文件中,避免生成过大的日志文件,从而增加上传和清理的成本。
业务系统有哪些场景可以使用时间分片?
ElasticSearch 时间分片场景
一般情况下,我们使用ElasticSearch来实现日志检索,查询时必须指定时间范围。为什么要这样做呢?这是因为我们希望缩小要检索的数据范围。使用ElasticSearch实现数据检索时,并没有存储全部的数据,而是只存储了近期的数据,比如三个月或半年。过期的数据会被清理掉。为了能够有效检索数据,我们采用了时间分片的策略,每天的数据存储在一个新的索引上,过期的数据会被清理掉。当用户进行检索时,也必须指定时间范围,从而确定要检索的具体日期所在的索引。通过按照日期组织索引数据的方式,在实践中非常常见。
随机分片
虽然随机分片在表面上看起来并不起眼,但在 Kafka 中它却发挥着重要的作用。随机分片的核心思想是不控制数据被路由到哪个分片,完全随缘。一个典型的例子是 Kafka 的分片策略,在每个 topic 下会划分为多个 partition,而这些 partition 可以根据需求动态添加。每个消息在投递时会被路由到一个特定的 partition,而在消费时,每个 partition 只能由一个消费者实例来消费。Kafka 的分片路由策略相当复杂,默认情况下它会随机将消息路由到一个 partition 上。此外,Kafka 还提供了轮询策略,即将消息平均地分发到每个 partition 上。
除此之外,Kafka 还提供了指定分区的策略,即由生产者自定义路由策略,可以选择要发送到哪个分区。在这种情况下,用户可以选择使用哈希路由方式,例如按照用户的 UserId,将同一个用户的消息路由到同一个分区。由于每个分区只有一个消费者实例进行消费,因此可以保证单个用户的消息可以按顺序串行地被消费。
Kafka 为什么可以选择随机分片?
Kafka与MySQL、Redis、HBase等中间件在许多方面都不同。Kafka并不提供随机查找或范围查找的能力。它只需确保消息可靠地传递给消费者,并且消费者按序逐个消费这些消息。因此,Kafka并不需要使用哈希取模、一致性哈希或范围查找等分片策略。在生产端,Kafka会等待某个时间段,然后将这个时间段内的消息批量路由到一个分片上。如果使用默认的哈希分片策略,可能会导致同一时间段内的消息被路由到不同的分片上,这会影响消息的批量投递和同时存储在磁盘上的可靠性,并降低消息发送的吞吐量。
当然,Kafka也提供了灵活性,允许业务自定义路由分片策略,以满足特定的业务逻辑需求。
不同的场景诉求决定了使用不同的路由策略。
按文件大小分片
业务系统中一般不涉及按照文件大小分片,但是在存储系统中涉及到读写文件,为了避免文件过大降低读写文件时性能,会控制文件的大小。例如 Kafka 每个分片对应一个文件目录,每个分片目录下都包含多个文件,每个文件包含了一部分消息。当文件数量超过阈值时,Kafka 就会重新新建一个文件,消息也会写到新的文件中。
.log文件保存了消息内容,而 index 文件是消息的索引文件,两者除后缀外同名,index 文件用来标识消息 offset 和对应文件内偏移量。当然index 文件采取稀疏索引存储方式,它减少索引文件大小,只记录了一部分消息的位置。
例如 8,1686 代表文件内第八条消息,在文件物理偏移 1686 位开始。这样 Kafka 在检索一条消息时,就能兼顾性能和存储。
Kafka 为什么要切分日志文件
虽然 Kafka 并不提供消息 Id 的随机查询接口,但是它提供了重置消费位点的功能。消费组可以指定某个分区的消费点和 offset,这样 Kafka 就可以迅速定位到该条消息。
假设日志文件非常庞大,如果要读取该消息的 offset,就需要花费很长时间。但是如果将日志文件拆分成多个小文件,并辅以文件索引,就可以快速定位到该条消息,并从该条消息开始消费。
实际上,除了 Kafka,HBase 也进行了文件的拆分。如前所述,HBase 的每个 Region 是一个分区,可以预先进行分区划分,事先规划好分区。HBase 也提供了默认的分区策略,即根据 Region 的大小自动进行分裂。
HBase Region 按文件大小切分。
HBase 每个 Region 超过一个阈值,会自动分裂。该阈值的计算比较复杂。 Math.min( regionNumber ^ 3 * hbase.hregion.memstore.flush.size *2, 默认最大文件大小 )
使用默认值替换后,为 Math.min( regionNumber ^ 3 * 256 M, 10G)
例如 hbase.hregion.memstore.flush.size = 128M。
- 当只有1个文件时,切分Region大小为 1 ^ 3 * 128M * 2 = 256M。
- 2个文件时,切分Region大小就会增加 2^3*128M * 2 = 2G。
- 3个文件时 切分Region大小 3^3*128M *2 = 6.75G
- 4个文件时,切分文件大小触发10G阈值。每个Region大小为10G。
为什么 Region 的阈值定的这么复杂,主要是为了保证当数据量较少时,切分的阈值较少、数据量大的场景阈值较大。例如256M-2G期间,只会有两个 Region,而数据量越来越大时,阈值就要越大,直到10G。避免小数据量场景,却出现几十个 Region 过度分片的情况发生。
不同的场景决定了不同的数据分片策略。
按照文件行数分片
优化分片策略可以提高文件处理的效率和可读性。除了按照文件大小切分外,对于按行分割的文件,可以考虑按照固定行数切分文件。例如,对于一个包含1亿条数据的大文件,可以将其切分为10个包含1千万条数据的小文件,然后并行处理这些小文件。
这种分片策略的本质和按照文件大小进行切分类似,都是为了避免生成过大的文件。在实际应用中,可以综合使用这两种策略,以提高文件处理的效率。即首先按照文件大小进行切分,同时限制最大行数。一旦超过最大文件大小或最大文件行数的限制,就进行文件切分。这样可以兼顾文件大小和行数的限制,使得文件处理更加灵活和高效。
10 亿的大文件受限于一块磁盘的读写性能,只能顺序读取。如果把文件切分为 100 份,就可以增大数据读取的并发度,提高读取性能。
所以在文件生产阶段就应该按照文件行数、或者文件大小进行切分。而不应该等到读取文件前,在进行文件切分。
为了解决一个容量为10亿的大文件在一块磁盘上读写性能受限的问题,可以采取切分文件的策略。将文件分为100份,可以增加数据读取的并发度,从而提高读取性能。
因此,在文件生成阶段就应该根据文件的行数或大小进行切分,而不是等到需要读取文件时再进行切分操作。
通过对上述7种数据分片方式的分析,我们可以得出以下结论:
不同的场景决定了不同的数据分片策略。
总结
当需要范围查找时,可以使用范围分片来提高查询效率。例如,在需要查找某个范围内的数据时,可以将数据划分成多个片段,分别存储在不同的节点上,然后根据查询的范围确定需要查找的分片,从而加快查找速度。
在一些场景中,如果可以明确指定分区键,如按照用户ID进行数据分片,可以选择使用哈希分片。这样可以保证根据用户ID快速定位到对应的分片,提高查询效率。
在某些中间件场景中,存储节点数可能会发生变化,例如Redis Cluster。为了减少数据迁移的情况,最好采用一致性哈希分片算法,将数据均匀分散在各个节点上。
对于需要按照时间范围检索并需要全量数据扫描的场景,如ElasticSearch,可以使用时间分片来提高查询效率。将数据按照时间划分成多个片段,可以根据时间区间确定需要检索的分片,从而减少不必要的扫描操作。
而对于一些不需要随机查找、范围查找的场景,只需要按照顺序消费数据,如Kafka,可以使用随机分片来提高消费和生产的并发度。将数据分成多个随机分片,可以使多个消费者并行消费数据,提高整体的处理速度。
对于大文件处理的场景,可以根据文件的大小或行数进行切分。将大文件切分成多个较小的文件,可以同时进行多个读写任务,提高处理效率。
转载自:https://juejin.cn/post/7281825352531656715