likes
comments
collection
share

如何设计灵活的分库分表算法

作者站长头像
站长
· 阅读数 34

分库100个,单库128张表,以用户id进行分片,针对这个场景,我会设计一种基于一致性哈希算法的分库分表方案。

首先,需要确定哈希函数的选择。由于用户 ID 是一个字符串,可以选择 MurmurHash3 这个非加密型哈希函数来将用户 ID 映射为一个 32 位整数,例如:

hash_value = murmurhash3(user_id)

接下来,可以将哈希值范围均匀地分散到 12800 个虚拟节点上,每个虚拟节点对应一个表。使用一个环形哈希空间来表示这些虚拟节点,将哈希值映射到环上,然后顺时针查找离这个哈希值最近的虚拟节点,并将数据存储到对应的表中。

如何设计灵活的分库分表算法

为了支持动态扩容,可以增加虚拟节点的数量。当增加一个虚拟节点时,只需要将其哈希值插入到哈希空间中,并重新计算数据的映射关系即可。当删除一个虚拟节点时,只需要将其从哈希空间中移除,并重新计算数据的映射关系即可。

为了保证高可用性,可以采用主从复制的方式来实现数据的备份和容灾。每个表都有一个主节点和多个从节点,主节点负责处理写请求和同步数据到从节点,从节点则负责处理读请求和备份数据。如果主节点出现故障,可以自动选举一个从节点来代替它,从而实现故障转移。如果某个从节点出现故障,可以将其替换为新的节点,然后重新进行数据同步。

以下是一个基于一致性哈希算法的 Java 代码示例:

javaCopy code
import java.util.*;

public class ConsistentHashing {
    private final SortedMap<Integer, String> nodes = new TreeMap<>();
    private final int numReplicas;
    
    public ConsistentHashing(int numReplicas) {
        this.numReplicas = numReplicas;
    }
    
    public void addNode(String node) {
        for (int i = 0; i < numReplicas; i++) {
            int hash = hash(node + ":" + i);
            nodes.put(hash, node);
        }
    }
    
    public void removeNode(String node) {
        for (int i = 0; i < numReplicas; i++) {
            int hash = hash(node + ":" + i);
            nodes.remove(hash);
        }
    }
    
    public String getNode(String key) {
        if (nodes.isEmpty()) {
            return null;
        }
        int hash = hash(key);
        SortedMap<Integer, String> tailMap = nodes.tailMap(hash);
        int nextHash = tailMap.isEmpty() ? nodes.firstKey() : tailMap.firstKey();
        return nodes.get(nextHash);
    }
    
    private int hash(String str) {
        return MurmurHash3.hash32(str.getBytes());
    }
}

这个类实现了一个一致性哈希算法,并提供了添加节点、删除节点和查找节点的接口。每个节点通过 addNode 方法加入哈希空间中,并根据 numReplicas 参数生成若干个虚拟节点。当一个新的数据需要映射到节点时,通过 getNode 方法找到顺时针方向上第一个虚拟节点,并返回对应的节点名称。如果节点列表为空,则返回 null。

可以参考以下示例实现分库分表的功能,例如:

javaCopy code
ConsistentHashing ch = new ConsistentHashing(128);

// 添加 100 个库
for (int i = 0; i < 100; i++) {
    String db = "db" + i;
    ch.addNode(db);
}

// 查找用户 123 所在的表
String user_id = "123";
String db = ch.getNode(user_id);
String table = "table" + (ch.hash(user_id) % 128);

// 将用户数据插入到对应的表中
insertData(db, table, user_id, data);

示例中,首先创建了一个一致性哈希算法对象,并向其中添加了 100 个库节点。接着,查找用户 123 所在的库节点,并根据用户 ID 的哈希值将数据插入到对应的表中。注意,为了将哈希值均匀地分散到 128 张表中,使用了取模运算。

假设需要进行数据扩容,由100库扩容为300库,单库分表数量不变,如何进行数据迁移?

当将分库分表的数量从 100 库扩容到 300 库时,为了保证数据的均匀分布,需要进行数据迁移。具体的迁移方案有很多种,下面是一种基于虚拟节点的方案。

