likes
comments
collection
share

2. 选课系统优化:Redis缓存带来的性能突破

作者站长头像
站长
· 阅读数 53

回顾上文

在上文中我们完成一个简单的选课系统,但是单纯的使用Gin+Mysql是完全不够用的。

现存的问题

  • 在这高并发的系统下,MySQL 的性能瓶颈是受限制的,涉及到事务的时候还可能会存在事务超时。

本文解决

在本文将引入Redis缓存来进行减轻Mysql压力,尽可能查询或修改时在Redis进行操作,最终在进行同步到Mysql。

能学到什么

  • go操作redis
  • 如何进行缓存预热
  • 布隆过滤器 go-bloom

涉及技术

  • Gin
  • Mysql
  • Redis

架构设计

暂时无法在飞书文档外展示此内容

2. 选课系统优化:Redis缓存带来的性能突破

需改进地方

从哪着手呢?

这里从选课逻辑进行着手。

  1. 课程和用户的ID是可以来说固定的,那么可以在项目启动时,加载课程ID到布隆过滤器里面。
  2. 在判断用户是否选择了该课程时,可以使用redis缓存的set数据结构来进行记录用户选的课程集合。
  3. 对于一个选课系统来说我们可以制定一个选课流程:凡事扣减库存操作在redis内存数据库操作(预扣减库存)

暂时无法在飞书文档外展示此内容

2. 选课系统优化:Redis缓存带来的性能突破

上布隆

在这里可以将user和course的主键加入到布隆过滤器里面,针对布隆的特性,存在是可能存在,但是不存在就一定不存在,且对可能存在的误判极其的低,那么我们只进行判断不存在就行了,对于不确定性还是需要加锁。但是这里也会有显著的性能提升。


var UserBloom *bloom.BloomFilter
var CourseBloom *bloom.BloomFilter

func InitializeBloom() error {
    // 初始化操作
    CourseBloom = bloom.NewWithEstimates(100000, 0.01)
    UserBloom = bloom.NewWithEstimates(100000, 0.01)
    var wg sync.WaitGroup
    var err error
    wg.Add(2)
    go func() {
       var userList []*models.User
       if err = database.Client.Find(&userList).Error; err != nil {
          return
       }
       for _, user := range userList {
          // 加载id进入布隆过滤器
          UserBloom.AddString(fmt.Sprintf("%d", user.ID))
       }
    }()
    go func() {
       var courseList []*models.Course
       if err = database.Client.Find(&courseList).Error; err != nil {
          return
       }
       for _, course := range courseList {
          // 加载id进入布隆过滤器
          CourseBloom.AddString(fmt.Sprintf("%d", course.ID))
       }
    }()
    wg.Wait()

    return err
}

预热到redis

这里模拟老师上传课程和设置容量

// 预热到redis
func TestPreheatMysql2Redis(t *testing.T) {
    var course []models.Course
    if err := database.Client.
       Preload("Schedule").
       Preload("Category").
       Find(&course).Error; err != nil {
       t.Error(err)
    }
    for _, v := range course {
       if err := cache.RDB.HSet(
          context.Background(),
          fmt.Sprintf(keys.CourseHsetKey, v.ID),
          keys.CourseCategoryIDKey, v.CategoryID,
          keys.CourseScheduleDurationKey, uint(v.Schedule.Duration),
          keys.CourseScheduleWeekKey, uint(v.Schedule.Week),
          keys.CourseCapacityKey, v.Capacity,
       ).Err(); err != nil {
          t.Error(err)
       }
    }
}

如何进行扣减库存呢?

由于在redis扣减课程,使用了redis的hset数据结构,可以通过increBy命令进行针对某个key的增减。

  • 校验课程是否还存在容量
  • 进行扣减

这里先预警一下,这里存在问题。具体问题见下


