likes
comments
collection
share

SpringCloudAlibaba之Integrated Example示例工程源码解读二

作者站长头像
站长
· 阅读数 47

综述

本文分析SpringCloudAlibaba源码示例工程Integrated Example中的下单购买商品场景。

业务流程

下单业务流程如下图所示: SpringCloudAlibaba之Integrated Example示例工程源码解读二

在业务上,有两种情况可能下单失败:

  1. 商品库存不足
  2. 用户账户余额不足

技术架构

技术架构如下图:

SpringCloudAlibaba之Integrated Example示例工程源码解读二

外部接口统一由API网关接入,然后根据路由规则转发至后台服务。内部接口通过RestController暴露,服务之间使用FeignClient交互。

该示例的核心演示如何使用Seata组件实现分布式事务,Seata采用XA模式。

Seata-Server是事务协调者(TC,Transaction Coordinator),基于2pc方式的事务模式统一协调事务处理流程。它是Seata的核心,负责协调微服务之间的分布式事务,实现事务的提交或回滚。

integrated-order服务是全局事务发起者,充当事务管理者(TM,Transaction Manager) 角色。事务管理者控制全局事务的边界,它开始全局事务、提交或回滚全局事务。

integrated-storage和integrated-account服务是分支事务,在分布式事务架构中充当资源管理者角色(RM Resource Manager),管理数据库资源(控制本地事务)。integrated-order服务本身也是资源管理者角色,管理integrated_order数据库。

分布式事务基本原理

分布式事务执成成功的时序如下图所示:

SpringCloudAlibaba之Integrated Example示例工程源码解读二

分布式事务执行失败(以插入订单表记录失败场景为基础)的时序见下图:

SpringCloudAlibaba之Integrated Example示例工程源码解读二

核心源码分析

时序交互

源码工程层面的时序图如下:

SpringCloudAlibaba之Integrated Example示例工程源码解读二

integrated-order服务在下单接口中开启全局事务,然后去调用integrated-storage服务接口扣减库存,如果扣减失败则下单失败,integrated-order服务本地事务亦不会执行。

如果商品库存扣减成功,则接下来调用integrated-account服务接口扣减账户余额用以支付订单,如果扣减失败则下单失败,Seata-Server会通知integrated-storage服务回滚本地事务。integrated-order服务本地事务不执行。

网关代码和配置

基础application.yaml配置文件:

server:
  port: 30010
spring:
  application:
    name: integrated-gateway
  cloud:
    nacos:
      # 配置中心
      config:
        server-addr: nacos-server:8848
        group: integrated-example
        file-extension: yaml
      # 服务注册与发现
      discovery:
        server-addr: nacos-server:8848
        group: integrated-example
  config:
    # 从nacos导入配置文件
    import: optional:nacos:integrated-gateway.yaml

nacos配置中心的integrated-gateway.yaml配置

spring:
  cloud:
    gateway:
      routes:
          #下单购买商品
        - id: placeOrder
          uri: lb://integrated-order
          predicates:
            - Path=/order/create
          #查询商品库存
        - id: queryStorage
          uri: lb://integrated-storage
          predicates:
            - Path=/storage/
          #查询帐户余额
        - id: queryAccount
          uri: lb://integrated-account
          predicates:
            - Path=/account/
          #为商品点赞(不限流)
        - id: praiseItemRocketMQ
          uri: lb://integrated-provider
          predicates:
            - Path=/praise/rocketmq
          #为商品点赞(限流)
        - id: praiseItemSentinel
          uri: lb://integrated-provider
          predicates:
            - Path=/praise/sentinel
          #查询商品点赞量
        - id: queryPraise
          uri: lb://integrated-consumer
          predicates:
            - Path=/praise/query

网关启动类

@SpringBootApplication
@EnableDiscoveryClient
public class GatewayApplication {

	public static void main(String[] args) {
		SpringApplication.run(GatewayApplication.class, args);
	}
}

网关也需要在nacos上注册,同时也需要通过nacos查找服务,并将请求路由转发到对应的服务。

所以启动类也必须添加@EnableDiscoveryClient注解

网关核心配置代码

@Configuration
public class GatewayConfig {

	private final List<ViewResolver> viewResolvers;

	private final ServerCodecConfigurer serverCodecConfigurer;

