SpringCloudAlibaba之Integrated Example示例工程源码解读二
综述
本文分析SpringCloudAlibaba源码示例工程Integrated Example中的下单购买商品场景。
业务流程
下单业务流程如下图所示:
在业务上,有两种情况可能下单失败:
- 商品库存不足
- 用户账户余额不足
技术架构
技术架构如下图:
外部接口统一由API网关接入,然后根据路由规则转发至后台服务。内部接口通过RestController暴露,服务之间使用FeignClient交互。
该示例的核心演示如何使用Seata组件实现分布式事务,Seata采用XA模式。
Seata-Server是事务协调者(TC,Transaction Coordinator),基于2pc方式的事务模式统一协调事务处理流程。它是Seata的核心,负责协调微服务之间的分布式事务,实现事务的提交或回滚。
integrated-order服务是全局事务发起者,充当事务管理者(TM,Transaction Manager) 角色。事务管理者控制全局事务的边界,它开始全局事务、提交或回滚全局事务。
integrated-storage和integrated-account服务是分支事务,在分布式事务架构中充当资源管理者角色(RM Resource Manager),管理数据库资源(控制本地事务)。integrated-order服务本身也是资源管理者角色,管理integrated_order数据库。
分布式事务基本原理
分布式事务执成成功的时序如下图所示:
分布式事务执行失败(以插入订单表记录失败场景为基础)的时序见下图:
核心源码分析
时序交互
源码工程层面的时序图如下:
integrated-order服务在下单接口中开启全局事务,然后去调用integrated-storage服务接口扣减库存,如果扣减失败则下单失败,integrated-order服务本地事务亦不会执行。
如果商品库存扣减成功,则接下来调用integrated-account服务接口扣减账户余额用以支付订单,如果扣减失败则下单失败,Seata-Server会通知integrated-storage服务回滚本地事务。integrated-order服务本地事务不执行。
网关代码和配置
基础application.yaml配置文件:
server:
port: 30010
spring:
application:
name: integrated-gateway
cloud:
nacos:
# 配置中心
config:
server-addr: nacos-server:8848
group: integrated-example
file-extension: yaml
# 服务注册与发现
discovery:
server-addr: nacos-server:8848
group: integrated-example
config:
# 从nacos导入配置文件
import: optional:nacos:integrated-gateway.yaml
nacos配置中心的integrated-gateway.yaml配置
spring:
cloud:
gateway:
routes:
#下单购买商品
- id: placeOrder
uri: lb://integrated-order
predicates:
- Path=/order/create
#查询商品库存
- id: queryStorage
uri: lb://integrated-storage
predicates:
- Path=/storage/
#查询帐户余额
- id: queryAccount
uri: lb://integrated-account
predicates:
- Path=/account/
#为商品点赞(不限流)
- id: praiseItemRocketMQ
uri: lb://integrated-provider
predicates:
- Path=/praise/rocketmq
#为商品点赞(限流)
- id: praiseItemSentinel
uri: lb://integrated-provider
predicates:
- Path=/praise/sentinel
#查询商品点赞量
- id: queryPraise
uri: lb://integrated-consumer
predicates:
- Path=/praise/query
网关启动类
@SpringBootApplication
@EnableDiscoveryClient
public class GatewayApplication {
public static void main(String[] args) {
SpringApplication.run(GatewayApplication.class, args);
}
}
网关也需要在nacos上注册,同时也需要通过nacos查找服务,并将请求路由转发到对应的服务。
所以启动类也必须添加@EnableDiscoveryClient注解
网关核心配置代码
@Configuration
public class GatewayConfig {
private final List<ViewResolver> viewResolvers;
private final ServerCodecConfigurer serverCodecConfigurer;
public GatewayConfig(ObjectProvider<List<ViewResolver>> viewResolversProvider,
ServerCodecConfigurer serverCodecConfigurer) {
this.viewResolvers = viewResolversProvider.getIfAvailable(Collections::emptyList);
this.serverCodecConfigurer = serverCodecConfigurer;
}
// 注册Sentinel网关过虑器
@Bean
@Order(Ordered.HIGHEST_PRECEDENCE)
public GlobalFilter sentinelGatewayFilter() {
return new SentinelGatewayFilter();
}
// 初始化Sentinel限流规则
// 针对id为praiseItemSentinel的路由规则进行限流,每秒5次。
@PostConstruct
public void initGatewayRules() {
Set<GatewayFlowRule> rules = new HashSet<>();
rules.add(
new GatewayFlowRule("praiseItemSentinel").setCount(5).setIntervalSec(1));
GatewayRuleManager.loadRules(rules);
}
// 注册Sentinel的BlockExceptionHandler
@Bean
@Order(Ordered.HIGHEST_PRECEDENCE)
public SentinelGatewayBlockExceptionHandler sentinelGatewayBlockExceptionHandler() {
// Register the block exception handler for Spring Cloud Gateway.
return new SentinelGatewayBlockExceptionHandler(viewResolvers,
serverCodecConfigurer);
}
// 设置请求被限流阻塞时的处理类
// 此处代码实现:请求限流时,直接返回“此接口被限流了”的消息给客户端。
@PostConstruct
public void initBlockHandlers() {
BlockRequestHandler blockRequestHandler = new BlockRequestHandler() {
@Override
public Mono<ServerResponse> handleRequest(ServerWebExchange serverWebExchange,
Throwable throwable) {
return ServerResponse.status(HttpStatus.OK)
.contentType(MediaType.APPLICATION_JSON_UTF8)
.body(BodyInserters.fromObject("此接口被限流了"));
}
};
GatewayCallbackManager.setBlockHandler(blockRequestHandler);
}
# 允许跨域请求
@Bean
public CorsWebFilter corsFilter() {
UrlBasedCorsConfigurationSource source = new UrlBasedCorsConfigurationSource();
CorsConfiguration config = new CorsConfiguration();
config.setAllowCredentials(true);
config.addAllowedHeader("*");
config.addAllowedMethod("*");
config.addAllowedOriginPattern("*");
source.registerCorsConfiguration("/**", config);
return new CorsWebFilter(source);
}
}
订单服务
application.yaml配置
server:
port: 8013
spring:
application:
name: integrated-order
cloud:
nacos:
discovery:
server-addr: nacos-server:8848
group: integrated-example
config:
server-addr: nacos-server:8848
group: integrated-example
file-extension: yaml
config:
import:
- optional:nacos:integrated-order.yaml
- optional:nacos:datasource-config.yaml
# seata分布式事务组件配置
seata:
application-id: ${spring.application.name}
# 事务分组。每个事务分组内的事务都会被一个TC实例处理,而不同的事务分组可能会被不同的TC实例处理
tx-service-group: ${spring.application.name}-group
service:
vgroup-mapping:
# 事务分组至TC集群的映射。Seata中可以配置多个TC集群,每个集群负责处理一个或多个服务组的事务
integrated-order-group: default
grouplist:
# TC服务列表,仅注册中心为file时使用。如注册中心使用nacos等,则不需要设置
default: seata-server:8091
订单服务类-OrderServiceImpl
@Service
public class OrderServiceImpl implements OrderService {
private Logger logger = LoggerFactory.getLogger(getClass());
@Autowired
private OrderMapper orderMapper;
@Autowired
private AccountServiceFeignClient accountService;
@Autowired
private StorageServiceFeignClient storageService;
@Override
// GlobalTransactional注解开启全局事务
@GlobalTransactional
public Result<?> createOrder(String userId, String commodityCode, Integer count) {
logger.info("[createOrder] current XID: {}", RootContext.getXID());
//调用库存服务 扣减库存
StorageDTO storageDTO = new StorageDTO();
storageDTO.setCommodityCode(commodityCode);
storageDTO.setCount(count);
Integer storageCode = storageService.reduceStock(storageDTO).getCode();
if (storageCode.equals(COMMON_FAILED.getCode())) {
throw new BusinessException("stock not enough");
}
//调用账户服务 扣减余额
int price = count * 2;
AccountDTO accountDTO = new AccountDTO();
accountDTO.setUserId(userId);
accountDTO.setPrice(price);
Integer accountCode = accountService.reduceBalance(accountDTO).getCode();
if (accountCode.equals(COMMON_FAILED.getCode())) {
throw new BusinessException("balance not enough");
}
//保存订单至本地数据库
Order order = new Order();
order.setUserId(userId);
order.setCommodityCode(commodityCode);
order.setCount(count);
order.setMoney(price);
order.setCreateTime(new Timestamp(System.currentTimeMillis()));
order.setUpdateTime(new Timestamp(System.currentTimeMillis()));
orderMapper.saveOrder(order);
logger.info("[createOrder] orderId: {}", order.getId());
return Result.success(order);
}
}
订单服务依赖两个FeignClient:
- AccountServiceFeignClient 调用账户服务扣减账户余额
- StorageServiceFeignClient 调用库存服务扣减商品库存
@FeignClient(name = "integrated-account")
public interface AccountServiceFeignClient {
@PostMapping("/account/reduce-balance")
Result<?> reduceBalance(@RequestBody AccountDTO accountReduceBalanceDTO);
}
@FeignClient(name = "integrated-storage")
public interface StorageServiceFeignClient {
@PostMapping("/storage/reduce-stock")
Result<?> reduceStock(@RequestBody StorageDTO productReduceStockDTO);
}
库存服务
application.yaml配置
server:
port: 8011
spring:
application:
name: integrated-storage
cloud:
nacos:
discovery:
server-addr: nacos-server:8848
group: integrated-example
config:
server-addr: nacos-server:8848
group: integrated-example
config:
import:
- optional:nacos:integrated-storage.yaml
- optional:nacos:datasource-config.yaml
seata:
application-id: ${spring.application.name}
tx-service-group: ${spring.application.name}-group
service:
vgroup-mapping:
integrated-storage-group: default
grouplist:
default: seata-server:8091
StorageServiceImpl类扣减商品库存
@Service
public class StorageServiceImpl implements StorageService {
private Logger logger = LoggerFactory.getLogger(getClass());
@Autowired
private StorageMapper storageMapper;
@Override
// Transactional注解用于开启本地事务,如果本地事务发生异常导致本地事务回滚。则全局事务回滚时,不会执行对应的undo log。
@Transactional
public void reduceStock(String commodityCode, Integer count)
throws BusinessException {
logger.info("[reduceStock] current XID: {}", RootContext.getXID());
checkStock(commodityCode, count);
Timestamp updateTime = new Timestamp(System.currentTimeMillis());
int updateCount = storageMapper.reduceStock(commodityCode, count, updateTime);
if (updateCount == 0) {
throw new BusinessException("deduct stock failed");
}
}
账户服务
application.yaml配置
server:
port: 8012
spring:
application:
name: integrated-account
cloud:
nacos:
discovery:
server-addr: nacos-server:8848
group: integrated-example
config:
server-addr: nacos-server:8848
group: integrated-example
file-extension: yaml
config:
import:
- optional:nacos:integrated-account.yaml
- optional:nacos:datasource-config.yaml
seata:
application-id: ${spring.application.name}
tx-service-group: ${spring.application.name}-group
service:
vgroup-mapping:
integrated-account-group: default
grouplist:
default: seata-server:8091
AccountServiceImpl 扣减余额
@Service
public class AccountServiceImpl implements AccountService {
private Logger logger = LoggerFactory.getLogger(getClass());
@Autowired
private AccountMapper accountMapper;
@Override
//Transactional注解用于开启本地事务,如果本地事务发生异常导致本地事务回滚。则全局事务回滚时,不会执行对应的undo log。
@Transactional
public void reduceBalance(String userId, Integer price) throws BusinessException {
logger.info("[reduceBalance] currenet XID: {}", RootContext.getXID());
checkBalance(userId, price);
Timestamp updateTime = new Timestamp(System.currentTimeMillis());
int updateCount = accountMapper.reduceBalance(userId, price, updateTime);
if (updateCount == 0) {
throw new BusinessException("reduce balance failed");
}
}
Seata分布式事务使用总结
通过源代码的阅读分析,我们知道了下单流程中涉及到三个核心服务integrated-order,integrated-storage,integrated-account在集成seata中间件时,使用了如下流程:
1.引入seata组件依赖
<dependency>
<groupId>com.alibaba.cloud</groupId>
<artifactId>spring-cloud-starter-alibaba-seata</artifactId>
</dependency
2.添加客户端配置
# seata分布式事务组件配置
seata:
application-id: ${spring.application.name}
# 事务分组。每个事务分组内的事务都会被一个TC实例处理,而不同的事务分组可能会被不同的TC实例处理
tx-service-group: ${spring.application.name}-group
service:
vgroup-mapping:
# 事务分组至TC集群的映射。Seata中可以配置多个TC集群,每个集群负责处理一个或多个服务组的事务
${spring.application.name}-group: default
grouplist:
# TC服务列表,仅注册中心为file时使用。如注册中心使用nacos等,则不需要设置
default: seata-server:8091
3.业务入口接口方法上使用@GlobalTransactional注解,开启全局事务。
// GlobalTransactional注解开启全局事务
@GlobalTransactional
public Result<?> createOrder(String userId, String commodityCode, Integer count) {
4.当然,最重要的,要把seata-server启动起来。否则一切都是白忙乎。 seata-server在整个分布式事务架构中,充当的是最重要的TC角色,也就是事务协调者(Transaction Coordinator),它会把分布式系统中的各个TM,RM角色串联起来,组成一个全局事务,然后来协调统一提交或回滚。
转载自:https://juejin.cn/post/7236010330039337018