// 2.6. 扣减课程容量与创建课程操作
var err error
var capacityStr string
if capacityStr, err = cache.RDB.HGet(ctx, key, keys.CourseCapacityKey).Result(); err != nil {
    logger.Logger.WithContext(ctx).Info("获取课程容量失败", err)
}
if capacity, _ := strconv.Atoi(capacityStr); capacity <= 0 {
    logger.Logger.WithContext(ctx).Info("课程容量不足", err)
    resp.Fail(ctx, code.Fail, code.CourseFull, code.CourseFullMsg)
    return
}
// 3. 创建操作
// 3.1 扣减课程容量
if err := cache.RDB.HIncrBy(ctx, key, keys.CourseCapacityKey, -1).Err(); err != nil {
    logger.Logger.WithContext(ctx).Info("扣减课程容量失败", err)
    resp.DBError(ctx)
}

进行加入到用户课程集合

// 3.2 加入到用户课程集合
if err := cache.RDB.SAdd(ctx, fmt.Sprintf(keys.UserCourseSetKey, req.UserID), req.CourseID).Err(); err != nil {
    logger.Logger.WithContext(ctx).Info("加入到用户课程集合失败", err)
    resp.DBError(ctx)
}

然后我们可以进行异步写入到mysql里

// 异步写入到数据库
go Async2Mysql(ctx, &user, req.CourseID, offset)

以上操作存在很严重的问题。---- 临界问题

因为以上的对redis的操作并不是原子性的。

这里只针对原子错误,由于redis对于写操作都是单线程的,那么就避免了多线程环境下造成的临界问题。

其实redis也提供了事务机制,但是存在一些缺陷:非隔离性、操作失败无回滚、二阶提交等。所有这里就不使用redis的事务,那么怎么解决呢?

其实Redis还提供了原子操作,可以编写Lua脚本,且由于单线程执行(指写)可以像事务一样,且避免了并发问题,确保执行顺序和一致性等。

提示:其实上面的一系列操作也可以使用管道。如果对redis多次访问时,可以使用管道机制将多条命令一次性打包发送到redis中。但是管道并不是原子性的。

原子操作

lua脚本

CourseSelectLuaScript = `
    -- 1. 用户是否已经选择了
    if redis.call("sismember", KEYS[1], KEYS[2]) == 1 then
       return 1 
    end    
    -- 2. 选课操作
    local count =tonumber(redis.call("hget", KEYS[3], KEYS[4]))
    if count and count > 0 then
       -- 课程人数减 一
       redis.call("hincrby", KEYS[3], KEYS[4], -1)
       -- 选课,添加到用户集合
       redis.call("sadd", KEYS[1], KEYS[2])
       return 0
    else
       -- 容量满了
       return 2
    end
`

执行脚本

