likes
comments
collection
share

【Java项目】关于抢红包这个场景我是如何实现的

作者站长头像
站长
· 阅读数 15

抢红包

使用的就是redis的list集合,然后这里有一个意外情况就是:

redis如果当前用户取出数据成功了,但是此时redis宕机了,然后aof同步失败,那么下一次恢复了后,就会拿到这条没有出队的数据,数据就出错了。

但是这个线程他是成功的,所以它可以成功的执行对数据库的操作。

但是redis恢复之后,后面的线程都会出问题。

所以我们必须得保证就是当前线程拿到的这个积分位置是对的。

这也就是一个很正常的秒杀场景。

就算用Lua也没用。

其实只是其中一台宕机,为了保证数据安全,我们可以用红锁,也就是集群中1/2以上的节点数据操作成功。

其实我应该先交代一下我们公司这个业务的场景,我们公司的Redis集群的配置为20台实例,每台实例80C1TB内存大小。

红包存放

这里需要考虑到一个个小红包放入到Redis中是成功的,原子性的。所以这里使用了Lua脚本进行红包的存放。

并且为了确保减少不必要的索引次数,我在Cluster集群中使用了前缀key,来确保当前红包的操作都会路由到同一个Redis服务器上。

Lua 脚本有几个优点:

  1. 原子性: 如上所述,Lua 脚本在 Redis 中是原子性执行的。
  2. 减少网络开销: 如果你需要执行一系列操作,使用 Lua 脚本会减少客户端和服务器之间的往返次数。
  3. 简化客户端逻辑: 使用 Lua 脚本可以将一些逻辑转移到数据库层,使得客户端代码更简单。
  4. 更强的一致性保证: 由于 Lua 脚本是原子性执行的,因此它们提供了一种有效的方式来处理复杂的、需要多步骤的操作,而不会因并发操作而出现数据不一致的问题。
  5. 服务器端计算: 对于一些计算密集型任务,Lua 脚本允许你在服务器端进行计算,而不是在客户端进行,这样可以减轻客户端的负担。

使用 Lua 脚本的一个潜在缺点是,由于脚本是阻塞性执行的,一个长时间运行的脚本可能会阻塞其他所有操作。因此,务必确保你的 Lua 脚本尽可能地快速和高效。

为了保证红包存放的安全性,由于我已经把当个红包的信息全都路由到了某一台机器上,所以其他机器是没有这个红包的备份信息的,所以我们的节点采用的是Cluster集群模式并且采用的是主从节点。

这样子主机宕机了也会故障切换到从机,就不会出大问题了。

红包获取

上面我们已经保证了红包的存放过程是基本没问题的了,那么接下来的过程就是红包的获取了。

我们知道一个红包肯定只能获取一次,因此当一个用户开始抢红包的时候,并且抢到红包之后,我们可以在redis中为他生成一个token来表示当前用户已经抢过红包,不允许下次再抢了,这个逻辑很简单。但是也有如下的需要考虑的地方:

原子性:生成token和判断用户是否已经抢过红包需要在一个原子操作中完成,以防止同一用户多次抢红包。这里也可以考虑使用Lua脚本。

对于我的抢红包场景,有几种其他方法可以考虑,以提高性能:

  1. 预先分配红包: 在红包被创建时,预先将其分配给各个用户,并在用户实际领取时进行确认。这样,你只需要一个简单的 GET 操作就能完成整个过程。
  2. 后处理: 允许用户“抢”红包,但不立即完成交易。然后在后台进行批处理,确认哪些用户实际上有资格领取红包。
  3. 延迟确认: 在用户抢到红包后,先将其放入一个“待确认”队列。后台服务负责从该队列中取出红包,并进行最终确认和处理。

预分配红包

预先分配红包: 在红包被创建时,预先将其分配给各个用户,并在用户实际领取时进行确认。这样,你只需要一个简单的 GET 操作就能完成整个过程。

预先分配红包通常意味着在活动开始之前,根据一些特定的规则或算法,将红包或红包的"资格"预先分配给一部分预定的用户。这样做的一个主要目的是为了减轻高并发下对后端系统(比如数据库或缓存系统)的压力。

