likes
comments
collection
share

【伸手即用】从头手撸一个单体服务druid+mybatis跨数据源事务的优雅实现

作者站长头像
站长
· 阅读数 8

是跨数据源事务并不是分布事务,适用场景是单体服务多数据源,这种场景下如果引入seata无疑会加重整个服务

数据库连接池使用druid,多数据源使用dynamic-datasource-spring-boot-starter,jdbc驱动版本为5.x(8.x的驱动有大改动暂时不兼容,如果有需要可以评论留言,我再撸一个8.x的版本)

利用的原理是mysql的XA协议,大致原理为两段提交,具体的mysql实现XA协议的原理可以百度我也是一直半解,大致执行流程如下

客户端数据库A数据库BResourceManager(rm)标记分支事务开始执行application(ap),理解为sql语句rm标记分支事务结束结束ResourceManager(rm)标记分支事务开始执行aprm标记分支事务结束结束prepared 一阶段提交prepared 一阶段提交commit 二阶段提交commit 二阶段提交客户端数据库A数据库B

先来看一个小demo,简单的mysql jdbc驱动如何实现xa协议两端提交

public class MysqlXADemo {
    
    public static void main(String[] args) throws SQLException, InterruptedException {
        //true表示打印XA语句,,用于调试
        boolean logXaCommands = true;
        // 获得资源管理器操作接口实例 RM1
        Connection conn1 = DriverManager.getConnection
                ("jdbc:mysql://localhost:3306/db_order", "root", "root");
        XAConnection xaConn1 = new MysqlXAConnection(
                (com.mysql.jdbc.Connection) conn1, logXaCommands);
        XAResource rm1 = xaConn1.getXAResource();
        
        // 获得资源管理器操作接口实例 RM2
        Connection conn2 = DriverManager.getConnection
                ("jdbc:mysql://localhost:3306/db_storage", "root", "root");
        XAConnection xaConn2 = new MysqlXAConnection(
                (com.mysql.jdbc.Connection) conn2, logXaCommands);
        XAResource rm2 = xaConn2.getXAResource();
        
        // AP请求TM执行一个分布式事务,TM生成全局事务id
        byte[] gtrid = UUID.randomUUID().toString().getBytes();
        int formatId = 1;
        try {
            
            // ==============分别执行RM1和RM2上的事务分支====================
            // TM生成rm1上的事务分支id
            byte[] bqual1 = UUID.randomUUID().toString().getBytes();
            Xid xid1 = new MysqlXid(gtrid, bqual1, formatId);
            // 执行rm1上的事务分支
            rm1.start(xid1, XAResource.TMNOFLAGS);//One of TMNOFLAGS, TMJOIN, or TMRESUME.
            PreparedStatement ps1 = conn1.prepareStatement(
                    "INSERT into order_tbl(user_id,commodity_code,count,money,status) VALUES (1001,2002,2,10,1)");
            ps1.execute();
            rm1.end(xid1, XAResource.TMSUCCESS);

            // TM生成rm2上的事务分支id
            byte[] bqual2 = UUID.randomUUID().toString().getBytes();
            Xid xid2 = new MysqlXid(gtrid, bqual2, formatId);
            // 执行rm2上的事务分支
            rm2.start(xid2, XAResource.TMNOFLAGS);
            PreparedStatement ps2 = conn2.prepareStatement(
                    "update stock_tbl set count=count-2 where commodity_code=2002");
            ps2.execute();
            rm2.end(xid2, XAResource.TMSUCCESS);
            
            // ===================两阶段提交================================
            // phase1:询问所有的re 准备提交事务分支
            int rm1_prepare = rm1.prepare(xid1);
            int rm2_prepare = rm2.prepare(xid2);
            // phase2:提交所有事务分支
            boolean onePhase = false;
            //TM判断有2个事务分支,所以不能优化为一阶段提交
            if (rm1_prepare == XAResource.XA_OK
                    && rm2_prepare == XAResource.XA_OK) {
                //所有事务分支都prepare成功,提交所有事务分支
                rm1.commit(xid1, onePhase);
                rm2.commit(xid2, onePhase);
            } else {
                //如果有事务分支没有成功,则回滚
                rm1.rollback(xid1);
                rm2.rollback(xid2);
            }
        } catch (XAException e) {
            // 如果出现异常,也要进行回滚
            e.printStackTrace();
        }
    }
}