	public GatewayConfig(ObjectProvider<List<ViewResolver>> viewResolversProvider,
			ServerCodecConfigurer serverCodecConfigurer) {
		this.viewResolvers = viewResolversProvider.getIfAvailable(Collections::emptyList);
		this.serverCodecConfigurer = serverCodecConfigurer;
	}
        // 注册Sentinel网关过虑器
	@Bean
	@Order(Ordered.HIGHEST_PRECEDENCE)
	public GlobalFilter sentinelGatewayFilter() {
		return new SentinelGatewayFilter();
	}
        // 初始化Sentinel限流规则
        // 针对id为praiseItemSentinel的路由规则进行限流,每秒5次。
	@PostConstruct
	public void initGatewayRules() {
		Set<GatewayFlowRule> rules = new HashSet<>();
		rules.add(
				new GatewayFlowRule("praiseItemSentinel").setCount(5).setIntervalSec(1));
		GatewayRuleManager.loadRules(rules);
	}
        // 注册Sentinel的BlockExceptionHandler
	@Bean
	@Order(Ordered.HIGHEST_PRECEDENCE)
	public SentinelGatewayBlockExceptionHandler sentinelGatewayBlockExceptionHandler() {
		// Register the block exception handler for Spring Cloud Gateway.
		return new SentinelGatewayBlockExceptionHandler(viewResolvers,
				serverCodecConfigurer);
	}
        // 设置请求被限流阻塞时的处理类
        // 此处代码实现:请求限流时,直接返回“此接口被限流了”的消息给客户端。
	@PostConstruct
	public void initBlockHandlers() {
		BlockRequestHandler blockRequestHandler = new BlockRequestHandler() {
			@Override
			public Mono<ServerResponse> handleRequest(ServerWebExchange serverWebExchange,
					Throwable throwable) {
				return ServerResponse.status(HttpStatus.OK)
						.contentType(MediaType.APPLICATION_JSON_UTF8)
						.body(BodyInserters.fromObject("此接口被限流了"));
			}
		};
		GatewayCallbackManager.setBlockHandler(blockRequestHandler);
	}
        # 允许跨域请求
	@Bean
	public CorsWebFilter corsFilter() {
		UrlBasedCorsConfigurationSource source = new UrlBasedCorsConfigurationSource();
		CorsConfiguration config = new CorsConfiguration();
		config.setAllowCredentials(true);
		config.addAllowedHeader("*");
		config.addAllowedMethod("*");
		config.addAllowedOriginPattern("*");
		source.registerCorsConfiguration("/**", config);

		return new CorsWebFilter(source);
	}

}

订单服务

application.yaml配置

server:
  port: 8013

spring:
  application:
    name: integrated-order
  cloud:
    nacos:
      discovery:
        server-addr: nacos-server:8848
        group: integrated-example
      config:
        server-addr: nacos-server:8848
        group: integrated-example
        file-extension: yaml
  config:
    import:
      - optional:nacos:integrated-order.yaml
      - optional:nacos:datasource-config.yaml
# seata分布式事务组件配置
seata:
  application-id: ${spring.application.name}
  # 事务分组。每个事务分组内的事务都会被一个TC实例处理,而不同的事务分组可能会被不同的TC实例处理
  tx-service-group: ${spring.application.name}-group
  service:
    vgroup-mapping:
      # 事务分组至TC集群的映射。Seata中可以配置多个TC集群,每个集群负责处理一个或多个服务组的事务
      integrated-order-group: default
    grouplist:
      # TC服务列表,仅注册中心为file时使用。如注册中心使用nacos等,则不需要设置
      default: seata-server:8091

订单服务类-OrderServiceImpl

@Service
public class OrderServiceImpl implements OrderService {

	private Logger logger = LoggerFactory.getLogger(getClass());

	@Autowired
	private OrderMapper orderMapper;

	@Autowired
	private AccountServiceFeignClient accountService;

	@Autowired
	private StorageServiceFeignClient storageService;

        @Override
        // GlobalTransactional注解开启全局事务
	@GlobalTransactional
	public Result<?> createOrder(String userId, String commodityCode, Integer count) {

		logger.info("[createOrder] current XID: {}", RootContext.getXID());

		//调用库存服务 扣减库存
		StorageDTO storageDTO = new StorageDTO();
		storageDTO.setCommodityCode(commodityCode);
		storageDTO.setCount(count);
		Integer storageCode = storageService.reduceStock(storageDTO).getCode();
		if (storageCode.equals(COMMON_FAILED.getCode())) {
			throw new BusinessException("stock not enough");
		}

		//调用账户服务 扣减余额
		int price = count * 2;
		AccountDTO accountDTO = new AccountDTO();
		accountDTO.setUserId(userId);
		accountDTO.setPrice(price);
		Integer accountCode = accountService.reduceBalance(accountDTO).getCode();
		if (accountCode.equals(COMMON_FAILED.getCode())) {
			throw new BusinessException("balance not enough");
		}

		//保存订单至本地数据库
		Order order = new Order();
		order.setUserId(userId);
		order.setCommodityCode(commodityCode);
		order.setCount(count);
		order.setMoney(price);
		order.setCreateTime(new Timestamp(System.currentTimeMillis()));
		order.setUpdateTime(new Timestamp(System.currentTimeMillis()));
		orderMapper.saveOrder(order);
		logger.info("[createOrder] orderId: {}", order.getId());

		return Result.success(order);
	}

}

订单服务依赖两个FeignClient:

  1. AccountServiceFeignClient 调用账户服务扣减账户余额
  2. StorageServiceFeignClient 调用库存服务扣减商品库存
@FeignClient(name = "integrated-account")
public interface AccountServiceFeignClient {

