Spring Boot项目实战:消息丢失和重复消费问题
你好,我是田哥
在我的充电桩项目中,有个用户积分模块,原型图如下:
我的积分
下面来聊聊这个项目中,积分增加场景。
- 用户充值完成后,赠送积分,比如:充100元,给用户积分新增100个。
- 用户充电支付完成(非余额支付),赠送积分。
- 用户邀请新人注册,赠送积分
- 新用户认证完成,赠送积分
- ....
关于积分增加策略,基本上都是由运营来定,总之,很多项目中都有这么个功能。
常规系统就是用户上面的行为伴随着用户积分处理完成(在同一个事物里),但,咱们为了提升系统性能和用户体验,我们通常把积分增加这类业务采用异步方式去实现。
比如:线程池、各种消息队列等
在我们这个版本中,采用的是RabbitMQ消息队列来实现的。
问题
既然使用RabbitMQ,那我们就不得不考虑关于消息的问题:
- 消息丢失问题
- 重复消费问题
消息丢失问题
这里对用户积分增加,如果把消息搞丢了,用户积分最终并不会得到增加,那用户肯定不干了,为了防止这类问题出现,我们采用下面的解决方案。
1:我们采用confirm模式+持久化+手动ack
2:消息丢失种鸽问题:消息发送失败,我们采用失败消息记录表
3:定时任务轮训失败消息记录,再次发送
这里为了防止多次重试问题,所以设置一个重试上限,并加入警告(比如一条消息最多重发5次,一旦到5次了,就给运维/开发/测试发送邮件警告)。
重复消费问题
这里是对用户积分增加,所以,绝对不能重复消费,不然这样会导致用户积分暴增,数据会出现一致性问题。解决:
1:每个消息有一个唯一的reqId,reqId=业务前缀+UUID+年月日时分秒毫秒的时间戳
2:在对比是否重复消费之前,对用户加上分布式锁,key=固定用户分布式锁前缀+userId
整体流程图
标配版:
标配版
为了更好地监控消息发送失败问题,我们还可以对标配版进行升级。
升级版
其他问题
我们上面说了,为了防止消息丢失,采用confirm模式+持久化+手动ack
但,实现起来并非那么简单,如果没有做过,很多东西是无法体会到的。
在使用confirm模式时,新的问题来了。
问题
我们Spring中,一个Bean默认是单列的,这样的话会造成一个RabbitMQTemplate只能绑定一个confirm,这就不对了,我们需要RabbitMQTemplate不受Spring这个影响,很多人第一印象想到的就是采用原型模式。也就是在bean上添加注解:@Scope("prototype") 但,问题来了,比如在一个producer bean里注入RabbitMQTemplate,他最终还是认为你这个RabbitMQTemplate是单列,又和上面原型违背了,网上很多办法是给这个producer也搞成原型模式。
这个确实能解决这个问题。
说白了就是 从请求开始的bean开始到最后发送消息,这个过程的bean要都是原型模式才行。
比如:controller--service--producer
挖了个蛐蛐,问题又来了,项目中定时任务采用的是xxl-job,它的每个job都必须是单列的,上面的办法又不行了。
绝招:用Spring中的ApplicationContext**的getBean方法直接获取对应的Bean就不存在问题。
这里有点绕哈,说白了就是必须使用原型,不能使用单例。
核心代码
定义原型的rabbitTemplate。
@Configuration
public class RabbitConfig {
//省略部分非核心代码
@Bean
@Scope("prototype")
public RabbitTemplate rabbitTemplate(ConnectionFactory connectionFactory) {
RabbitTemplate rabbitTemplate = new RabbitTemplate();
rabbitTemplate.setConnectionFactory(connectionFactory);
rabbitTemplate.setMandatory(true);
return rabbitTemplate;
}
}
confirm模式
@Component
public class UserPointProducer {
@Resource
private RetryMessageMapper chargeUserMapper;
public void sendMessage(String message) {
RabbitTemplate rabbitTemplate = ApplicationContextFactory.getBean(RabbitTemplate.class);
CorrelationData correlationId = new CorrelationData(UUID.randomUUID().toString());
rabbitTemplate.setConfirmCallback((correlationData, ack, cause) -> {
if (ack) {
log.info("UserPointConfirm ConfirmCallback 关联数据:{},投递成功,确认情况:{}", correlationData, ack);
} else {
RetryMessage retryMessage = new RetryMessage();
retryMessage.setContent(message);
retryMessage.setRetry(5);
retryMessage.setCreateTime(new Date());
retryMessage.setStatus(0);
retryMessage.setRetriedTimes(0);
retryMessage.setType(RabbitMQConstantEnum.USER_POINT.getType());
chargeUserMapper.insert(retryMessage);
log.info("UserPointConfirm ConfirmCallback 关联数据:{},投递失败,确认情况:{},原因:{}", correlationData, ack, cause);
}
});
//后面+1 主要是为了掩饰消息发送失败
rabbitTemplate.convertAndSend(RabbitMQConstantEnum.USER_POINT.getExchange()+"1"
, RabbitMQConstantEnum.USER_POINT.getRoutingKey(), message, message1 -> {
message1.getMessageProperties().setDeliveryMode(MessageDeliveryMode.PERSISTENT); // 设置消息持久化
return message1;
}, correlationId);
}
}
下面来写个测试发送案例:
/**
* {@code @description:} 测试案例
*
* @author tianwc 公众号:Java后端技术全栈
* 在线刷题 1200+java面试题和1000+篇技术文章:https://woaijava.cc/
* {@code @date:} 2024-03-24 9:19
* {@code @version:} 1.0
*/
@GetMapping("/send")
public void send() {
UserPointMessage userPointMessage = new UserPointMessage();
userPointMessage.setType(UserUpdatePointEnum.ADD.getType());
userPointMessage.setUserId(1L);
userPointMessage.setPoint(9);
userPointMessage.setReqId(MessageReqIdPrefixConstant.USER_POINT + UUID.randomUUID()+ DateUtils.formatDefaultDateMs());
userPointProducer.sendMessage(JSON.toJSONString(userPointMessage));
}
消费者:
/**
* {@code @description:} 用户积分消息消费者
*
* @author tianwc 公众号:Java后端技术全栈
* 在线刷题 1200+java面试题和1000+篇技术文章:<a href="https://woaijava.cc/">博客地址</a>
* {@code @date:} 2024-03-24 9:19
* {@code @version:} 1.0
*/
@RabbitListener(queues = "user.point.queue")
@Component
@Slf4j
public class UserPointConsumer {
@Resource
private UserPointService userPointService;
@RabbitHandler
public void process(Object data, Channel channel, Message message) throws IOException {
try {
log.info("消费者接受到的消息是:{},消息体为:{}", data, message);
UserPointMessage userPointMessage = JSON.parseObject(new String(message.getBody()), UserPointMessage.class);
userPointService.updateUserPoint(userPointMessage);
channel.basicAck(message.getMessageProperties().getDeliveryTag(), false);
} catch (Exception exception) {
channel.basicNack(message.getMessageProperties().getDeliveryTag(), false, false);
}
}
}
具体业务逻辑实现:
/**
* {@code @description:} 用户积分扣除消费者服务实现类
*
* @author tianwc 公众号:Java后端技术全栈
* 在线刷题 1200+java面试题和1000+篇技术文章:<a href="https://woaijava.cc/">博客地址</a>
* {@code @date:} 2024-03-24 9:19
* {@code @version:} 1.0
*/
@Slf4j
@Service
public class UserPointServiceImpl implements UserPointService {
@Resource
private ChargeUserMapper chargeUserMapper;
@Resource
private RedissonClient redissonClient;
@Resource
private PointsChangeRecordMapper pointsChangeRecordMapper;
@Transactional(rollbackFor = Exception.class)
@Override
public void updateUserPoint(UserPointMessage userPointMessage) {
RLock lock = redissonClient.getLock(RedisConstantPre.USER_INFO_ID_PRE + userPointMessage.getUserId());
lock.lock();
try {
int count = pointsChangeRecordMapper.selectByReqId(userPointMessage.getReqId());
if (count > 0) {
log.info("消息体参数 【重复消息】:{}", userPointMessage);
return;
}
PointsChangeRecord pointsChangeRecord = new PointsChangeRecord();
pointsChangeRecord.setUserId(userPointMessage.getUserId());
pointsChangeRecord.setPoint(userPointMessage.getPoint());
pointsChangeRecord.setType(userPointMessage.getType());
pointsChangeRecord.setCreateTime(new Date());
pointsChangeRecord.setReqId(userPointMessage.getReqId());
//积分变动记录
pointsChangeRecordMapper.insert(pointsChangeRecord);
ChargeUser chargeUser = chargeUserMapper.selectByPrimaryKey(userPointMessage.getUserId());
if (userPointMessage.getType() == UserUpdatePointEnum.ADD.getType()) {
chargeUser.setPoint(chargeUser.getPoint() + userPointMessage.getPoint());
} else {
chargeUser.setPoint(chargeUser.getPoint() - userPointMessage.getPoint());
}
//用户积分变动
chargeUserMapper.updateByPrimaryKey(chargeUser);
} finally {
lock.unlock();
}
}
}
这里的分布式锁的好处:
- 保证了这个重复消费部分代码的原子性
- 保证了此时只有一个线程对用户积分进行修改
其实,正常情况下,不会走失败消息记录表,但是作为程序不得不多考虑点。
定时任务部分代码:
/**
* {@code @description:} 用户积分消息发送job
*
* @author tianwc 公众号:Java后端技术全栈
* 在线刷题 1200+java面试题和1000+篇技术文章:https://woaijava.cc/
* {@code @date:} 2024-03-24 9:19
* {@code @version:} 1.0
*/
@Slf4j
@Component
public class UserPointRetryMessageJob {
@Resource
private RetryMessageMapper retryMessageMapper;
@Resource
private UserPointProducer userPointProducer;
@XxlJob("userPointRetryMessageJob")
public void process() {
log.info("开始执行 userPointRetryMessageJob 定时任务");
XxlJobHelper.log("start userPointRetryMessageJob job");
int countRetryMessage = retryMessageMapper.countRetryMessage(0, 0);
if (countRetryMessage == 0) {
log.info(" 执行结束 userPointRetryMessageJob 没有消息需要重发");
}
List<RetryMessage> retryMessages = retryMessageMapper.selectRetryMessage(0, 0);
for (RetryMessage retryMessage : retryMessages) {
userPointProducer.sendMessage(retryMessage);
}
}
}
这里还可以优化,你能想到吗?
List<RetryMessage> retryMessages = retryMessageMapper.selectRetryMessage(0, 0);
这里是一次性全部查出来了,如果出现大量消息发送失败,一次性放到本地缓存里,很容易出问题,所以,我们可以再优化成分页进行处理,比如:每次处理50条,再根据count来计算需要进行分页。
定时任务中生产者代码实现:
/**
* {@code @description:} 用户积分消息发送生产者
*
* @author tianwc 公众号:Java后端技术全栈
* 在线刷题 1200+java面试题和1000+篇技术文章:https://woaijava.cc/
* {@code @date:} 2024-03-24 9:19
* {@code @version:} 1.0
*/
@Slf4j
@Component
public class UserPointProducer {
@Resource
private RetryMessageMapper chargeUserMapper;
public void sendMessage(RetryMessage retryMessage) {
log.info("用户积分消息重试补发,{}", retryMessage);
String message = retryMessage.getContent();
CorrelationData correlationId = new CorrelationData(UUID.randomUUID().toString());
RabbitTemplate rabbitTemplate = ApplicationContextFactory.getBean(RabbitTemplate.class);
rabbitTemplate.setConfirmCallback((correlationData, ack, cause) -> {
if (ack) {
retryMessage.setStatus(1);
chargeUserMapper.updateByPrimaryKey(retryMessage);
log.info("UserPointConfirm ConfirmCallback 关联数据:{},投递成功,确认情况:{}", correlationData, ack);
} else {
retryMessage.setRetriedTimes(retryMessage.getRetriedTimes() + 1);
chargeUserMapper.updateByPrimaryKey(retryMessage);
log.info("UserPointConfirm ConfirmCallback 关联数据:{},投递失败,确认情况:{},原因:{}", correlationData, ack, cause);
}
});
rabbitTemplate.convertAndSend(RabbitMQConstantEnum.USER_POINT.getExchange()
, RabbitMQConstantEnum.USER_POINT.getRoutingKey(), message, message1 -> {
message1.getMessageProperties().setDeliveryMode(MessageDeliveryMode.PERSISTENT); // 设置消息持久化
return message1;
}, correlationId);
log.info("用户积分消息重试补发完成");
}
}
失败消息表
CREATE TABLE `retry_message` (
`id` bigint NOT NULL AUTO_INCREMENT,
`type` int DEFAULT NULL,
`content` text,
`retried_times` int DEFAULT NULL,
`retry_limit` int NOT NULL DEFAULT '1',
`create_time` datetime DEFAULT NULL,
`status` int DEFAULT '0',
PRIMARY KEY (`id`)
) ENGINE=InnoDB AUTO_INCREMENT=1 DEFAULT CHARSET=utf8mb4 ;
一个数据库里就只要一张表即可,专门用来存储发送失败消息。
tyep:什么业务场景
content:具体消息内容
retried_times:已经重试了几次
retry_limit:重试次数限制
status:状态,是否需要重试
这里的重试次数限制,我们也可以采用配置的方式,这样就可以在分布式配置中心对此进行动态调整,不过,这个好像没什么必要,因为不会频繁地更换这个限制。直接存在表里还可以动态的针对某些业务做特殊处理,比如业务A限制次数2次,业务B次数改成3次....
没有完美的解决方法,但总有相对完美的解决方案即可。
关于积分模块,其实不止有增加积分,还有扣除积分。
比如:用户使用积分兑换优惠券,积分目前设计在用户中心,优惠券又在营销中心,所以,会涉及到分布式事务问题。
我们可以采用Seata、Atomikos、RockSeataetMQ等技术来解决,目前充电桩项目中用到过Atomikos,但是代码量实在是会增加不少,最后使用了Seata来解决分布式事务问题。
最后
希望通过本文学习,下次再遇到面试官问消息队列的两个问题,就不再是背八股文了。
好了,今天就跟大家分享这么多,希望能给你带来点点帮助。
麻烦个三连呗:点赞、转发、再看,谢谢啦!
转载自:https://juejin.cn/post/7361934843209269248