基于这个demo可以看出来XA协议大致分为以下步骤:

  1. 对于jdbc conncetion要创建出XAConncetion和XAResource
  2. 根据全局事务ID(可以理解为这一次跨库事务的唯一id)和分支事务ID(每个库上自己的事务id)
  3. 标记分支事务开始
  4. 执行sql
  5. 标记分支事务结束
  6. 一阶段预提交prepared,从跨数据源事务上下文中获取rm和xid执行预提交操作
  7. 二阶段提交commit

捋清楚这个步骤我们就可以动工,分析现有项目大概需要做两部分内容:动态数据源切换和跨库事务,对于动态数据源切换可以借助苞米豆的dynamic-datasource-spring-boot-starter组件直接实现,很遗憾的是这个组件只有单数据源事务支持,对于多数据源事务并没有提供实现。

接下来看实现效果,只需要一个简单的注解就可以实现跨库数据源

//mapper接口定义
public interface OrderMapper {

    //对于@DS注解不了解的同学可以看以下苞米豆官方文档
    //https://baomidou.com/pages/a61e1b/#dynamic-datasource
    //核心源码类是com.baomidou.dynamic.datasource.DynamicRoutingDataSource,可以从这里开始看实现原理
    @DS("db_order")
    @Insert("INSERT into order_tbl(user_id,commodity_code,count,money,status) " +
            "VALUES (#{userId},#{commodityCode},#{count},#{money},#{status})")
    public void add(Order order);

}

public interface StockMapper {

    @DS("db_storage")
    @Insert("INSERT into stock_tbl(commodity_code,count) " +
            "VALUES (#{commodityCode},#{count})")
    public void add(Stock stock);

}
@Service
public class OrderServiceImpl implements OrderService {

    //绑定db_order数据源
    @Resource
    private OrderMapper orderMapper;
    
    //绑定db_storage数据源
    @Resource
    private StockMapper stockMapper;
    
    //只需要加一个@MutiDSTransaction注解即可,减少了业务代码入侵
    //参考了苞米豆的dynamic-datasource的自动配置实现
    //源码位置com.baomidou.dynamic.datasource.spring.boot.autoconfigure.DynamicDataSourceAutoConfiguration#dynamicTransactionAdvisor
    @Override
    @MutiDSTransaction
    public void placeOrder() {
        Order order=new Order();
        order.setUserId("1001");
        order.setCommodityCode("2001");
        order.setCount(2);
        order.setMoney(10);
        order.setStatus(1);
        orderMapper.add(order);

        Stock stock=new Stock();
        stock.setCount(100);
        stock.setCommodityCode("2005");
        stockMapper.add(stock);
        int i=1/0;
    }
}

接下来看如何实现,这部分内容可以直接ctrl+c ctrl+v直接使用,借助之前分析的jdbc XA事务流程来实现

  1. 创建全局事务ID在@MutiDSTransaction切面逻辑中实现
  2. 创建出XAConncetion、XAResource、分支事务id、标记分支事务开始,在mybatis的StatementHandler拦截器中实现
  3. 执行sql不用处理还是mybatis的mapper
  4. 标记分支事务结束,在mybatis的Excutor拦截器中实
  5. 一阶段提交、二阶段提交在@MutiDSTransaction切面逻辑中实现
//注解定义
@Target({ElementType.TYPE,ElementType.METHOD})
@Retention(RetentionPolicy.RUNTIME)
@Documented
public @interface MutiDSTransaction {
}
//@MutiDSTransaction注解的切面逻辑,主要用于标记全局事务开启,并且创建一个全局事务ID(gtrid)
@Slf4j
public class DynamicMutiTransactionInterceptor implements MethodInterceptor {

    @Override
    public Object invoke(MethodInvocation methodInvocation) throws Throwable {
        //如果没有事务ID则直接执行
        if (!StringUtils.isEmpty(MutiTransactionContext.getGTRID())) {
            return methodInvocation.proceed();
        }
        boolean state = true;
        Object o;
        //标记全局事务开启
        MutiDataSourceTxUtil.startTransaction();

        try {
            //执行业务逻辑
            o = methodInvocation.proceed();
            //执行一阶段提交 prepared
            state=MutiTransactionContext.branchPrepare();
        } catch (Exception e) {
            state = false;
            throw e;
        } finally {
            if (state) {
                MutiDataSourceTxUtil.commit();//如果成功则执行二阶段提交 commit
            } else {
                MutiDataSourceTxUtil.rollback();
            }
        }
        return o;
    }
}
/**
 * StatementHandler拦截器
 * 用于在创建Statemen前创建多数据源事务上下文,对Connection创建分支事务,并标识分支事务开始
 * */