if err := database.Client.WithContext(ctx).Transaction(func(tx *gorm.DB) error {
    // 1. 获取课程的week和duration
    var course models.Course
    if err := tx.Model(&models.Course{}).Preload("Schedule").First(&course, req.CourseID).Error; err != nil {
       if errors.Is(err, gorm.ErrRecordNotFound) {
          return err
       }
       return err
    }
    // 2. 获取用户的flag字段
    var user models.User
    //
    if err := tx.Clauses(clause.Locking{Strength: "SHARE"}).Where("id = ?", req.UserID).First(&user).Error; err != nil {
       if errors.Is(err, gorm.ErrRecordNotFound) {
          return err
       }
       return err
    }

    /*
       args:
          key1 = 用户key
          key2 = 课程id
          key3 = 课程key
          key4 = capacity key
       return:
          0 : 执行正常
          1 : 用户是否已经选择了
          2 : 课程满了
    */
    // 3. 判断是否选存在时间冲突
    offset := int(course.Schedule.Week*3) + int(course.Schedule.Duration)
    if user.Flag.TestBit(offset) {
       resp.Fail(ctx, code.Fail, code.CourseSelected, code.CourseSelectedMsg)
       return errors.New("用户已经选过该课程")
    }
    // 4. 执行脚本进行扣减课程和创建
    script := redis.NewScript(lua.CourseSelectLuaScript)
    var val interface{}
    var err2 error
    if val, err2 = script.Run(ctx, cache.RDB, []string{
       fmt.Sprintf(keys.UserCourseSetKey, req.UserID),
       strconv.Itoa(int(req.CourseID)),
       fmt.Sprintf(keys.CourseHsetKey, req.CourseID),
       keys.CourseCapacityKey,
    }, req.UserID, req.CourseID).Result(); err2 != nil {
       logger.Logger.WithContext(ctx).Info("执行lua脚本失败", err2)
       resp.Fail(ctx, code.Fail, code.CourseSelected, code.CourseSelectedMsg)
       return err2
    }
    switch val.(int64) {
    case lua.CourseSelectOK:
       logger.Logger.WithContext(ctx).Info("选课成功")
       go AsyncSelect2Mysql(ctx, req.UserID, req.CourseID)
    case lua.CourseSelected:
       logger.Logger.WithContext(ctx).Info("用户已经选择该门课程")
       resp.Fail(ctx, code.Fail, code.CourseSelected, code.CourseSelectedMsg)
       return errors.New("用户已经选择该门课程")
    case lua.CourseFull:
       logger.Logger.WithContext(ctx).Info("课程已满")
       resp.Fail(ctx, code.Fail, code.CourseFull, code.CourseFullMsg)
       return errors.New("课程已满")
    default:
       logger.Logger.WithContext(ctx).Info("未知错误")
       return errors.New("未知错误")
    }
    // 5. 修改用户flag
    user.Flag.SetBit(offset)
    if err := tx.Save(&user).Error; err != nil {
       return err
    }
    
}); err != nil {
    logger.Logger.WithContext(ctx).Info("事务失败", err)
    resp.Fail(ctx, code.Fail, code.DBError, code.DBErrorMsg)
    return
}

进行并发测试

测压

2. 选课系统优化:Redis缓存带来的性能突破

  • mysql

2. 选课系统优化:Redis缓存带来的性能突破

2. 选课系统优化:Redis缓存带来的性能突破

  • redis

2. 选课系统优化:Redis缓存带来的性能突破

结果呢?居然出现了超卖,出现了负库存,这必然是一个非常严重的bug。我不是使用了事务吗?为什么还是出现了错误呢?

代码细读一下

事务开始
  1. 获取课程的week和duration

这里没问题只是获取课程Schedule的时间段,是并发安全的可以确保一定不会修改 (安全)

    // 1. 获取课程的week和duration
    var course models.Course
    if err := tx.Model(&models.Course{}).Preload("Schedule").First(&course, req.CourseID).Error; err != nil {
       if errors.Is(err, gorm.ErrRecordNotFound) {
          return err
       }
       return err
    }
  1. 获取用户的flag字段

由于不能确保user的flag字段的并发安全,加了一把共享锁,读读共享,读写加锁。 (安全)

    // 2. 获取用户的flag字段
    var user models.User
    if err := tx.Clauses(clause.Locking{Strength: "SHARE"}).Where("id = ?", req.UserID).First(&user).Error; err != nil {
       if errors.Is(err, gorm.ErrRecordNotFound) {
          return err
       }
       return err
    }
  1. 判断是否选存在时间冲突。这里可以确保user.Flag字段的安全,由于在上一步加了共享锁,如果存在修改的话会阻塞到上一步。 (安全)
  // 3. 判断是否选存在时间冲突
offset := int(course.Schedule.Week*3) + int(course.Schedule.Duration)
if user.Flag.TestBit(offset) {
   resp.Fail(ctx, code.Fail, code.CourseSelected, code.CourseSelectedMsg)
   return errors.New("用户已经选过该课程")
}
  1. 执行脚本进行扣减课程和创建,这里的话也可以确保安全,redis(原子+单线程写)。 (安全)