然而,预先分配确实一些问题:如果预分配的用户没有实际参与抢红包,那么这些红包或红包资格如何处理?

  1. 超时机制: 可以设定一个合理的时间窗口,在这个时间内,如果预分配的用户没有抢到红包,那么这些红包可以重新进入"公共池",供其他用户抢夺。
  2. 动态调整: 预分配只是一个"优先权"而非"确定权"。也就是说,预分配用户在规定时间内有优先抢红包的权利,但过了这个时间,红包即可供其他用户抢夺。
  3. 分阶段处理: 首先给预分配的用户一个优先抢红包的时间窗口,如果他们在这个时间内没有抢到,那么进入第二阶段,允许所有用户都能抢这些红包。
  4. 混合模式: 一部分红包预先分配,一部分随机分配。这样即使预分配的用户没有参与,也不会影响整体的红包分发。
  5. "懒"预分配: 仅在用户实际点击抢红包时才进行预分配,这样可以避免预分配但未参与的问题,但这样做可能无法完全避免高并发带来的压力。

下面是一段预分配红包的代码

@Service
public class RedPacketService {

    @Autowired
    private RedisTemplate<String, Object> redisTemplate;

    // 生成红包并放入Redis
    public void generateRedPackets(int totalAmount, int numPackets, String redPacketId) {
        List<Integer> redPacketList = splitRedPackets(totalAmount, numPackets);
        redisTemplate.opsForList().leftPushAll("red_packet:" + redPacketId, redPacketList);
    }

    // 将红包预分配给指定用户
    public void preallocateRedPacket(String redPacketId, String userId) {
        redisTemplate.opsForValue().set("preallocated_red_packet:" + userId, redPacketId, 10, TimeUnit.MINUTES);
    }

    // 从Redis中抢红包
    public Integer grabRedPacket(String redPacketId, String userId) {
        // 检查用户是否有预分配的红包
        String preallocatedId = (String) redisTemplate.opsForValue().get("preallocated_red_packet:" + userId);
        if (preallocatedId != null && preallocatedId.equals(redPacketId)) {
            // 这里假设抢到红包的逻辑是从Redis List的左侧弹出一个红包
            Integer amount = (Integer) redisTemplate.opsForList().leftPop("red_packet:" + redPacketId);
            return amount;  // 返回抢到的红包金额
        }
        // 没有预分配或预分配的红包不匹配
        return null;
    }

    // 其他方法,例如splitRedPackets()用于拆分红包金额
}

之后我们就是要考虑如何对预分配的用户进行预分配操作了,这里我是用的是Redission的读写锁。

我可以让没有预分配的用户抢红包的时候只拿读锁,此时不会阻塞,如果出现了当前用户是预分配的用户,那么我给他加一个写锁,那么此时其他用户就会阻塞,只有当前预分配用户抢完红包之后才可以结束阻塞。

  1. 对于预分配的用户:在他们准备抢红包时,获取写锁。这将会阻塞所有其他没有写锁的用户(无论是否预分配)。只有当前预分配的用户完成了红包的抢夺逻辑后,才会释放写锁。
javaCopy code
// Java pseudocode
RReadWriteLock rwlock = redisson.getReadWriteLock("redPacketLock");
RLock writeLock = rwlock.writeLock();
try {
    writeLock.lock();
    // 抢红包逻辑
} finally {
    writeLock.unlock();
}
  1. 对于非预分配的用户:在他们准备抢红包时,获取读锁。如果没有任何写锁(即没有预分配的用户在抢红包),这将不会被阻塞。
javaCopy code
// Java pseudocode
RReadWriteLock rwlock = redisson.getReadWriteLock("redPacketLock");
RLock readLock = rwlock.readLock();
try {
    readLock.lock();
    // 抢红包逻辑
} finally {
    readLock.unlock();
}

这种方式确实可以实现你所描述的并发控制。需要注意的是,使用读写锁会增加系统的复杂性和潜在的性能开销,特别是在高并发场景下。因此,在使用这种方案之前,最好先进行充分的性能和压力测试。

评估了一下预分配红包的优点,如下:

预分配红包也有一系列潜在的优点:

  1. 用户体验提升:对于被预分配红包的用户,因为他们的红包已经预先保留,所以可以提供更流畅和快速的体验。
  2. 降低服务器压力:通过预分配,部分工作可以提前完成,从而在红包开抢的高峰期减轻服务器压力。
  3. 简化并发控制:因为一部分红包已经预先分配给特定用户,所以在实际的抢红包操作中,系统可能需要处理更少的并发请求。
  4. 提高成功率:对于预分配用户,由于红包已经为他们保留,因此抢红包的成功率会相对提高。
  5. 营销和用户粘性:通过预分配,可以为某些特定群体或者忠实用户提供特权,这有助于提高用户粘性和进行更精细化的营销。
  6. 灵活的业务策略:预分配机制允许业务方根据用户行为、历史信息或者其他业务逻辑来动态调整红包分配,增加了业务操作的灵活性。
  7. 减少冲突和竞争:预分配可以减少用户间抢同一个红包的冲突和竞争,可能会减少因并发导致的错误或问题。
  8. 提前验证用户身份:在预分配阶段,还有机会进行额外的身份验证或者安全检查,这可以作为一个额外的安全层。
  9. 可用于数据分析:通过观察哪些用户更可能参与预分配,或者预分配成功后的用户行为,可以收集到有用的数据用于进一步的分析和决策。
  10. 有利于资源规划:了解预分配的数量和特点有助于更准确地进行资源规划,如缓存、数据库和服务器等。

当然这个方案我当初评估了很久,缺点如下:

  1. 复杂性增加:预分配逻辑会增加整体系统的复杂性,包括但不限于锁管理、超时处理和一致性维护等。
  2. 资源占用:预分配意味着你需要额外的数据结构和存储来追踪哪些红包已经被预分配。这可能会增加数据库或缓存的负载。
  3. 响应延迟:如果使用了锁机制(如读写锁)来保证预分配用户有优先权,这可能导致其他用户在高并发场景下遭遇一定的延时。
  4. 预分配的无效性:如果预分配的用户没有按时来领取红包,那么预分配就可能变得无效,需要额外的逻辑来处理这种情况。
  5. 不公平性:预分配策略可能引发一些公平性问题。如果预分配的用户最终没有参与,而其他非预分配用户由于某种原因(例如锁)不能立即访问,这可能会被认为是不公平的。
  6. 调试和监控困难:由于预分配引入了额外的逻辑和状态,这可能会使得系统更难以调试和监控。
  7. 优先级管理:如果预分配用户很多,还需要考虑他们之间的优先级,这也是一个复杂性增加的点。
  8. 潜在的性能问题:使用锁或其他并发控制机制可能会导致性能下降,特别是在高并发的环境中。
  9. 预分配和实际行为不符:用户的行为是不可预测的,即使预分配了红包,也不能保证用户一定会来领。
  10. 滥用风险:如果用户知道了预分配的逻辑,可能会尝试滥用这一特权。

后处理

允许用户“抢”红包,但不立即完成交易。然后在后台进行批处理,确认哪些用户实际上有资格领取红包。这种方案其实类似于抽奖了,也并不是说不行。

后处理(post-processing)策略的核心思想是:先让所有用户的请求通过,即先“接单”,然后在后台异步地处理这些请求。这种做法与实时处理用户请求相反,实时处理需要立即完成所有的业务逻辑,包括数据验证、业务规则应用、数据写入等。

后处理主要有以下几个特点:

  1. 高并发接受能力:由于前端只是接收请求而不进行处理,因此系统可以在短时间内接收大量的请求。
  2. 异步处理:具体的业务逻辑会在后台异步执行,这通常会用批处理、队列、定时任务等方式来实现。
  3. 复杂性隐藏:用户不需要等待所有的业务逻辑都执行完成,这些复杂的逻辑被移到了后台。
  4. 容错能力:即使部分后台处理任务失败,也不会直接影响用户的请求。这给了系统更多的机会进行重试或者人工干预。

