
Sharding Source Analysis | A Full Source-Level Walkthrough of a Sharded Query in ShardingSphere-Proxy


Introduction

Apache ShardingSphere is a distributed database ecosystem. It can turn any database into a distributed database and enhance it with capabilities such as data sharding, elastic scaling, and encryption.

My day job requires customizing the Proxy, for example escaping blob columns under sharding (protoBin to json), so I set out to learn how ShardingSphere implements database and table sharding.

The value of this article is to lay out how ShardingSphere shards a query, using the Proxy as the entry point: it explains the core source code along the whole call chain and presents the flow with a flowchart.

The code examined here was pulled from the master branch; this article was published in August 2023.

ShardingSphere-Proxy Overview

ShardingSphere-Proxy positions itself as a transparent database proxy. In theory it supports any client that speaks the MySQL, PostgreSQL, or openGauss protocol, which makes it friendlier to heterogeneous languages and to operations scenarios.

For example, when we shard a database with ShardingSphere-JDBC, the application itself has to aggregate across all the shard databases and tables, which is cumbersome. The Proxy hides the sharding logic underneath and exposes what looks like a single database with a single table.

A Sharded Query, End to End

Test Setup

We split the user table across 4 databases with 4 tables each, 16 physical tables in total;

Database names: testdb_0, testdb_1, testdb_2, testdb_3

Table names: user_0, user_1, user_2, user_3

CREATE TABLE `user` (
  `id` bigint NOT NULL AUTO_INCREMENT,
  `roleId` bigint(20) unsigned zerofill NOT NULL,
  `name` varchar(128) DEFAULT NULL,
  `age` int DEFAULT NULL,
  PRIMARY KEY (`id`)
) ENGINE=InnoDB;
Sharding Configuration
rules:
  - !SHARDING
    tables:
      user:
        actualDataNodes: testdb_${0..3}.user_${0..3}
        tableStrategy:
          standard:
            shardingColumn: roleId
            shardingAlgorithmName: user_inline
        keyGenerateStrategy:
          column: id
          keyGeneratorName: snowflake
        databaseStrategy:
          standard:
            shardingColumn: id
            shardingAlgorithmName: database_inline
    bindingTables:
      - user
    defaultDatabaseStrategy:
      standard:
        shardingColumn: id
        shardingAlgorithmName: database_inline
    defaultTableStrategy:
      none:

    shardingAlgorithms:
      database_inline:
        type: INLINE
        props:
          algorithm-expression: testdb_${id % 4}
      user_inline:
        type: INLINE
        props:
          algorithm-expression: user_${roleId % 4}

    keyGenerators:
      snowflake:
        type: SNOWFLAKE
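Before diving into the source, it helps to pin down what the two INLINE expressions above actually compute. A minimal sketch (the class and method names are invented for illustration; this is not ShardingSphere code):

```java
// Toy model of the two INLINE sharding expressions in the config above.
// Invented names, not ShardingSphere code.
public final class InlineShardingSketch {

    // algorithm-expression: testdb_${id % 4}
    public static String databaseFor(long id) {
        return "testdb_" + (id % 4);
    }

    // algorithm-expression: user_${roleId % 4}
    public static String tableFor(long roleId) {
        return "user_" + (roleId % 4);
    }

    // The fully qualified data node a row with (id, roleId) lands in.
    public static String dataNodeFor(long id, long roleId) {
        return databaseFor(id) + "." + tableFor(roleId);
    }
}
```

With id = 0 and roleId = 0 (the query used later in this article), the row resolves to testdb_0.user_0, so only one of the 16 tables needs to be touched.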
Query Pipeline Flowchart

(Figure: end-to-end query flowchart)

All of the source analysis below follows this flowchart; read them side by side.

Flow and Source Walkthrough

After a client connects to the Proxy and sends select * from user where id = 0 and roleId=0, the Proxy receives the request and starts processing it.

Netty Receives the Request

On the Netty pipeline, the incoming MySQL protocol packet is delegated to ProxyStateContext for execution:

// FrontendChannelInboundHandler.java
public void channelRead(final ChannelHandlerContext context, final Object message) {
    if (!authenticated) {
        authenticated = authenticate(context, (ByteBuf) message);
        return;
    }
    // Delegate execution to ProxyStateContext
    ProxyStateContext.execute(context, message, databaseProtocolFrontendEngine, connectionSession);
}
ProxyStateContext.execute: Dispatching the Request

ProxyStateContext picks the command to run based on the current connection state; under normal circumstances the state is OKProxyState.

There are three connection states:

OKProxyState: the connection is healthy

LockProxyState: the connection is locked

CircuitBreakProxyState: the circuit breaker has tripped

// ProxyStateContext.java
public static void execute(final ChannelHandlerContext context, final Object message, final DatabaseProtocolFrontendEngine databaseProtocolFrontendEngine, final ConnectionSession connectionSession) {
    // Delegate to the ProxyState matching the current connection state
    ProxyContext.getInstance().getInstanceStateContext().ifPresent(optional -> STATES.get(optional.getCurrentState()).execute(context, message, databaseProtocolFrontendEngine, connectionSession));
}
OKProxyState.execute(): Handing Off to a Worker Thread

Here the request is put on an executor thread pool so that Netty's event-loop threads are not blocked. The task is wrapped into a CommandExecutorTask, so CommandExecutorTask holds the real request-handling logic.

// OKProxyState.java
public void execute(final ChannelHandlerContext context, final Object message, final DatabaseProtocolFrontendEngine databaseProtocolFrontendEngine, final ConnectionSession connectionSession) {
    ExecutorService executorService = determineSuitableExecutorService(connectionSession);
    context.channel().config().setAutoRead(false);
    // Hand the task to the thread pool
    executorService.execute(new CommandExecutorTask(databaseProtocolFrontendEngine, connectionSession, context, message));
}
CommandExecutorTask.run(): The Core of Request Execution

The task wraps the query logic in try/catch/finally to handle exceptions and to clean up once execution finishes.

// CommandExecutorTask.java
public void run() {
    try {
        // Execute the request
        isNeedFlush = executeCommand(context, databaseProtocolFrontendEngine.getCodecEngine().createPacketPayload((ByteBuf) message, context.channel().attr(CommonConstants.CHARSET_ATTRIBUTE_KEY).get()));
    } catch (final Exception ex) {
        // Exception handling
        processException(ex);
    } finally {
        // Cleanup once the query is done
        connectionSession.clearQueryContext();
    }
}
[Part 1] CommandExecutorTask.executeCommand()

If we split query handling into preparation plus actual execution, this method packages the preparation work for the SQL about to run.

Consider what we need to know before querying the underlying shards:

  • a restructured view of the SQL: which table it targets, which columns it projects, and what the filter conditions are;
  • what kind of database sits underneath;
  • a handler that can operate on the shards directly and run the resulting SQL.

The Proxy wraps each of these pieces in a corresponding Context object, and the query is ultimately executed by operating on these Contexts.

// CommandExecutorTask.java
private boolean executeCommand(final ChannelHandlerContext context, final PacketPayload payload) throws SQLException {
    // Get the execute engine: MySQLCommandExecuteEngine
    CommandExecuteEngine commandExecuteEngine = databaseProtocolFrontendEngine.getCommandExecuteEngine();
    // MySQL packet type: COM_QUERY
    CommandPacketType type = commandExecuteEngine.getCommandPacketType(payload);
    // Parse the packet
    CommandPacket commandPacket = commandExecuteEngine.getCommandPacket(payload, type, connectionSession);
    // Holds the JDBC connection, the SQL to execute, etc. (the SQL has not been rewritten for sharding yet)
    CommandExecutor commandExecutor = commandExecuteEngine.getCommandExecutor(type, commandPacket, connectionSession);
    return doExecuteCommand(context, commandExecuteEngine, commandExecutor);
}
- commandExecuteEngine.getCommandExecutor()
// MySQLCommandExecuteEngine.java
@Override
public CommandExecutor getCommandExecutor(final CommandPacketType type, final CommandPacket packet, final ConnectionSession connectionSession) throws SQLException {
    // Create the executor via a factory
    return MySQLCommandExecutorFactory.newInstance((MySQLCommandPacketType) type, packet, connectionSession);
}

// MySQLCommandExecutorFactory.java
public static CommandExecutor newInstance(final MySQLCommandPacketType commandPacketType, final CommandPacket commandPacket, final ConnectionSession connectionSession) throws SQLException {
    switch (commandPacketType) {
        case COM_QUIT:
            return new MySQLComQuitExecutor();
        case COM_INIT_DB:
            return new MySQLComInitDbExecutor((MySQLComInitDbPacket) commandPacket, connectionSession);
        case COM_FIELD_LIST:
            return new MySQLComFieldListPacketExecutor((MySQLComFieldListPacket) commandPacket, connectionSession);
        case COM_QUERY: // the query command
            return new MySQLComQueryPacketExecutor((MySQLComQueryPacket) commandPacket, connectionSession);
        case COM_PING:
            return new MySQLComPingExecutor(connectionSession);
        case COM_STMT_PREPARE:
            return new MySQLComStmtPrepareExecutor((MySQLComStmtPreparePacket) commandPacket, connectionSession);
        case COM_STMT_EXECUTE:
            return new MySQLComStmtExecuteExecutor((MySQLComStmtExecutePacket) commandPacket, connectionSession);
        case COM_STMT_SEND_LONG_DATA:
            return new MySQLComStmtSendLongDataExecutor((MySQLComStmtSendLongDataPacket) commandPacket, connectionSession);
        case COM_STMT_RESET:
            return new MySQLComStmtResetExecutor((MySQLComStmtResetPacket) commandPacket, connectionSession);
        case COM_STMT_CLOSE:
            return new MySQLComStmtCloseExecutor((MySQLComStmtClosePacket) commandPacket, connectionSession);
        case COM_SET_OPTION:
            return new MySQLComSetOptionExecutor((MySQLComSetOptionPacket) commandPacket, connectionSession);
        case COM_RESET_CONNECTION:
            return new MySQLComResetConnectionExecutor(connectionSession);
        default:
            return new MySQLUnsupportedCommandExecutor(commandPacketType);
    }
}

// MySQLComQueryPacketExecutor.java
public MySQLComQueryPacketExecutor(final MySQLComQueryPacket packet, final ConnectionSession connectionSession) throws SQLException {
    this.connectionSession = connectionSession;
    DatabaseType databaseType = TypedSPILoader.getService(DatabaseType.class, "MySQL");
    // The SQL parser analyzes the statement.
    // SQLStatement is only an interface; an implementation class is created per SQL type,
    // e.g. select * from user maps to MySQLSelectStatement.
    // It captures only the literal meaning of the SQL text; a new SQLStatement is
    // reassembled later. For example, projections carries no table schema info yet:
    // for select * there is no column data, it gets filled in afterwards.
    // For select user.id, roleId from user, projections does hold the literal data:
    // the id column has an owner marking it as from user; roleId has no owner and defaults to user.
    SQLStatement sqlStatement = parseSQL(packet.getSQL(), databaseType);
    // Does the packet contain multiple statements?
    proxyBackendHandler = areMultiStatements(connectionSession, sqlStatement, packet.getSQL()) ? new MySQLMultiStatementsHandler(connectionSession, sqlStatement, packet.getSQL())
            // Creates a DatabaseConnector, packaging the pieces above (so far they were all local variables)
            : ProxyBackendHandlerFactory.newInstance(databaseType, packet.getSQL(), sqlStatement, connectionSession, packet.getHintValueContext());
    characterSet = connectionSession.getAttributeMap().attr(MySQLConstants.MYSQL_CHARACTER_SET_ATTRIBUTE_KEY).get().getId();
}

Since our statement is a query, a MySQLComQueryPacketExecutor is ultimately created.

Its constructor does several things:

  • Saves the current connectionSession, which mainly carries connection information for this proxy session;


  • Parses the literal SQL, i.e. interprets the statement from its text alone and produces a SQLStatement; under the hood the SQL is parsed into an AST;

  • Creates the proxyBackendHandler: it manages the real database operation objects, the executable SQL, and other critical state, and is arguably the most important piece. Here a DatabaseConnector is created;

- ProxyBackendHandlerFactory.newInstance()

ProxyBackendHandler is an interface; it is the handler that actually drives the underlying work. Here we create a DatabaseConnector:

  • re-wrap the statement into a SQLStatementContext;
  • assemble a QueryContext, a wrapper over that SQLStatementContext;
  • create the ProxyBackendHandler, concretely a DatabaseConnector.
/**
 * Proxy backend handler: executes commands against the backend database and handles results.
 */
public interface ProxyBackendHandler {

    /**
     * Execute the command and return the backend response.
     *
     * @return response header
     * @throws SQLException SQL exception
     */
    ResponseHeader execute() throws SQLException;

    /**
     * Move to the next result value.
     *
     * @return whether there are more result values
     * @throws SQLException SQL exception
     */
    default boolean next() throws SQLException {
        return false;
    }

    /**
     * Get row data.
     *
     * @return row data
     * @throws SQLException SQL exception
     */
    default QueryResponseRow getRowData() throws SQLException {
        return new QueryResponseRow(Collections.emptyList());
    }

    /**
     * Close the handler.
     *
     * @throws SQLException SQL exception
     */
    default void close() throws SQLException {
    }
}
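The interface above is consumed in iterator style: execute() once, then next()/getRowData() until exhausted. A toy stand-in (the types here are simplified inventions; the real methods return ResponseHeader and QueryResponseRow) shows the calling convention:

```java
import java.util.Iterator;
import java.util.List;

// Simplified stand-in for ProxyBackendHandler, showing only the iterator contract.
interface RowHandler {
    boolean next();           // move to the next row, false when exhausted
    List<Object> getRowData();
}

public final class HandlerSketch {

    // A fake backend that serves rows from memory instead of JDBC.
    static RowHandler inMemory(List<List<Object>> rows) {
        Iterator<List<Object>> it = rows.iterator();
        return new RowHandler() {
            List<Object> current;

            public boolean next() {
                if (!it.hasNext()) {
                    return false;
                }
                current = it.next();
                return true;
            }

            public List<Object> getRowData() {
                return current;
            }
        };
    }

    // The drain loop the Proxy runs when writing query results back to the client.
    static int drain(RowHandler handler) {
        int count = 0;
        while (handler.next()) {
            handler.getRowData(); // in the Proxy: encode into a MySQL packet and write it
            count++;
        }
        return count;
    }
}
```

The real drain loop lives in writeQueryData(), covered in Part 2 below.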

// ProxyBackendHandlerFactory.java
public static ProxyBackendHandler newInstance(final DatabaseType databaseType, final String sql, final SQLStatement sqlStatement,
                                              final ConnectionSession connectionSession, final HintValueContext hintValueContext) throws SQLException {
    if (sqlStatement instanceof EmptyStatement) {
        return new SkipBackendHandler(sqlStatement);
    }
    // The SQL context built from the rebound SqlStatement.
    // Here projections carries the table's column info, taken from the table metadata.
    // A context matching the SQL type is created, so later code can tell from the
    // context what the SQL means, e.g. insert vs. select.
    SQLStatementContext sqlStatementContext = sqlStatement instanceof DistSQLStatement ? new DistSQLStatementContext((DistSQLStatement) sqlStatement)
        : new SQLBindEngine(ProxyContext.getInstance().getContextManager().getMetaDataContexts().getMetaData(), connectionSession.getDefaultDatabaseName()).bind(sqlStatement,
                        Collections.emptyList());
    // Build the query context, a further wrapper over the statement context
    QueryContext queryContext = new QueryContext(sqlStatementContext, sql, Collections.emptyList(), hintValueContext);
    connectionSession.setQueryContext(queryContext);
    // DatabaseConnector: creates a database connection
    return newInstance(databaseType, queryContext, connectionSession, false);
}
- Creating the SQL statement context: SQLStatementContext

ShardingSphere likes to wrap every prepared object in a Context. The SQL statement context re-wraps the literal SqlStatement parsed earlier; for example its projections now carry the table's column information, taken from table metadata, which the earlier literal parse could not provide.

A context matching the SQL type is created, so later code can determine the meaning of the statement just from its context, e.g. whether it is an insert or a select.


The SQLStatementContext is created via SQLBindEngine's bind method:

  • first, a new SQLStatement is reassembled from the table metadata and the literal SQLStatement; for DML, only inserts and selects get special treatment;
  • then, an SQL context matching the statement type is built.

1. Rebinding the SQLStatement

// SQLBindEngine.java
public SQLStatementContext bind(final SQLStatement sqlStatement, final List<Object> params) {
    // The rebound SQLStatement, now carrying table schema info
    SQLStatement boundedSQLStatement = bind(sqlStatement, metaData, defaultDatabaseName);
    // Wrap it in the SQL context matching the statement type
    return SQLStatementContextFactory.newInstance(metaData, params, boundedSQLStatement, defaultDatabaseName);
}

// Create the new SQLStatement depending on whether the literal statement is DDL or DML
private SQLStatement bind(final SQLStatement statement, final ShardingSphereMetaData metaData, final String defaultDatabaseName) {
    if (containsDataSourceNameSQLHint(statement)) {
        return statement;
    }
    if (statement instanceof DMLStatement) {
        return bindDMLStatement(statement, metaData, defaultDatabaseName);
    }
    if (statement instanceof DDLStatement) {
        return bindDDLStatement(statement, metaData, defaultDatabaseName);
    }
    return statement;
}

// Reassemble the SQLStatement
private static SQLStatement bindDMLStatement(final SQLStatement statement, final ShardingSphereMetaData metaData, final String defaultDatabaseName) {
    // select
    if (statement instanceof SelectStatement) {
        return new SelectStatementBinder().bind((SelectStatement) statement, metaData, defaultDatabaseName);
    }
    // insert
    if (statement instanceof InsertStatement) {
        return new InsertStatementBinder().bind((InsertStatement) statement, metaData, defaultDatabaseName);
    }
    // everything else
    return statement;
}

The rebinding logic is essentially a clone() that additionally fills in the data we need, as the inline comments note:

// SelectStatementBinder.java
public SelectStatement bind(final SelectStatement sqlStatement, final ShardingSphereMetaData metaData, final String defaultDatabaseName) {
    // Instantiate a fresh SelectStatement
    SelectStatement result = sqlStatement.getClass().getDeclaredConstructor().newInstance();
    // Map that will hold the table schema info
    Map<String, TableSegmentBinderContext> tableBinderContexts = new CaseInsensitiveMap<>();
    // Copy the TableSegment and store the table's column info into tableBinderContexts.
    // It records where the table name starts in the SQL plus related info such as table and database name.
    TableSegment boundedTableSegment = TableSegmentBinder.bind(sqlStatement.getFrom(), metaData, defaultDatabaseName, sqlStatement.getDatabaseType(), tableBinderContexts);
    // Set the source table info
    result.setFrom(boundedTableSegment);
    // Set the projection info, e.g. where * starts in the SQL; the table schema info is attached here too
    result.setProjections(ProjectionsSegmentBinder.bind(sqlStatement.getProjections(), boundedTableSegment, tableBinderContexts));
    // TODO support other segment bind in select statement
    sqlStatement.getWhere().ifPresent(result::setWhere);
    sqlStatement.getGroupBy().ifPresent(result::setGroupBy);
    sqlStatement.getHaving().ifPresent(result::setHaving);
    sqlStatement.getOrderBy().ifPresent(result::setOrderBy);
    sqlStatement.getCombine().ifPresent(optional -> result.setCombine(CombineSegmentBinder.bind(optional, metaData, defaultDatabaseName)));
    SelectStatementHandler.getLimitSegment(sqlStatement).ifPresent(optional -> SelectStatementHandler.setLimitSegment(result, optional));
    SelectStatementHandler.getLockSegment(sqlStatement).ifPresent(optional -> SelectStatementHandler.setLockSegment(result, optional));
    SelectStatementHandler.getWindowSegment(sqlStatement).ifPresent(optional -> SelectStatementHandler.setWindowSegment(result, optional));
    SelectStatementHandler.getWithSegment(sqlStatement).ifPresent(optional -> SelectStatementHandler.setWithSegment(result, optional));
    SelectStatementHandler.getModelSegment(sqlStatement).ifPresent(optional -> SelectStatementHandler.setModelSegment(result, optional));
    result.addParameterMarkerSegments(sqlStatement.getParameterMarkerSegments());
    result.getCommentSegments().addAll(sqlStatement.getCommentSegments());
    // Return the newly created SelectStatement
    return result;
}
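The "clone and enrich" idea can be reduced to a toy model. The Statement type below is invented (the real binder works on TableSegment/ProjectionsSegment); it only illustrates copying the statement while expanding * from table metadata:

```java
import java.util.List;
import java.util.Map;

// Toy model of the bind step: clone the statement, enrich it with metadata.
// Invented types, for illustration only.
public final class BindSketch {

    record Statement(String table, List<String> projections) {}

    static Statement bind(Statement original, Map<String, List<String>> tableMetaData) {
        List<String> projections = original.projections().contains("*")
                ? tableMetaData.get(original.table())   // enrich: pull columns from table metadata
                : original.projections();               // keep the literal projections
        return new Statement(original.table(), projections); // a new instance; the original is untouched
    }
}
```

After binding, select * over user carries the concrete column list, which is exactly what the literal parse could not provide.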

2. Creating the SQLStatementContext

A context is created per SQL type; later code can then tell what the SQL means just from the context's type:

public static SQLStatementContext newInstance(final ShardingSphereMetaData metaData, final List<Object> params, final SQLStatement sqlStatement, final String defaultDatabaseName) {
    if (sqlStatement instanceof DMLStatement) {
        return getDMLStatementContext(metaData, params, (DMLStatement) sqlStatement, defaultDatabaseName);
    }
    if (sqlStatement instanceof DDLStatement) {
        return getDDLStatementContext(metaData, params, (DDLStatement) sqlStatement, defaultDatabaseName);
    }
    if (sqlStatement instanceof DCLStatement) {
        return getDCLStatementContext((DCLStatement) sqlStatement);
    }
    if (sqlStatement instanceof DALStatement) {
        return getDALStatementContext((DALStatement) sqlStatement);
    }
    return new UnknownSQLStatementContext(sqlStatement);
}

private static SQLStatementContext getDMLStatementContext(final ShardingSphereMetaData metaData, final List<Object> params, final DMLStatement sqlStatement, final String defaultDatabaseName) {
    if (sqlStatement instanceof SelectStatement) {
        return new SelectStatementContext(metaData, params, (SelectStatement) sqlStatement, defaultDatabaseName);
    }
    if (sqlStatement instanceof UpdateStatement) {
        return new UpdateStatementContext((UpdateStatement) sqlStatement);
    }
    if (sqlStatement instanceof DeleteStatement) {
        return new DeleteStatementContext((DeleteStatement) sqlStatement);
    }
    if (sqlStatement instanceof InsertStatement) {
        return new InsertStatementContext(metaData, params, (InsertStatement) sqlStatement, defaultDatabaseName);
    }
    if (sqlStatement instanceof CallStatement) {
        return new CallStatementContext((CallStatement) sqlStatement);
    }
    if (sqlStatement instanceof CopyStatement) {
        return new CopyStatementContext((CopyStatement) sqlStatement);
    }
    if (sqlStatement instanceof DoStatement) {
        return new DoStatementContext((DoStatement) sqlStatement);
    }
    if (sqlStatement instanceof MySQLLoadDataStatement) {
        return new LoadDataStatementContext((MySQLLoadDataStatement) sqlStatement);
    }
    if (sqlStatement instanceof MySQLLoadXMLStatement) {
        return new LoadXMLStatementContext((MySQLLoadXMLStatement) sqlStatement);
    }
    throw new UnsupportedSQLOperationException(String.format("Unsupported SQL statement `%s`", sqlStatement.getClass().getSimpleName()));
}
- Creating the query context: QueryContext

This can be understood as simply a wrapper over the SQL statement context above:

// QueryContext constructor
public QueryContext(final SQLStatementContext sqlStatementContext, final String sql, final List<Object> params, final HintValueContext hintValueContext, final boolean useCache) {
    this.sqlStatementContext = sqlStatementContext;
    this.sql = sql;
    parameters = params;
    databaseName = sqlStatementContext instanceof TableAvailable ? ((TableAvailable) sqlStatementContext).getTablesContext().getDatabaseName().orElse(null) : null;
    schemaName = sqlStatementContext instanceof TableAvailable ? ((TableAvailable) sqlStatementContext).getTablesContext().getSchemaName().orElse(null) : null;
    this.hintValueContext = sqlStatementContext.getSqlStatement() instanceof AbstractSQLStatement && !((AbstractSQLStatement) sqlStatementContext.getSqlStatement()).getCommentSegments().isEmpty()
        ? SQLHintUtils.extractHint(((AbstractSQLStatement) sqlStatementContext.getSqlStatement()).getCommentSegments().iterator().next().getText()).orElse(hintValueContext)
        : hintValueContext;
    this.useCache = useCache;
}
- Creating the proxy executor: ProxyBackendHandler

The code here is trimmed down, with irrelevant logic removed. In the end we create DatabaseConnector, the ProxyBackendHandler implementation; it holds our JDBC connections, the query context, and so on, and serves as the key handler for all subsequent sharded operations in the Proxy.

// ProxyBackendHandlerFactory.java
// the call site we came from
return newInstance(databaseType, queryContext, connectionSession, false);

// where it is created
public static ProxyBackendHandler newInstance(final DatabaseType databaseType, final QueryContext queryContext, final ConnectionSession connectionSession, final boolean preferPreparedStatement) throws SQLException {
    return DatabaseBackendHandlerFactory.newInstance(queryContext, connectionSession, preferPreparedStatement);
}

// DatabaseBackendHandlerFactory.java
public static DatabaseBackendHandler newInstance(final QueryContext queryContext, final ConnectionSession connectionSession, final boolean preferPreparedStatement) {
    return DatabaseConnectorFactory.getInstance().newInstance(queryContext, connectionSession.getDatabaseConnectionManager(), preferPreparedStatement);
}

// DatabaseConnectorFactory.java
public DatabaseConnector newInstance(final QueryContext queryContext, final ProxyDatabaseConnectionManager databaseConnectionManager, final boolean preferPreparedStatement) {
    // Fetch the global sharded-database information
    ShardingSphereDatabase database = ProxyContext.getInstance().getDatabase(databaseConnectionManager.getConnectionSession().getDatabaseName());
    String driverType = preferPreparedStatement || !queryContext.getParameters().isEmpty() ? JDBCDriverType.PREPARED_STATEMENT : JDBCDriverType.STATEMENT;
    // Holds the JDBC connections
    DatabaseConnector result = new DatabaseConnector(driverType, database, queryContext, databaseConnectionManager);
    databaseConnectionManager.add(result);
    return result;
}

With that, the initialization of the command executor is complete: we have a fully assembled CommandExecutor.


Let's review the entry code:

// CommandExecutorTask.java
private boolean executeCommand(final ChannelHandlerContext context, final PacketPayload payload) throws SQLException {
    // Get the execute engine: MySQLCommandExecuteEngine
    CommandExecuteEngine commandExecuteEngine = databaseProtocolFrontendEngine.getCommandExecuteEngine();
    // MySQL packet type: COM_QUERY
    CommandPacketType type = commandExecuteEngine.getCommandPacketType(payload);
    // The parsed packet
    CommandPacket commandPacket = commandExecuteEngine.getCommandPacket(payload, type, connectionSession);
    // Holds the JDBC connection, the SQL to execute, etc. (the SQL has not been rewritten for sharding yet)
    CommandExecutor commandExecutor = commandExecuteEngine.getCommandExecutor(type, commandPacket, connectionSession);
    // [the part we walk through next]
    return doExecuteCommand(context, commandExecuteEngine, commandExecutor);
}
[Part 2] doExecuteCommand()

If everything above was preparation, this is where the SQL actually runs and the merged data is returned:

  • the executor runs the SQL; underneath, routing by the sharding rules determines the statements to execute, and the results are merged using the iterator pattern;
  • the result header is written, e.g. the result columns id, roleId, age;
  • the data rows are written;
  • cleanup runs.
// CommandExecutorTask.java
private boolean doExecuteCommand(final ChannelHandlerContext context, final CommandExecuteEngine commandExecuteEngine, final CommandExecutor commandExecutor) throws SQLException {
    try {
        // Run the underlying DB queries, merge on top, and produce the packets to return
        Collection<? extends DatabasePacket> responsePackets = commandExecutor.execute();
        if (responsePackets.isEmpty()) {
            return false;
        }
        // Write the header packets
        responsePackets.forEach(context::write);
        // For a query, also write the query result rows
        if (commandExecutor instanceof QueryCommandExecutor) {
            commandExecuteEngine.writeQueryData(context, connectionSession.getDatabaseConnectionManager(), (QueryCommandExecutor) commandExecutor, responsePackets.size());
        }
        return true;
    } catch (final SQLException | ShardingSphereSQLException | SQLDialectException ex) {
        databaseProtocolFrontendEngine.handleException(connectionSession, ex);
        throw ex;
    } finally {
        commandExecutor.close();
    }
}
- Executing the query

This just delegates to our DatabaseConnector's execute() method:

// MySQLComQueryPacketExecutor.java
public Collection<DatabasePacket> execute() throws SQLException {
    // proxyBackendHandler is the DatabaseConnector created earlier
    ResponseHeader responseHeader = proxyBackendHandler.execute();
    if (responseHeader instanceof QueryResponseHeader) {
        // Return the result-header packets for a query
        return processQuery((QueryResponseHeader) responseHeader);
    }
    responseType = ResponseType.UPDATE;
    return processUpdate((UpdateResponseHeader) responseHeader);
}

// DatabaseConnector.java
public ResponseHeader execute() throws SQLException {
    // Generate the execution contexts:
    // the SQL is reassembled into the statements that must run against each shard
    Collection<ExecutionContext> executionContexts = generateExecutionContexts();
    // Run the SQL over JDBC and merge the query results
    return doExecute(executionContexts);
}
- Generating the execution context

The execution context is produced by the KernelProcessor: we hand it the query context, the sharded-database object ShardingSphereDatabase, and the sharding rule metadata, and it routes the SQL for us.

// DatabaseConnector
private Collection<ExecutionContext> generateExecutionContexts() {
    Collection<ExecutionContext> result = new LinkedList<>();
    MetaDataContexts metaDataContexts = ProxyContext.getInstance().getContextManager().getMetaDataContexts();
    // Build the execution context: sharded data sources, shard database/table names, rewritten SQL, query context
    ExecutionContext executionContext = new KernelProcessor().generateExecutionContext(queryContext, database, metaDataContexts.getMetaData().getGlobalRuleMetaData(),
                                                                                       metaDataContexts.getMetaData().getProps(), databaseConnectionManager.getConnectionSession().getConnectionContext());
    result.add(executionContext);
    return result;
}

The main steps of generating the execution context:

  • routing: build the route result, sharding across databases and tables;
  • SQL rewriting: note that the rewrite only replaces table names and never touches the other query conditions; which table names appear depends on whether the where clause constrains the table sharding column;
  • wrap the above into an ExecutionContext;
  • log the rewritten SQL.
// KernelProcessor.java
public ExecutionContext generateExecutionContext(final QueryContext queryContext, final ShardingSphereDatabase database, final ShardingSphereRuleMetaData globalRuleMetaData,
                                                 final ConfigurationProperties props, final ConnectionContext connectionContext) {
    // Build the route result: shard across databases and tables
    RouteContext routeContext = route(queryContext, database, globalRuleMetaData, props, connectionContext);
    // Rewrite into SQLUnits.
    // For a query across all tables of the 4 databases, only 4 SQL statements come back (one per database):
    // select * from user_0 UNION ALL select * from user_1 UNION ALL select * from user_2 UNION ALL select * from user_3
    // Note the rewrite only replaces table names; the other query conditions are untouched.
    // Which table names are substituted depends on whether there is a condition on the table sharding column.
    SQLRewriteResult rewriteResult = rewrite(queryContext, database, globalRuleMetaData, props, routeContext, connectionContext);
    // Package everything into an ExecutionContext
    ExecutionContext result = createExecutionContext(queryContext, database, routeContext, rewriteResult);
    // Log the result
    logSQL(queryContext, props, result);
    return result;
}
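The "only table names change" rule can be sketched with a toy helper. This is invented code, not ShardingSphere's rewrite engine: the real rewrite operates on parsed token positions, not string replacement.

```java
// Toy model of the rewrite step: the logical table name is replaced by the
// actual table name of a routed data node; everything else in the SQL,
// including the where clause, is left untouched.
public final class RewriteSketch {

    // Real rewrite substitutes at exact token positions recorded during parsing;
    // plain string replace (which could also hit substrings inside other words)
    // is good enough for this illustration.
    static String rewrite(String logicalSql, String logicTable, String actualTable) {
        return logicalSql.replace(logicTable, actualTable);
    }
}
```

For our example query routed to testdb_0.user_0, the where clause travels through unchanged; only `user` becomes `user_0`.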
- SQL routing: delegating route-context creation

Routing means deciding, from the query's conditions and the sharding rules, exactly which tables in which databases need to be queried.

Here the SQLRouteEngine delegates the routing work to a PartialSQLRouteExecutor.

PartialSQLRouteExecutor's execution logic:

  • first ShardingSQLRoute's createRouteContext() runs;
  • then SingleSQLRoute's decorateRouteContext() runs, which does essentially nothing on the sharded path.
// KernelProcessor.java
private RouteContext route(final QueryContext queryContext, final ShardingSphereDatabase database, final ShardingSphereRuleMetaData globalRuleMetaData, final ConfigurationProperties props, final ConnectionContext connectionContext) {
    // Obtain the route result via the route engine
    return new SQLRouteEngine(database.getRuleMetaData().getRules(), props).route(connectionContext, queryContext, globalRuleMetaData, database);
}

// SQLRouteEngine.java
public RouteContext route(final ConnectionContext connectionContext, final QueryContext queryContext, final ShardingSphereRuleMetaData globalRuleMetaData, final ShardingSphereDatabase database) {
    // Is this a schema-level SQL, e.g. one that queries table structure?
    SQLRouteExecutor executor = isNeedAllSchemas(queryContext.getSqlStatementContext().getSqlStatement()) ? new AllSQLRouteExecutor() : new PartialSQLRouteExecutor(rules, props);
    // In our case this is a PartialSQLRouteExecutor
    return executor.route(connectionContext, queryContext, globalRuleMetaData, database);
}

// PartialSQLRouteExecutor.java
public RouteContext route(final ConnectionContext connectionContext, final QueryContext queryContext, final ShardingSphereRuleMetaData globalRuleMetaData, final ShardingSphereDatabase database) {
    RouteContext result = new RouteContext();
    Optional<String> dataSourceName = findDataSourceByHint(queryContext.getHintValueContext(), database.getResourceMetaData().getDataSources());
    if (dataSourceName.isPresent()) {
        result.getRouteUnits().add(new RouteUnit(new RouteMapper(dataSourceName.get(), dataSourceName.get()), Collections.emptyList()));
        return result;
    }
    // The routers here map ShardingRule -> ShardingSQLRoute
    // and SingleRule -> SingleSQLRoute
    for (Entry<ShardingSphereRule, SQLRouter> entry : routers.entrySet()) {
        if (result.getRouteUnits().isEmpty()) {
            // ShardingSQLRoute goes here
            result = entry.getValue().createRouteContext(queryContext, globalRuleMetaData, database, entry.getKey(), props, connectionContext);
        } else {
            // SingleSQLRoute goes here; essentially a no-op for us
            entry.getValue().decorateRouteContext(result, queryContext, database, entry.getKey(), props, connectionContext);
        }
    }
    if (result.getRouteUnits().isEmpty() && 1 == database.getResourceMetaData().getDataSources().size()) {
        String singleDataSourceName = database.getResourceMetaData().getDataSources().keySet().iterator().next();
        result.getRouteUnits().add(new RouteUnit(new RouteMapper(singleDataSourceName, singleDataSourceName), Collections.emptyList()));
    }
    return result;
}
- Creating the route context

The main job here is handling the sharding conditions: deciding whether the where clause contains any of the columns named in the sharding rules. WhereClauseShardingConditionEngine analyzes the where clause for sharding columns.

The RouteContext is then created, which actually determines the databases and tables to query based on those sharding conditions.

// ShardingSQLRoute
private RouteContext createRouteContext0(final QueryContext queryContext, final ShardingSphereRuleMetaData globalRuleMetaData, final ShardingSphereDatabase database, final ShardingRule rule, final ConfigurationProperties props, final ConnectionContext connectionContext) {
    SQLStatement sqlStatement = queryContext.getSqlStatementContext().getSqlStatement();
    // Assemble the sharding conditions
    ShardingConditions shardingConditions = createShardingConditions(queryContext, globalRuleMetaData, database, rule);
    // Create the route context; here it is built by ShardingStandardRoutingEngine
    RouteContext result = ShardingRouteEngineFactory.newInstance(rule, database, queryContext, shardingConditions, props, connectionContext, globalRuleMetaData).route(rule);
    return result;
}

private ShardingConditions createShardingConditions(final QueryContext queryContext,
                                                    final ShardingSphereRuleMetaData globalRuleMetaData, final ShardingSphereDatabase database, final ShardingRule rule) {
    List<ShardingCondition> shardingConditions;
    // Assemble the sharding conditions
    shardingConditions = new ShardingConditionEngine(globalRuleMetaData, database, rule).createShardingConditions(queryContext.getSqlStatementContext(), queryContext.getParameters());
    return new ShardingConditions(shardingConditions, queryContext.getSqlStatementContext(), rule);
}

// ShardingConditionEngine.java
public List<ShardingCondition> createShardingConditions(final SQLStatementContext sqlStatementContext, final List<Object> params) {
    TimestampServiceRule timestampServiceRule = globalRuleMetaData.getSingleRule(TimestampServiceRule.class);
    // Determine whether this is an insert or a query
    return sqlStatementContext instanceof InsertStatementContext
        ? new InsertClauseShardingConditionEngine(database, shardingRule, timestampServiceRule).createShardingConditions((InsertStatementContext) sqlStatementContext, params)
        // queries go here
        : new WhereClauseShardingConditionEngine(database, shardingRule, timestampServiceRule).createShardingConditions(sqlStatementContext, params);
}

Let's look at how it builds sharding conditions from the where clause and the sharding rules.

For select * from user where id=1 and roleId=1, a single ShardingCondition comes back, but that one condition object carries two condition values in its values list.

For select * from user where id=1 or roleId=1, two sharding conditions come back.

For select * from user where id=1 and roleId=1 or name="123", the result is empty, meaning a full scan across every database and table with no sharding.

// WhereClauseShardingConditionEngine.java
public List<ShardingCondition> createShardingConditions(final SQLStatementContext sqlStatementContext, final List<Object> params) {
    if (!(sqlStatementContext instanceof WhereAvailable)) {
        return Collections.emptyList();
    }
    Collection<ColumnSegment> columnSegments = ((WhereAvailable) sqlStatementContext).getColumnSegments();
    String defaultSchemaName = new DatabaseTypeRegistry(sqlStatementContext.getDatabaseType()).getDefaultSchemaName(database.getName());
    ShardingSphereSchema schema = sqlStatementContext.getTablesContext().getSchemaName()
        .map(database::getSchema).orElseGet(() -> database.getSchema(defaultSchemaName));
    Map<String, String> columnExpressionTableNames = sqlStatementContext.getTablesContext().findTableNamesByColumnSegment(columnSegments, schema);
    List<ShardingCondition> result = new ArrayList<>();
    // Iterate over the WHERE segments and build condition objects,
    // but only for predicates on sharding columns.
    // With id and roleId as the sharding columns:
    //   id=1 and roleId=2 and name=3 -> conditions only for id=1 and roleId=2
    //   id=1 and roleId=2 or name=3  -> no conditions at all, since a full route is needed
    for (WhereSegment each : ((WhereAvailable) sqlStatementContext).getWhereSegments()) {
        result.addAll(createShardingConditions(each.getExpr(), params, columnExpressionTableNames));
    }
    // Return the conditions that were built
    return result;
}
- The actual sharding (routing)

Given the SQL's conditions and the sharding rules, determine the final databases and tables to route to;

  • Find the matching databases: from the sharding conditions built earlier and the database sharding rule, compute the names of the database shards that satisfy the conditions;


  • For each matching database name, determine the matching tables inside that database;


  • Combine each database name and table name into a DataNode, which holds only those two fields;


  • Wrap all the DataNodes into RouteUnits and put them into the RouteContext.

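Under this article's configuration (testdb_${id % 4} for databases, user_${roleId % 4} for tables), the steps above can be sketched as follows. RouteSketch and its route method are invented names, and an empty Optional stands in for "no sharding value, route to all targets":

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Optional;

public class RouteSketch {

    // A routed target: database name + actual table name.
    record DataNode(String dataSource, String table) {}

    static final List<String> ALL_DBS = List.of("testdb_0", "testdb_1", "testdb_2", "testdb_3");
    static final List<String> ALL_TABLES = List.of("user_0", "user_1", "user_2", "user_3");

    // Mirrors route0's shape: an absent sharding value means "all targets"
    // (full route); otherwise the inline expressions collapse each dimension
    // to a single shard, and the cross product of databases x tables is emitted.
    static List<DataNode> route(Optional<Long> id, Optional<Long> roleId) {
        List<String> dbs = id.map(v -> List.of("testdb_" + (v % 4))).orElse(ALL_DBS);
        List<String> tables = roleId.map(v -> List.of("user_" + (v % 4))).orElse(ALL_TABLES);
        List<DataNode> result = new ArrayList<>();
        for (String db : dbs) {
            for (String table : tables) {
                result.add(new DataNode(db, table));
            }
        }
        return result;
    }
}
```

For the article's query (id = 0 and roleId = 0) this yields the single node testdb_0.user_0; with no conditions at all it yields all 4 x 4 = 16 nodes.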

// ShardingStandardRoutingEngine.java
public RouteContext route(final ShardingRule shardingRule) {
    // Create the route context
    RouteContext result = new RouteContext();
    // Compute the target data nodes
    Collection<DataNode> dataNodes = getDataNodes(shardingRule, shardingRule.getTableRule(logicTableName));
    result.getOriginalDataNodes().addAll(originalDataNodes);
    for (DataNode each : dataNodes) {
        // Wrap each sharded DataNode into a RouteUnit holding the database name and table name
        result.getRouteUnits().add(
            new RouteUnit(new RouteMapper(each.getDataSourceName(), each.getDataSourceName()), Collections.singleton(new RouteMapper(logicTableName, each.getTableName()))));
    }
    return result;
}

// Database and table sharding
private Collection<DataNode> getDataNodes(final ShardingRule shardingRule, final TableRule tableRule) {
    // Get the database sharding strategy: sharding column + sharding algorithm
    ShardingStrategy databaseShardingStrategy = createShardingStrategy(shardingRule.getDatabaseShardingStrategyConfiguration(tableRule),
                                                                       shardingRule.getShardingAlgorithms(), shardingRule.getDefaultShardingColumn());
    // Get the table sharding strategy
    ShardingStrategy tableShardingStrategy = createShardingStrategy(shardingRule.getTableShardingStrategyConfiguration(tableRule),
                                                                    shardingRule.getShardingAlgorithms(), shardingRule.getDefaultShardingColumn());
    if (isRoutingByHint(shardingRule, tableRule)) {
        return routeByHint(tableRule, databaseShardingStrategy, tableShardingStrategy);
    }
    if (isRoutingByShardingConditions(shardingRule, tableRule)) {
        // our query takes this branch
        return routeByShardingConditions(shardingRule, tableRule, databaseShardingStrategy, tableShardingStrategy);
    }
    return routeByMixedConditions(shardingRule, tableRule, databaseShardingStrategy, tableShardingStrategy);
}

// Route by sharding conditions, with or without condition values
private Collection<DataNode> routeByShardingConditions(final ShardingRule shardingRule, final TableRule tableRule,
                                                       final ShardingStrategy databaseShardingStrategy, final ShardingStrategy tableShardingStrategy) {
    return shardingConditions.getConditions().isEmpty()
        // No conditions: assemble the DataNodes from the sharding strategies alone;
        // note that the database and table sharding values are passed as empty collections
        ? route0(tableRule, databaseShardingStrategy, Collections.emptyList(), tableShardingStrategy, Collections.emptyList())
        // With conditions: this also ends up in route0,
        // but the database and table sharding values are no longer empty
        : routeByShardingConditionsWithCondition(shardingRule, tableRule, databaseShardingStrategy, tableShardingStrategy);
}

// Sharding with condition values
private Collection<DataNode> routeByShardingConditionsWithCondition(final ShardingRule shardingRule, final TableRule tableRule,
                                                                    final ShardingStrategy databaseShardingStrategy, final ShardingStrategy tableShardingStrategy) {
    Collection<DataNode> result = new LinkedList<>();
    // Iterate over all the sharding conditions
    for (ShardingCondition each : shardingConditions.getConditions()) {
        Collection<DataNode> dataNodes = route0(tableRule,
                                                // extract the database sharding values according to the database strategy
                                                databaseShardingStrategy, getShardingValuesFromShardingConditions(shardingRule, databaseShardingStrategy.getShardingColumns(), each),
                                                // extract the table sharding values according to the table strategy
                                                tableShardingStrategy, getShardingValuesFromShardingConditions(shardingRule, tableShardingStrategy.getShardingColumns(), each));
        result.addAll(dataNodes);
        originalDataNodes.add(dataNodes);
    }
    return result;
}

// Core sharding logic
private Collection<DataNode> route0(final TableRule tableRule,
                                    final ShardingStrategy databaseShardingStrategy, final List<ShardingConditionValue> databaseShardingValues,
                                    final ShardingStrategy tableShardingStrategy, final List<ShardingConditionValue> tableShardingValues) {
    // Compute the database names that satisfy the database sharding rule
    Collection<String> routedDataSources = routeDataSources(tableRule, databaseShardingStrategy, databaseShardingValues);
    Collection<DataNode> result = new LinkedList<>();
    // Iterate over the matching databases
    for (String each : routedDataSources) {
        // compute the matching tables within this database and assemble the DataNodes
        result.addAll(routeTables(tableRule, each, tableShardingStrategy, tableShardingValues));
    }
    return result;
}
- SQL rewrite

The rewrite is actually delegated to a SQLRewriteEntry. Its logic decides from the RouteUnits above whether to rewrite; if all tables under one database need to be queried, a single union all statement is sent to aggregate within that database;

The rewrite is mainly about table names: the logical table name is replaced with the actual table names to query;

In the SQLRewriteResult's rewrite units map, the key is the RouteUnit and the value is the rewritten SQL;

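As a rough illustration of the table-name rewrite, here is a sketch; RewriteSketch is a hypothetical name, and the real engine substitutes table tokens by their positions in the parsed AST rather than doing plain string replacement:

```java
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

public class RewriteSketch {

    // One rewritten SQL per route unit; key = "dataSource.actualTable".
    // Each entry of routeUnits maps a data source name to an actual table name.
    static Map<String, String> rewrite(String logicalSql, String logicTable, List<Map.Entry<String, String>> routeUnits) {
        Map<String, String> result = new LinkedHashMap<>();
        for (Map.Entry<String, String> unit : routeUnits) {
            // The real rewriter replaces tokens by AST offset; string replacement
            // is only safe here because the sketch controls its own input.
            result.put(unit.getKey() + "." + unit.getValue(), logicalSql.replace(logicTable, unit.getValue()));
        }
        return result;
    }
}
```

Rewriting "select * from user where id = 0 and roleId = 0" against the route unit testdb_0.user_0 yields "select * from user_0 where id = 0 and roleId = 0".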

// KernelProcessor.java
private SQLRewriteResult rewrite(final QueryContext queryContext, final ShardingSphereDatabase database, final ShardingSphereRuleMetaData globalRuleMetaData,
                                 final ConfigurationProperties props, final RouteContext routeContext, final ConnectionContext connectionContext) {
    SQLRewriteEntry sqlRewriteEntry = new SQLRewriteEntry(database, globalRuleMetaData, props);
    return sqlRewriteEntry.rewrite(queryContext.getSql(), queryContext.getParameters(), queryContext.getSqlStatementContext(), routeContext, connectionContext, queryContext.getHintValueContext());
}
- Create the execution context

This simply bundles the query context, the database object, the route context, and the rewrite result into one object;

// KernelProcessor.java
private ExecutionContext createExecutionContext(final QueryContext queryContext, final ShardingSphereDatabase database, final RouteContext routeContext, final SQLRewriteResult rewriteResult) {
    return new ExecutionContext(queryContext, ExecutionContextBuilder.build(database, rewriteResult, queryContext.getSqlStatementContext()), routeContext);
}
- Execute the query

Now the ExecutionContext built above is executed;

  • Run the JDBC query logic: execute the query against each shard database and capture the returned results;


  • Refresh the metadata. For statements such as CREATE TABLE or index changes, the table structure must be re-maintained. The Proxy loads the real table structures of the shards at startup, and this step updates that cache. For example, after alter table user add column day int(11) default 0 changes the table structure, the latest structure is flushed into the Proxy's cache;


  • Assemble the response header: build the column header data from what MySQL returned plus the locally cached table structure;

  • Merge the result sets: the JDBC query results are wrapped via the iterator pattern and stored in the DatabaseConnector object; the output rows are later produced by iterating over this merged result;

// DatabaseConnector.java
private ResponseHeader doExecute(final Collection<ExecutionContext> executionContexts) throws SQLException {
    ResponseHeader result = null;
    for (ExecutionContext each : executionContexts) {
        // Execute the SQL
        ResponseHeader responseHeader = doExecute(each);
        if (null == result) {
            result = responseHeader;
        }
    }
    return result;
}

// The actual execution logic
private ResponseHeader doExecute(final ExecutionContext executionContext) throws SQLException {
    // Really execute the SQL
    List result = proxySQLExecutor.execute(executionContext);
    // Refresh the metadata
    refreshMetaData(executionContext);
    Object executeResultSample = result.iterator().next();
    // After the query, build the response header packets and merge the query results
    return executeResultSample instanceof QueryResult ? processExecuteQuery(executionContext, result, (QueryResult) executeResultSample) : processExecuteUpdate(executionContext, result);
}

// Handle a query request
private QueryResponseHeader processExecuteQuery(final ExecutionContext executionContext, final List<QueryResult> queryResults, final QueryResult queryResultSample) throws SQLException {
    // Build the header packets from the query result and the table structure
    queryHeaders = createQueryHeaders(executionContext, queryResultSample);
    // Merge the query results
    mergedResult = mergeQuery(executionContext.getSqlStatementContext(), queryResults);
    return new QueryResponseHeader(queryHeaders);
}

// Assemble the query headers
private List<QueryHeader> createQueryHeaders(final ExecutionContext executionContext, final QueryResult queryResultSample) throws SQLException {
    int columnCount = getColumnCount(executionContext, queryResultSample);
    List<QueryHeader> result = new ArrayList<>(columnCount);
    QueryHeaderBuilderEngine queryHeaderBuilderEngine = new QueryHeaderBuilderEngine(database.getProtocolType());
    for (int columnIndex = 1; columnIndex <= columnCount; columnIndex++) {
        result.add(createQueryHeader(queryHeaderBuilderEngine, executionContext, queryResultSample, database, columnIndex));
    }
    return result;
}

// Merge the query results
private MergedResult mergeQuery(final SQLStatementContext sqlStatementContext, final List<QueryResult> queryResults) throws SQLException {
    MergeEngine mergeEngine = new MergeEngine(database, ProxyContext.getInstance().getContextManager().getMetaDataContexts().getMetaData().getProps(),
                                              connectionSession.getConnectionContext());
    // (the original snippet was truncated here; the call is completed so the method reads whole)
    return mergeEngine.merge(queryResults, sqlStatementContext);
}
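Conceptually, what the MergeEngine produces for an ORDER BY query is a streaming k-way merge over per-shard result sets that are each already sorted. A minimal sketch follows; MergeSketch is an invented name, and plain Long rows stand in for the real column-based QueryResult API:

```java
import java.util.ArrayList;
import java.util.Comparator;
import java.util.Iterator;
import java.util.List;
import java.util.PriorityQueue;

public class MergeSketch {

    // K-way merge of per-shard results that are each already sorted by the
    // ORDER BY column, exposed as one iterator: roughly how an order-by
    // merged result streams rows without materializing them all.
    static Iterator<Long> merge(List<Iterator<Long>> shardResults) {
        // Heap entries are {value, shardIndex}, ordered by value.
        PriorityQueue<long[]> heap = new PriorityQueue<>(Comparator.comparingLong((long[] a) -> a[0]));
        List<Iterator<Long>> shards = new ArrayList<>(shardResults);
        for (int i = 0; i < shards.size(); i++) {
            if (shards.get(i).hasNext()) {
                heap.add(new long[]{shards.get(i).next(), i});
            }
        }
        return new Iterator<>() {
            @Override
            public boolean hasNext() {
                return !heap.isEmpty();
            }

            @Override
            public Long next() {
                long[] head = heap.poll();
                int shard = (int) head[1];
                // Refill the heap from the shard that just produced a row.
                if (shards.get(shard).hasNext()) {
                    heap.add(new long[]{shards.get(shard).next(), shard});
                }
                return head[0];
            }
        };
    }
}
```

Because the merge is exposed as an iterator, the Proxy can stream rows to the client later without holding the full cross-shard result in memory.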

[Part 3] Writing the data back

We have now walked through preparation, query execution, and result merging; all that remains is returning the result to the client;

Returning to this code: once the results are merged, the header data is wrapped into MySQL packets and handed back up;

// MySQLComQueryPacketExecutor.java
public Collection<DatabasePacket> execute() throws SQLException {
    // Execute the routed query
    ResponseHeader responseHeader = proxyBackendHandler.execute();
    if (responseHeader instanceof QueryResponseHeader) {
        // queries take this branch: wrap the header data into MySQL packets and return
        return processQuery((QueryResponseHeader) responseHeader);
    }
    responseType = ResponseType.UPDATE;
    return processUpdate((UpdateResponseHeader) responseHeader);
}

At this point we are back in the original CommandExecutorTask. The header packets are written to the TCP buffer first, then the query rows (the merged iterator built earlier) are written as well; the buffer is flushed, and one query cycle ends.

// CommandExecutorTask.java
private boolean doExecuteCommand(final ChannelHandlerContext context, final CommandExecuteEngine commandExecuteEngine, final CommandExecutor commandExecutor) throws SQLException {
    try {
        // Execute the sharded routing/query and get the response header packets
        Collection<? extends DatabasePacket> responsePackets = commandExecutor.execute();
        if (responsePackets.isEmpty()) {
            return false;
        }
        // Write the header packets to the TCP buffer
        responsePackets.forEach(context::write);
        if (commandExecutor instanceof QueryCommandExecutor) {
            // Write the query rows to the TCP buffer
            commandExecuteEngine.writeQueryData(context, connectionSession.getDatabaseConnectionManager(), (QueryCommandExecutor) commandExecutor, responsePackets.size());
        }
        return true;
    } catch (final SQLException | ShardingSphereSQLException | SQLDialectException ex) {
        databaseProtocolFrontendEngine.handleException(connectionSession, ex);
        throw ex;
    } finally {
        commandExecutor.close();
    }
}

And with that, the full lifecycle of a single query is complete! 🎉