// 4. 执行脚本进行扣减课程和创建
script := redis.NewScript(lua.CourseSelectLuaScript)
var val interface{}
var err2 error
if val, err2 = script.Run(ctx, cache.RDB, []string{
   fmt.Sprintf(keys.UserCourseSetKey, req.UserID),
   strconv.Itoa(int(req.CourseID)),
   fmt.Sprintf(keys.CourseHsetKey, req.CourseID),
   keys.CourseCapacityKey,
}, req.UserID, req.CourseID).Result(); err2 != nil {
   logger.Logger.WithContext(ctx).Info("执行lua脚本失败", err2)
   resp.Fail(ctx, code.Fail, code.CourseSelected, code.CourseSelectedMsg)
   return err2
}
switch val.(int64) {
case lua.CourseSelectOK:
   logger.Logger.WithContext(ctx).Info("选课成功")
   go AsyncSelect2Mysql(ctx, req.UserID, req.CourseID)
case lua.CourseSelected:
   logger.Logger.WithContext(ctx).Info("用户已经选择该门课程")
   resp.Fail(ctx, code.Fail, code.CourseSelected, code.CourseSelectedMsg)
   return errors.New("用户已经选择该门课程")
case lua.CourseFull:
   logger.Logger.WithContext(ctx).Info("课程已满")
   resp.Fail(ctx, code.Fail, code.CourseFull, code.CourseFullMsg)
   return errors.New("课程已满")
default:
   logger.Logger.WithContext(ctx).Info("未知错误")
   return errors.New("未知错误")
}
  1. 修改用户flag字段。这里和步揍3一样 (安全)
user.Flag.SetBit(offset)
if err := tx.Save(&user).Error; err != nil {
   return err
}
return nil
事务结束
问题来了?

明明步揍都是安全的,只要存在error就会进行回滚操作,为什么会扣减库存为负数呢?

再来细看

2. 选课系统优化:Redis缓存带来的性能突破

2. 选课系统优化:Redis缓存带来的性能突破

2. 选课系统优化:Redis缓存带来的性能突破

怎么进行解决呢?

其实呢很简单,调换 步骤5 和 步骤4 的顺序。

因为对 步骤5 的不确定性,如果 步骤5 出现错误那么对于 步骤4 是不可逆的,就是这里存在了间隙。对于 步骤5 造成的对Mysql事务的回滚,但是不会对 步骤4 的任何影响。但是如果 步骤5 在 步骤4 先执行,那么即使 步骤5 的操作的不确定性,就不会对 步骤4 造成影响,步骤5 执行错误了回滚,步骤5 执行成功,即使 步骤4 执行错误也会进行撤回redis和回滚mysql。

if err := database.Client.WithContext(ctx).Transaction(func(tx *gorm.DB) error {
    // 1. 获取课程的week和duration
    var course models.Course
    if err := tx.Model(&models.Course{}).Preload("Schedule").First(&course, req.CourseID).Error; err != nil {
       if errors.Is(err, gorm.ErrRecordNotFound) {
          return err
       }
       return err
    }
    // 2. 获取用户的flag字段
    var user models.User
    //
    if err := tx.Clauses(clause.Locking{Strength: "SHARE"}).Where("id = ?", req.UserID).First(&user).Error; err != nil {
       if errors.Is(err, gorm.ErrRecordNotFound) {
          return err
       }
       return err
    }

    // 3. 判断是否选存在时间冲突
    offset := int(course.Schedule.Week*3) + int(course.Schedule.Duration)
    if user.Flag.TestBit(offset) {
       resp.Fail(ctx, code.Fail, code.CourseSelected, code.CourseSelectedMsg)
       return errors.New("用户已经选过该课程")
    }
    // 4. 修改用户flag
    user.Flag.SetBit(offset)
    if err := tx.Save(&user).Error; err != nil {
       return err
    }
    // 5. 执行脚本进行扣减课程和创建
    script := redis.NewScript(lua.CourseSelectLuaScript)
    var val interface{}
    var err2 error
    if val, err2 = script.Run(ctx, cache.RDB, []string{
       fmt.Sprintf(keys.UserCourseSetKey, req.UserID),
       strconv.Itoa(int(req.CourseID)),
       fmt.Sprintf(keys.CourseHsetKey, req.CourseID),
       keys.CourseCapacityKey,
    }, req.UserID, req.CourseID).Result(); err2 != nil {
       logger.Logger.WithContext(ctx).Info("执行lua脚本失败", err2)
       resp.Fail(ctx, code.Fail, code.CourseSelected, code.CourseSelectedMsg)
       return err2
    }
    switch val.(int64) {
    case lua.CourseSelectOK:
       logger.Logger.WithContext(ctx).Info("选课成功")
       go AsyncSelect2Mysql(ctx, req.UserID, req.CourseID)
    case lua.CourseSelected:
       logger.Logger.WithContext(ctx).Info("用户已经选择该门课程")
       resp.Fail(ctx, code.Fail, code.CourseSelected, code.CourseSelectedMsg)
       return errors.New("用户已经选择该门课程")
    case lua.CourseFull:
       logger.Logger.WithContext(ctx).Info("课程已满")
       resp.Fail(ctx, code.Fail, code.CourseFull, code.CourseFullMsg)
       return errors.New("课程已满")
    default:
       logger.Logger.WithContext(ctx).Info("未知错误")
       return errors.New("未知错误")
    }
    return nil
}); err != nil {
    logger.Logger.WithContext(ctx).Info("事务失败", err)
    resp.Fail(ctx, code.Fail, code.DBError, code.DBErrorMsg)
    return
}