在红包抢夺的场景下,后处理可以这样工作:

  1. 当用户尝试抢红包时,系统仅记录下用户的请求,通常是将用户ID和红包ID放入一个待处理队列。
  2. 一个后台任务(可以是定时任务或者是一个持续运行的服务)负责从队列中取出待处理的抢红包请求,并进行实际的业务处理。比如,判断红包是否还有剩余,是否已经被这个用户抢过等。
  3. 最终的抢红包结果(成功或失败)会被记录下来,用户可以通过某种方式(比如查询API或者是推送通知)来获取自己是否成功抢到红包。

这种方式允许系统在短时间内接收大量的抢红包请求,而具体的业务逻辑处理则可以在后台慢慢进行。这样既提高了系统的吞吐量,也简化了前端逻辑。缺点是增加了系统复杂性,并且如果后台处理不及时,可能会影响用户体验。

延迟确认

在用户抢到红包后,先将其放入一个“待确认”队列。后台服务负责从该队列中取出红包,并进行最终确认和处理。这个方案和后处理差不多。

延迟确认是一种用于应对高并发场景的策略,它可以有效地降低实时处理的压力。在这种方案中,用户在前端“抢”到红包后,系统先不立即进行最终的确认和处理,而是将相关信息放入一个“待确认”队列中。后台服务会在稍后从这个队列中取出信息进行处理。

这样做有几个好处:

  1. 解耦: 将红包的“抢取”和“确认”逻辑解耦,可以让前端和后端服务各自独立地进行优化。
  2. 缓解压力: 在高峰期,后端服务可以根据实际情况控制从“待确认”队列中取出信息的速度,从而避免因高并发导致的系统压力。
  3. 容错和重试: 如果后台服务在处理某个红包时出现了问题(比如数据库暂时不可用等),可以将这个红包重新放入队列,稍后再试。

然而,这种方案也有一些缺点:

  1. 用户体验: 用户在“抢”到红包后需要等待后台服务的最终确认,这可能会影响用户体验。
  2. 数据一致性: 因为红包的最终确认是异步进行的,如果系统在这个过程中出现问题(比如后台服务宕机等),可能会导致数据不一致。

上面说了这么多,其实很明显,每个方案都有优缺点,权衡性能,实现难度等,我们否了所有上述方案,就直接用lpop这种方式取出红包哈哈哈。

第一种预分配虽然好,但是实现起来复杂,还会耦合我们的其他的业务,所以我们没选。

那么回到开头说的,如果出现了lpop命令执行成功,但是写入日志aof的时候失败了怎么办?

因为我们线上的redis肯定都是选择RDB和AOF两种备份方式一起使用的,RDB在这里明显不可靠。

所以我们得依靠实时性更好的AOF,但是即使你选择牺牲性能的always方案来保证每次提交操作之后都会马上写入日志,也有可能出现数据丢失。那么接下来我们就重点解决一下这个问题吧。

对于数据存储的可靠性,依赖单一存储引擎(如Redis)往往是有风险的,尤其是在高并发、高可用的分布式系统中。AOF(Append Only File)是Redis用于持久化的一种方式,其虽然提供了多种fsync选项(如alwayseverysecno)用于权衡性能与数据安全,但还是有数据丢失的风险。

最终方案

其实对于上面的问题,我们可以想到的最基本的解决方案如下,我们先列出来:

  1. 双写策略:除了写入Redis外,也同时写入一个更为可靠的存储(比如关系数据库)。这样即使Redis的持久化出现问题,也可以从这个可靠的存储中恢复。
  2. 消息队列:在业务逻辑和数据持久化之间引入消息队列。业务逻辑只负责将操作写入消息队列,由单独的服务负责读取队列并更新Redis和其他存储。这样即使某个环节失败,也可以通过消息队列进行重试。
  3. 数据校验和重试机制:在写入AOF后,可以有一个异步的校验过程确保数据已经持久化。如果校验失败,则进行重试。
  4. 高可用架构:使用主从复制和哨兵模式来提高Redis的可用性。这样即使主节点出现问题,从节点还可以继续提供服务。
  5. 事务性操作与备份:利用Redis的事务特性(虽然不是ACID的全功能事务)和定期备份来降低数据丢失的风险。
  6. 应用层补偿机制:应用层记录关键操作,如果确认数据丢失,利用这些记录进行补偿。
  7. 分布式事务或两阶段提交:在涉及多个存储或服务时,使用更为复杂的事务机制来确保数据一致性。
  8. 监控与报警:通过实时监控Redis和AOF的状态,一旦发现异常立即触发报警。