@Intercepts({@Signature(type = StatementHandler.class, method = "prepare", args = {Connection.class,Integer.class})})
public class MyBatisMutiTransactionStatementIntercepetor implements Interceptor {
    @Override
    public Object intercept(Invocation invocation) throws Throwable {
        //判断是否存在全局事务,如果存在则使用XA的预提交
        if(MutiTransactionContext.getGTRID()!=null){
            XAResource rm = getXaResource(invocation);
            Xid xid = new MysqlXid(MutiTransactionContext.getGTRID(), UUID.randomUUID().toString().getBytes(), 1);

            //记录分支事务,用于业务代码执行完后的预提交和二阶段提交
            MutiTransactionContext.bind(xid,rm);

            //标记ap开始
            rm.start(xid, XAResource.TMNOFLAGS);

            Object result = invocation.proceed();

            return result;
        }
        else{ //不存在全局事务直接执行方法
            Object result = invocation.proceed();
            return result;
        }
    }

    private static XAResource getXaResource(Invocation invocation) throws SQLException {
        //由于使用了Druid连接池,这里需要转换并获取JDBC的Connection
        DruidPooledConnection druidPooledConnection=(DruidPooledConnection) invocation.getArgs()[0];
        ConnectionProxyImpl connectionProxy = (ConnectionProxyImpl)druidPooledConnection.getConnection();
        Connection connection = connectionProxy.getConnectionRaw();

        XAConnection xaConn = new MysqlXAConnection(
                (com.mysql.jdbc.Connection) connection, true);
        XAResource rm = xaConn.getXAResource();
        return rm;
    }
}
/**
 * Executor拦截器
 * 用于在mapper执行完后标识分支事务结束
 * */
@Intercepts({@Signature(type = Executor.class,method = "update",args ={MappedStatement.class,Object.class} )})
public class MyBatisMutiTransactionExcutorInterceptor implements Interceptor {
    @Override
    public Object intercept(Invocation invocation) throws Throwable {

        //执行Mapper
        Object result = invocation.proceed();

        //从上下文中获取当前的rm
        ResourceManagerWrapper resourceManager = MutiTransactionContext.getResourceManager();
        XAResource xaResource = resourceManager.getXaResource();
        Xid xid = resourceManager.getXid();

        //标记ap结束
        xaResource.end(xid,XAResource.TMSUCCESS);

        return result;
    }
}
/**
 * 跨数据源事务工具类
 * */
@Slf4j
public class MutiDataSourceTxUtil {
    public static void startTransaction() {
        //标记事务开启,如果已经存在了全局事务ID则标全局事务ID已存在
        if (MutiTransactionContext.getGTRID()!=null&&MutiTransactionContext.getGTRID().length>0) {
            log.debug("dynamic-datasource exist local tx [{}]", MutiTransactionContext.getGTRID());
        } else {
            String xid = UUID.randomUUID().toString();
            MutiTransactionContext.bind(xid.getBytes());
            log.debug("dynamic-datasource start local tx [{}]", xid);
        }
    }

    /**
     * 手动提交事务
     */
    public static void commit() {
        Map<Xid,XAResource> brachSource = MutiTransactionContext.getBrachSource();
        brachSource.forEach((xid,xaResource)->{
            try {
                //将指定的分支事务id预提交
                //这里没有做多个xaResoure未全部commit完毕时服务宕机的处理,小概率可能会出现多数据源事务不一致问题,如果想完全避免这个文件还是要借助seata或者参考seata自己实现一个redolog回滚逻辑
                xaResource.commit(xid,false);
            } catch (XAException e) {
                throw new RuntimeException(e);
            }
        });
        MutiTransactionContext.unbind();
    }

    /**
     * 手动回滚事务
     */
    public static void rollback() {
        Map<Xid,XAResource> brachSource = MutiTransactionContext.getBrachSource();
        brachSource.forEach((xid,xaResource)->{
            try {
                //将指定的分支事务id回滚
                xaResource.rollback(xid);
            } catch (XAException e) {
                throw new RuntimeException(e);
            }
        });
        MutiTransactionContext.unbind();
    }

    /**
     * 获取所有分支事务的预提交结果
     * 所有分支事务都预提交成功后返回true,否则返回false
     * */
    public static boolean branchPrepare(){
        Map<Xid, XAResource> brachSource = MutiTransactionContext.getBrachSource();
        int xidCount=brachSource.size();
        int successCount=0;
        for(Map.Entry<Xid,XAResource> entry: brachSource.entrySet()){
            XAResource xaResource = entry.getValue();
            Xid xid = entry.getKey();
            try {
                int rmPrepare=xaResource.prepare(xid); //预提交
                if(rmPrepare==XAResource.XA_OK){
                    successCount++;
                }
            } catch (XAException e) {
                throw new RuntimeException(e);
            }
        }
        return xidCount==successCount;
    }
}