将数据从原有的 100 个库迁移到新的 300 个库中,迁移的过程分为两个阶段:

  1. 数据复制:对于每个原有的库节点,将其上的数据复制到对应的新库节点中。为了避免数据倾斜,可以按照每个库节点上的数据量与总数据量的比例来确定复制到哪些新库节点上。
  2. 数据清理:等待所有数据复制完毕后,将原有的库节点从一致性哈希算法中删除,并将其上的数据删除。

由于使用了虚拟节点,节点的数量扩大后并不会对数据的分布产生较大的影响。同时可以通过调整虚拟节点数量的大小来平衡节点数量与数据均衡性之间的关系。

public class Sharding {
    private int dbCount; // 库的数量
    private int tableCount; // 每个库中表的数量
    private List<DataSource> dataSources; // 数据源列表

    public Sharding(int dbCount, int tableCount) {
        this.dbCount = dbCount;
        this.tableCount = tableCount;
        this.dataSources = new ArrayList<>();
        // 初始化数据源列表
        for (int i = 0; i < dbCount; i++) {
            DataSource dataSource = createDataSource();
            dataSources.add(dataSource);
        }
    }

    // 创建数据源
    private DataSource createDataSource() {
        // TODO: 创建数据源的代码
        return null;
    }

    // 获取数据源
    private DataSource getDataSource(int dbIndex) {
        return dataSources.get(dbIndex);
    }

    // 获取表名
    private String getTableName(int tableIndex) {
        return "user_" + tableIndex;
    }

    // 根据用户 ID 获取数据源和表名
    public Pair<DataSource, String> getDataSourceAndTableName(long userId) {
        int hash = Long.hashCode(userId);
        int dbIndex = (hash & 0xffff0000) >>> 16; // 前 16 位作为库的编号
        int tableIndex = hash & 0x0000ffff; // 后 16 位作为表的编号
        DataSource dataSource = getDataSource(dbIndex);
        String tableName = getTableName(tableIndex);
        return Pair.of(dataSource, tableName);
    }

    // 增加数据库
    public void addDatabase() {
        DataSource dataSource = createDataSource();
        dataSources.add(dataSource);
    }

    // 迁移数据到新的数据库中
    public void migrateData(int oldDbIndex, int newDbIndex) {
        DataSource oldDataSource = getDataSource(oldDbIndex);
        DataSource newDataSource = getDataSource(newDbIndex);

        for (int i = 0; i < tableCount; i++) {
            String tableName = getTableName(i);
            try (Connection conn1 = oldDataSource.getConnection();
                 Connection conn2 = newDataSource.getConnection();
                 PreparedStatement stmt1 = conn1.prepareStatement("SELECT * FROM " + tableName);
                 PreparedStatement stmt2 = conn2.prepareStatement("INSERT INTO " + tableName + " VALUES (?, ?, ?)")) {
                ResultSet rs = stmt1.executeQuery();
                while (rs.next()) {
                    long userId = rs.getLong("user_id");
                    String name = rs.getString("name");
                    int age = rs.getInt("age");
                    Pair<DataSource, String> pair = getDataSourceAndTableName(userId);
                    if (pair.getLeft() == oldDataSource && pair.getRight().equals(tableName)) {
                        stmt2.setLong(1, userId);
                        stmt2.setString(2, name);
                        stmt2.setInt(3, age);
                        stmt2.executeUpdate();
                    }
                }
            } catch (SQLException e) {
                e.printStackTrace();
            }
        }
    }
}

新增了addDatabase,migrateData方法:

addDatabase 方法用于增加一个新的数据库,它会调用 createDataSource 方法创建一个新的数据源,并将其添加到数据源列表中。

migrateData 方法用于将旧的数据库中的数据迁移到新的数据库中。它需要传入两个参数:旧数据库的编号和新数据库的编号。对于每个表,它会从旧数据库中读取数据,并将符合条件的数据插入到新数据库中。迁移数据的过程需要使用两个连接,因此需要分别获取旧数据库和新数据库的连接。

首先需要获取每个用户对应的数据源和表名。调用 getDataSourceAndTableName 方法来获取它们。如果一个用户的数据源是旧数据库,并且表名是当前正在处理的表名,则将该用户的数据插入到新数据库中。