其实上面这些东西我们基本都能保证,但是其实考虑的就是最最最坏的情况,就是他真的宕机了怎么办?

所以我就考虑到了redis-check-aof 。这是我们最最最后出现了宕机这种情况的时候的数据恢复方法。

因为如果真的出现了由于宕机导致出现的数据记录失败问题,此时RocketMQ消息也发送到MySQL那边成功了,其实我们可以考虑使用MySQL来在崩溃的时候进行数据同步。

redis-check-aof 是一个专门用于检查和修复 AOF(Append Only File)文件的工具,属于 Redis 发行版的一部分。AOF 文件用于 Redis 的持久化,它包含一个完整的事务日志,可用于在系统崩溃后重建数据库状态。然而,在某些情况下,由于突然的宕机或其他意外情况,AOF 文件可能会损坏。

这里是 redis-check-aof 可以为你做的一些具体事项:

  1. 文件检查

该工具会扫描 AOF 文件,识别任何不一致或损坏的命令序列。这是一种预防性措施,用于检测是否存在潜在问题。

  1. 文件修复

如果 redis-check-aof 发现了问题,它会尝试修复这些问题。具体来说,它会生成一个新的、修复后的 AOF 文件,其中删除了损坏或不一致的命令。

  1. 数据恢复

修复后的 AOF 文件可以用于启动一个新的 Redis 实例,从而尽可能地恢复到故障发生前的状态。请注意,这可能意味着一些最近的事务数据将丢失,但大部分数据应该是安全的。

使用场景

  • 预防性维护: 你可以定期运行 redis-check-aof 来检查 AOF 文件的完整性,特别是在做重要更改或更新之前。
  • 紧急恢复: 如果 Redis 实例因为 AOF 文件损坏而无法启动,该工具可用作紧急恢复手段。

在高可用和高冗余的设置中,redis-check-aof 更多地是一个“最后一道防线”。即使你的架构非常健壮,有时也难以避免硬件故障或其他不可预见的问题。在这种情况下,拥有一个能快速恢复数据的工具是非常有价值的。

需要注意的是,使用 redis-check-aof 是一个有损过程,可能会导致一部分数据丢失。因此,它通常应作为最后的恢复选项,而不是首选解决方案。对于极端重要的数据,最好还是有多重备份和恢复策略。

消息发送

完成了红包的获取,那么接下来就是发送RocketMQ消息到消费者那边去进行处理来保存到数据库中了。

在这个过程中,我们需要保证的就是这个消息的可靠性。

对于方案我们有如下几种:

为了确保这个过程中RocketMQ的可靠性,我思考了如下几个点:

  1. 同步发送(Synchronous Send) :最基础的方式就是采用同步发送,等待服务器返回确认。这种方式最直观,但可能会影响到服务的吞吐量。
  2. 异步发送(Asynchronous Send) :为了提高性能,可以采用异步发送。但这种方式需要一个回调机制来处理消息确认或者失败。
  3. 发送重试(Retry Mechanism) :为了应对网络抖动或是短暂的服务不可用,RocketMQ 客户端通常会内置重试机制,你可以设置重试次数和间隔。
  4. 事务消息(Transactional Messages) :如果消息发送和其他资源(比如数据库更新)需要在一个事务中,RocketMQ 支持事务消息。这个已经在前面的回答中详细描述了。
  5. 消息持久化(Message Durability) :确保 RocketMQ 服务器配置了持久化存储,即使宕机也不会丢失消息。
  6. 生产者流控(Producer Flow Control) :通过合理设置生产者端的流控,防止因为生产消息速度太快而导致消息发送失败。
  7. 集群容错(Cluster Fault Tolerance) :在 RocketMQ 集群中,多个 Broker 可以共同工作以提供更高的可用性。
  8. 消息确认(Ack) : 在高可靠性要求的场景中,可以等待 Broker 的确认消息,确认消息已经写入磁盘或者已经复制到 Slave。
  9. DLQ(死信队列) : 对于无法成功处理的消息,将其转移到一个专门的死信队列,以便后续分析和处理。
  10. 消费者幂等性: 确保消费者处理消息是幂等的,即重复处理同一消息不会产生不同效果。这通常需要业务侧的支持。