将rm保存到队列和在Executor拦截器中取出使用rm的时序图

MapperAQueueMapperB队列从始至尾只有一个元素,使用队列的目的是为了保证MapperA与MapperB按顺序执行时获取到的是自己的rmStatementHandler入队rmExcutor出队rmStatementHandler入队rmExcutor出队rmMapperAQueueMapperB
/**
 * 跨数据源事务上下文
 * */
@Slf4j
public class MutiTransactionContext {

    //全局事务ID gtrid
    private static final ThreadLocal<byte[]> CURRENT_GTRID = new ThreadLocal<>();

    //分支事务的资源与期对应的xid
    private static final ThreadLocal<Map<Xid,XAResource>> BRACH_SOURCE=new ThreadLocal<>();

    //RM队列,在StatementHandler拦截器中入队,在Executor拦截器中出队
    //因为Executor中无法取到RM,这里借助mybatis执行mapper时的插件切面逻辑执行顺序实现
    //1.Executor before
    //2.Executor proceed
    //3.StatementHandler before 创建分支事务上下文(入队)、标记分支事务开启
    //4.StatementHandler proceed
    //5.StatementHandler after
    //6.Excutor after 获取rm(出队)、标记分支事务结束
    private static final ThreadLocal<Queue<ResourceManagerWrapper>> ResourceManager_QUEUE =new ThreadLocal<>();

    /**
     * 获取全局事务id
     * */
    public static byte[] getGTRID() {
        byte[] xid = CURRENT_GTRID.get();
        if (xid!=null&&xid.length>0) {
            return xid;
        }
        return null;
    }

    /**
     * 移除全局事务ID
     * */
    public static void unbind() {
        CURRENT_GTRID.remove();
        BRACH_SOURCE.remove();
    }

    /**
     * 绑定全局事务ID
     * */
    public static void bind(byte[] gtrid) {
        if(gtrid.length>Xid.MAXGTRIDSIZE){
            throw new RuntimeException("全局事务id长度不能超过64个字节");
        }
        CURRENT_GTRID.set(gtrid);
    }

    /**
     * 绑定分支事务ID
     * */
    public static void bind(Xid xid, XAResource xaResource){
        if(BRACH_SOURCE.get()==null){
            BRACH_SOURCE.set(new HashMap<>());
        }
        if(ResourceManager_QUEUE.get()==null){
            Queue<ResourceManagerWrapper> queue=new LinkedList<>();
            ResourceManager_QUEUE.set(queue);
        }
        if(BRACH_SOURCE.get().containsKey(xid)){
            log.warn("xid已存在于事务上下文中:{}",xid);
            return;
        }
        BRACH_SOURCE.get().put(xid,xaResource);

        //将当前的rm和xid存储到队列中,便于在Executor拦截器中获取
        ResourceManagerWrapper resourceManagerWrapper=new ResourceManagerWrapper(xaResource,xid);
        ResourceManager_QUEUE.get().add(resourceManagerWrapper);
    }

    public static ResourceManagerWrapper getResourceManager(){
        return ResourceManager_QUEUE.get().remove();
    }

    public static Map<Xid,XAResource> getBrachSource(){
        return BRACH_SOURCE.get();
    }

}
/**
 * 配置类,注册切面,注册mybatis拦截器
 * */
@Configuration(proxyBeanMethods = false)
public class MutiTranscationConfig
{

    /**
     * 注入自定义Advisor,扫描MutiDSTransaction注解,添加动态多数据源事务拦截器
     * 用于开启全局事务
     * */
    @Bean
    public Advisor dynamicMutiTransactionAdvisor() {
        DynamicMutiTransactionInterceptor interceptor = new DynamicMutiTransactionInterceptor();
        return new DynamicDataSourceAnnotationAdvisor(interceptor, MutiDSTransaction.class);
    }

    /**
     * 注入自定义Mybatis拦截器
     * 用于标记分支事务开启
     * */
    @Bean
    public Interceptor myBatisMutiTransactionStatementIntercepetor(){
        return new MyBatisMutiTransactionStatementIntercepetor();
    }

    /**
     * 注入自定义Mybatis拦截器
     * 用于标记分支事务结束
     * */
    @Bean
    public Interceptor myBatisMutiTransactionExcutorInterceptor(){
        return new MyBatisMutiTransactionExcutorInterceptor();
    }

}