likes
comments
collection
share

Seata(一) AT 模式探索

作者站长头像
站长
· 阅读数 50

seata是什么

Seata 是一款开源的分布式事务解决方案,致力于提供高性能和简单易用的分布式事务服务。Seata 将为用户提供了 AT、TCC、SAGA 和 XA 事务模式,为用户打造一站式的分布式解决方案。

AT模式

seata默认事务模式是AT,也是广泛使用,侵入度比较低的接入方式。

基本概念

TM (Transaction Manager) - 事务管理器定义全局事务的范围:开始全局事务、提交或回滚全局事务。
TC (Transaction Coordinator) - 事务协调者维护全局和分支事务的状态,驱动全局事务提交或回滚。
RM (Resource Manager) - 资源管理器管理分支事务处理的资源,与TC交谈以注册分支事务和报告分支事务的状态,并驱动分支事务提交或回滚。

流程设计

Seata(一) AT 模式探索

步骤

  1. 开启分布式事务
  2. 插入订单
  3. feign远程调用产品扣减库存

代码实现

由于依赖数据库表回滚,所以需要在数据库中创建undo_log表。表结构如下:

-- auto-generated definition
create table undo_log
(
  id            bigint auto_increment
  primary key,
  branch_id     bigint       not null,
  xid           varchar(100) not null,
  context       varchar(128) not null,
  rollback_info longblob     not null,
  log_status    int          not null,
  log_created   datetime     not null,
  log_modified  datetime     not null,
  constraint ux_undo_log
  unique (xid, branch_id)
)
charset = utf8;

订单服务

@GlobalTransactional
public void insert(Order order) {

    log.info("全局事务id ==>" + RootContext.getXID());

    // 创建订单
    // 微服务扣减订单
    orderRepository.insert(order);
    productStockClient.deductStock(order);

    if (order.getNumber() > 0) {
        throw new RuntimeException("异常:模拟业务异常:stock branch exception");
    }

}

产品库存服务

public int deduct(Order order) {

    log.info("全局事务id ==>" + RootContext.getXID());

    // 数据库扣减库存
    int i = productRepository.deductStock(order);

    return i;

}

服务调用

Seata(一) AT 模式探索

现象结果

我们先看下结果,分别查看下订单表和产品库存表中的数据是否有发生变化

订单表Seata(一) AT 模式探索

库存表

Seata(一) AT 模式探索

没有发生任何变化,订单表没有新的订单,库存表原始库存100的容量也没有减少。

分析日志

订单表的日志

插入订单表Execute SQL:INSERT INTO t_order ( name, number, product_id ) VALUES ( 'iphone', 1, 1 )
查询新插入的数据SELECT id, name, number, product_id FROM t_order WHERE (id) in ( (33) )
插入undo_log表,可以看到定义的分支id和全局事务xidINSERT INTO undo_log (branch_id, xid, context, rollback_info, log_status, log_created, log_modified) VALUES (18380499234267236, '172.17.0.2:8091:18380499234267235', 'serializer=jackson&compressorType=NONE', '7B2***', 0, now(6), now(6))
发生了异常,但是没有进行打印,先进行分布式事务回滚
RM执行xid回滚操作开始rm handle branch rollback process:xid=172.17.0.2:8091:18380499234267235,branchId=18380499234267236,branchType=AT,resourceId=jdbc:mysql://47.103.130.223:3306/task,applicationData={"skipCheckLock":true}
通知TC,进行xid对应事务回滚Branch Rollbacking: 172.17.0.2:8091:18380499234267235 18380499234267236 jdbc:mysql://47.103.130.223:3306/task
查询xid,branch_id对应的undo_logSELECT * FROM undo_log WHERE branch_id = 18380499234267236 AND xid = '172.17.0.2:8091:18380499234267235' FOR UPDATE
定位id,发起数据库行锁SELECT * FROM t_order WHERE (id) in ( (33) ) FOR UPDATE
删除之前插入的数据DELETE FROM t_order WHERE id = 33
二阶段回滚Branch Rollbacked result: PhaseTwo_Rollbacked

库存表的日志

更新库存表UPDATE t_product SET stock_num = stock_num - 1 WHERE id = 1
查询更新后的结果SELECT t_product.id, stock_num FROM t_product WHERE (id) in ( (1) )
插入undo_log表INSERT INTO undo_log (branch_id, xid, context, rollback_info, log_status, log_created, log_modified) VALUES (18380499234267240, '172.17.0.2:8091:18380499234267238', 'serializer=jackson&compressorType=NONE', '***', 0, now(6), now(6))
收到TC发起的rpc回滚消息,本地RM发起回滚操作rm handle branch rollback process:xid=172.17.0.2:8091:18380499234267238,branchId=18380499234267240,branchType=AT,resourceId=jdbc:mysql://47.103.130.223:3306/task,applicationData=null
查出undo_log中更新前后的记录状态SELECT * FROM undo_log WHERE branch_id = 18380499234267240 AND xid = '172.17.0.2:8091:18380499234267238' FOR UPDATE
查询库存表中数据状态,进行对比,决定是否发起回滚操作SELECT * FROM t_product WHERE (id) in ( (1) ) FOR UPDATE
更新库存到数据库操作前UPDATE t_product SET stock_num = 100 WHERE id = 1
删除undo_log日志DELETE FROM undo_log WHERE branch_id = 18380499234267240 AND xid = '172.17.0.2:8091:18380499234267238'
二阶段回滚提交Branch Rollbacked result: PhaseTwo_Rollbacked

配置

seata包依赖

去掉依赖包中的io-seata版本,改成跟seata-server版本一致。客户端和服务端都采用的是1.6.0的版本

implementation('com.alibaba.cloud:spring-cloud-starter-alibaba-seata:2021.1') {
    exclude group: 'io.seata', module: 'seata-all'
    exclude group: 'io.seata', module: 'seata-spring-boot-starter'
}

implementation('io.seata:seata-all:1.6.0')
implementation('io.seata:seata-spring-boot-starter:1.6.0')

application.yml

采用的是单机docker的部署形式,因此没有配置事务组

seata:
  service:
    grouplist:
      default: 192.168.110.62:8091
  client:
    undo:
      log-serialization: jackson

xid的传递

通过feign调用远程服务时,我们需要将xid传递到第三方服务中。我们可以采用打印的方式来查看xid在不同服务之间是否一致。

log.info("全局事务id ==>" + RootContext.getXID());

避免在手动配置,直接使用 com.alibaba.cloud:spring-cloud-starter-alibaba-seata版本中的SeataFeignClientAutoConfiguration,通过代理feign-client的方式。进行header请求头中添加xid的传输。有一个小坑,最新的版本是没有自动配置的,需要手动配置。所以需要检查一下自动配置是否生效。

注解配置

调用方上配置@GlobalTransactional即可。被调用方不需要添加事务注解。因为主要是通过数据库中表undo_log来发起的数据库回滚。没有利用数据库本身的事务回滚功能。

seata-server单机启动

docker run --name seata-server -p 8091:8091 seataio/seata-server:1.6.0

总结

seata的AT使用方式上,还是非常简单的。核心是保证xid能够传递到每一个微服务中。发生回滚操作时,通过TC来进行rpc通知。发起本地的事务回滚操作。

转载自:https://juejin.cn/post/7208084427393613882
评论
请登录