事务消息

这里我们一开始考虑的是,既然后面的消息是否成功被处理,都得依靠RocketMQ的消息是否成功发出,所以一开始我考虑的是使用RocketMQ的事务消息。

pom.xml 文件中:

xmlCopy code
<dependencies>
    <!-- Spring Boot starter -->
    <dependency>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter</artifactId>
    </dependency>
    <!-- RocketMQ Spring Boot starter -->
    <dependency>
        <groupId>org.apache.rocketmq</groupId>
        <artifactId>rocketmq-spring-boot-starter</artifactId>
        <version>2.1.0</version>
    </dependency>
    <!-- 其他依赖,比如 JPA,数据库等 -->
</dependencies>

然后在你的 Spring Boot 应用中:

  1. 定义一个消息发送服务
javaCopy code
import org.apache.rocketmq.spring.core.RocketMQTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service;
import org.springframework.transaction.annotation.Transactional;

@Service
public class RedPacketService {

    @Autowired
    private RocketMQTemplate rocketMQTemplate;

    @Transactional
    public void grabRedPacket(String userId, String redPacketId) {
        // 抢红包逻辑
        // ...

        // 发送事务消息
        rocketMQTemplate.sendMessageInTransaction(
            "redPacketTransactionGroup", 
            "RedPacketTopic", 
            "RedPacket grabbed by " + userId, 
            redPacketId
        );
    }
}
  1. 创建一个 RocketMQ 事务监听器
javaCopy code
import org.apache.rocketmq.client.producer.LocalTransactionState;
import org.apache.rocketmq.spring.annotation.RocketMQTransactionListener;
import org.apache.rocketmq.spring.core.RocketMQLocalTransactionListener;
import org.apache.rocketmq.spring.core.RocketMQLocalTransactionState;
import org.apache.rocketmq.common.message.MessageExt;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.transaction.annotation.Transactional;

@RocketMQTransactionListener(txProducerGroup = "redPacketTransactionGroup")
public class RedPacketTransactionListener implements RocketMQLocalTransactionListener {

    @Autowired
    private YourDatabaseService yourDatabaseService;

    @Override
    @Transactional
    public RocketMQLocalTransactionState executeLocalTransaction(Message msg, Object arg) {
        String redPacketId = (String) arg;

        try {
            // 本地事务逻辑,比如存储消息至数据库
            yourDatabaseService.saveRedPacketTransaction(redPacketId, msg);
            return RocketMQLocalTransactionState.COMMIT;
        } catch (Exception e) {
            return RocketMQLocalTransactionState.ROLLBACK;
        }
    }

    @Override
    public RocketMQLocalTransactionState checkLocalTransaction(MessageExt msg) {
        // 事务状态检查逻辑
        String redPacketId = msg.getKeys();
        boolean isRedPacketTransactionExist = yourDatabaseService.isRedPacketTransactionExist(redPacketId);

        if (isRedPacketTransactionExist) {
            return RocketMQLocalTransactionState.COMMIT;
        } else {
            return RocketMQLocalTransactionState.UNKNOWN;
        }
    }
}

在这个示例中,YourDatabaseService 是一个用于执行数据库操作的服务。当发送事务消息时,RocketMQ 会首先调用 executeLocalTransaction 方法以尝试执行本地事务。之后,它会根据该方法的返回值来决定是否提交或回滚消息。

但是,事务消息有一个很大的问题在这个场景中!

在高并发场景下,直接在消息发送过程中进行数据库操作确实可能会引入性能瓶颈或其他问题。RocketMQ 的事务消息功能通常更适用于需要确保消息和本地事务同时成功或失败的场景,而不一定适用于高并发场景。

所以这里我的消息发送使用的就是一个同步消息,然后使用RocketMQ集群以及消息持久化这些机制保证消息的可靠性。

转载自:https://juejin.cn/post/7278247059509264395
评论
请登录