再次进行测试

2. 选课系统优化:Redis缓存带来的性能突破

2. 选课系统优化:Redis缓存带来的性能突破

2. 选课系统优化:Redis缓存带来的性能突破

这里作者进行了多次测试,目前的话对于事务操作是没有出现什么错误的。但是呢?这里还是存在一个隐藏的Bug的。发现的友友可以在评论区留言。有关数据一致相关的。

进一步优化

多级缓存存储架构

暂时无法在飞书文档外展示此内容

2. 选课系统优化:Redis缓存带来的性能突破

业务分析

针对本项目其实很简单,只涉及到了两张表user表和course表,其中user表的flag字段记录着用户时刻表(也就是用户在星期几的那个时间段有课的记录),course表主要是牵扯到课程容量这个字段。

在本业务中user表和course表都是涉及到读多写多的场景,例如:用户选课都会涉及到flag字段和course字段的修改,针对course字段直接在Redis中完成自增或自减的操作,并在某一时刻同步到数据库。

为什么不使用二级缓存呢?

在缓存架构中二级缓存常用于读多写少的场景下,如果读写频率差不多少时不推荐使用二级缓存,因为涉及到二级缓存和数据库同步的问题,对一个热写的数据时不时都需要进行同步,且同步只能是最终的一致性,且涉及到反复的对数据库进行读写和回写,甚至还会造成缓存击穿和雪崩问题。总之弊远大于利。

总之,选择是否使用二级缓存,以及如何设计缓存策略,应基于具体的应用场景和性能需求综合考量。

所以呢?本文并没有什么改进了,下期见。

补充

针对于刚刚提到了二级缓存,其实对于步骤一,由于读取课程的week和duration的值是不会改变的,那么就可以在项目启动时可以把对应课程id的duration和week字段缓存到本地。在明白点来说也不必要缓存到二级缓存中,缓存到单独的一个map就行。至于map嘛,普通的map就行了,只要不涉及到并发读写就行,单纯的并发读是不会panic的。

总结

本期通过使用缓存对上期单独使用mysql遇到的性能瓶颈,做出了一定的优化,我们使用redis进行一个预扣减库存的操作,且在写单线程和原子的一系列操作下确保不会出现“超卖问题”,从而进行预扣课程容量的操作。本文还引入了布隆过滤器,针对于一些恶意的用户,构造非法参数。可以避免大量无效请求对redis造成雪崩的问题。至于缓存穿透是不会了的,因为这里只涉及到了一层,当redis访问不到时是直接返回响应的,并不会向常规的再去访问下一层DB。到了末尾我们通过代码层面上解决mysql事务里也能有效的撤回redis操作。从而避免了并发临界问题。

本期代码请见:github.com/bbz1024/sel…

若阁下感兴趣可以clone下来玩一玩。

bash
复制代码
git clone --branch demo2 https://github.com/bbz1024/select-course.git
转载自:https://juejin.cn/post/7378720599106977802
评论
请登录