	@PostMapping("/account/reduce-balance")
	Result<?> reduceBalance(@RequestBody AccountDTO accountReduceBalanceDTO);

}

@FeignClient(name = "integrated-storage")
public interface StorageServiceFeignClient {

	@PostMapping("/storage/reduce-stock")
	Result<?> reduceStock(@RequestBody StorageDTO productReduceStockDTO);

}

库存服务

application.yaml配置

server:
  port: 8011

spring:
  application:
    name: integrated-storage
  cloud:
    nacos:
      discovery:
        server-addr: nacos-server:8848
        group: integrated-example
      config:
        server-addr: nacos-server:8848
        group: integrated-example
  config:
    import:
      - optional:nacos:integrated-storage.yaml
      - optional:nacos:datasource-config.yaml

seata:
  application-id: ${spring.application.name}
  tx-service-group: ${spring.application.name}-group
  service:
    vgroup-mapping:
      integrated-storage-group: default
    grouplist:
      default: seata-server:8091


StorageServiceImpl类扣减商品库存

@Service
public class StorageServiceImpl implements StorageService {

	private Logger logger = LoggerFactory.getLogger(getClass());

	@Autowired
	private StorageMapper storageMapper;

	@Override
        // Transactional注解用于开启本地事务,如果本地事务发生异常导致本地事务回滚。则全局事务回滚时,不会执行对应的undo log。
	@Transactional
	public void reduceStock(String commodityCode, Integer count)
			throws BusinessException {
		logger.info("[reduceStock] current XID: {}", RootContext.getXID());

		checkStock(commodityCode, count);

		Timestamp updateTime = new Timestamp(System.currentTimeMillis());
		int updateCount = storageMapper.reduceStock(commodityCode, count, updateTime);
		if (updateCount == 0) {
			throw new BusinessException("deduct stock failed");
		}
	}

账户服务

application.yaml配置

server:
  port: 8012

spring:
  application:
    name: integrated-account
  cloud:
    nacos:
      discovery:
        server-addr: nacos-server:8848
        group: integrated-example
      config:
        server-addr: nacos-server:8848
        group: integrated-example
        file-extension: yaml
  config:
    import:
      - optional:nacos:integrated-account.yaml
      - optional:nacos:datasource-config.yaml

seata:
  application-id: ${spring.application.name}
  tx-service-group: ${spring.application.name}-group
  service:
    vgroup-mapping:
      integrated-account-group: default
    grouplist:
      default: seata-server:8091

AccountServiceImpl 扣减余额

@Service
public class AccountServiceImpl implements AccountService {

	private Logger logger = LoggerFactory.getLogger(getClass());

	@Autowired
	private AccountMapper accountMapper;

	@Override
        //Transactional注解用于开启本地事务,如果本地事务发生异常导致本地事务回滚。则全局事务回滚时,不会执行对应的undo log。
	@Transactional
	public void reduceBalance(String userId, Integer price) throws BusinessException {
		logger.info("[reduceBalance] currenet XID: {}", RootContext.getXID());

		checkBalance(userId, price);

		Timestamp updateTime = new Timestamp(System.currentTimeMillis());
		int updateCount = accountMapper.reduceBalance(userId, price, updateTime);
		if (updateCount == 0) {
			throw new BusinessException("reduce balance failed");
		}
	}

Seata分布式事务使用总结

通过源代码的阅读分析,我们知道了下单流程中涉及到三个核心服务integrated-order,integrated-storage,integrated-account在集成seata中间件时,使用了如下流程:

1.引入seata组件依赖

        <dependency>
            <groupId>com.alibaba.cloud</groupId>
            <artifactId>spring-cloud-starter-alibaba-seata</artifactId>
        </dependency

2.添加客户端配置

# seata分布式事务组件配置
seata:
  application-id: ${spring.application.name}
  # 事务分组。每个事务分组内的事务都会被一个TC实例处理,而不同的事务分组可能会被不同的TC实例处理
  tx-service-group: ${spring.application.name}-group
  service:
    vgroup-mapping:
      # 事务分组至TC集群的映射。Seata中可以配置多个TC集群,每个集群负责处理一个或多个服务组的事务
      ${spring.application.name}-group: default
    grouplist:
      # TC服务列表,仅注册中心为file时使用。如注册中心使用nacos等,则不需要设置
      default: seata-server:8091

3.业务入口接口方法上使用@GlobalTransactional注解,开启全局事务。

        // GlobalTransactional注解开启全局事务
	@GlobalTransactional
	public Result<?> createOrder(String userId, String commodityCode, Integer count) {

4.当然,最重要的,要把seata-server启动起来。否则一切都是白忙乎。 seata-server在整个分布式事务架构中,充当的是最重要的TC角色,也就是事务协调者(Transaction Coordinator),它会把分布式系统中的各个TM,RM角色串联起来,组成一个全局事务,然后来协调统一提交或回滚。

转载自:https://juejin.cn/post/7236010330039337018
评论
请登录