likes
comments
collection
share

4. 从内到外:深入选课系统的全面性能优化

作者站长头像
站长
· 阅读数 35

回顾上文

在上文中我们通过消息队列进行异步化处理我们选课后的入库操作,且有效的避免了数据的丢失。通过一系列的sao操作,达到了最终的一致性。到这里一个看起来还能用的选课系统初步完成了,接下来我们在原基础上进行优化与封装。

能学到什么

  • 优化角度

  • redis的bitMap的使用

如何优化

这期我们主要对选课/退课业务逻辑进行优化与封装,至于其他优化请见后期。

  1. 优化一

在前期的话我们都是通过加锁从数据库读取用户的flag字段和读取课程的时间段,虽然这样的话不会出现什么问题,如果涉及到同一个用户并发的访问和修改flag字段时这就会涉及到锁的互斥了。其实我们只是想通过flag字段去记录某个用户某个时间段的选课记录。这么想一下创建用户的课程都能放入到redis里去操作,那修改用户的flag字段是不是也可以在redis进行操作呢?

if err := database.Client.WithContext(ctx).Transaction(func(tx *gorm.DB) error {
    // 1. 获取课程的week和duration,这里完全可以通过map读取的。从而进一步减数少对db的访问
    var course models.Course
    if err := tx.Model(&models.Course{}).Preload("Schedule").First(&course, req.CourseID).Error; err != nil {
       if errors.Is(err, gorm.ErrRecordNotFound) {
          return err
       }
       return err
    }
    // 2. 获取用户的flag字段
    var user models.User
    //
    if err := tx.Clauses(clause.Locking{Strength: "SHARE"}).Where("id = ?", req.UserID).First(&user).Error; err != nil {
       if errors.Is(err, gorm.ErrRecordNotFound) {
          return err
       }
       return err
    }
    offset := int(course.Schedule.Week*3) + int(course.Schedule.Duration)
    if user.Flag.TestBit(offset) {
       resp.Fail(ctx, code.Fail, code.CourseSelected, code.CourseSelectedMsg)
       return errors.New("用户已经选过该课程")
    }
    // 4. 修改用户flag
    user.Flag.SetBit(offset)
    if err := tx.Save(&user).Error; err != nil {
       return err
    }
    return nil
})

方案一

我们可以把每个课程的时间段分别放入到对应时间段的时间里,如果选的课程对应的集合和用户已选的课程存在交集的话,那么就存在时间冲突了,这样的话我们无需通过修改user表的flag字段也能进行操作,避免了直接对数据库层操作,而是内存层面上的计算。

暂时无法在飞书文档外展示此内容

4. 从内到外:深入选课系统的全面性能优化 虽然解决了不从数据库进行判断是否存在冲突,有点类似于在用户选课记录里进行集合遍历的操作。如果redis的数据丢失了那岂不是用户集合也随之丢失了。倒过来看其实除了性能上,并没有之前存入数据库的安全可靠。那是不是可以将数据异步写入到数据库呢?还是保留flag字段,在redis层进行操作,后期某个时间内写入到数据库。

方案二

回想一下我们之前flag字段是分为16bit位,这样刚好可以把上课周的早中晚都进行表示的。其实redis也提供了类似的数据结构BitMap位图,个人认为底层实现原理也是类似通过偏移量来进行置位操作。

4. 从内到外:深入选课系统的全面性能优化

然后通过异步的写入到数据库,当redis的数据丢失时可以将mysql存储的flag信息写入到redis。这样的话我们就可以将选课操作的全部在redis进行计算,然后在通过消息队列在某个时间内进行写库操作。

落地

修改lua脚本

-- 1. 用户是否已经选择了
if redis.call("sismember", KEYS[1], KEYS[2]) == 1 then
    return 1 
end 
-- 2. 是否存在时间冲突 判断某个时间段是否为1
local bitmap = tonumber(redis.call("getbit", KEYS[5], KEYS[6]))
if bitmap and bitmap == 1 then
   return 2
end
-- 3. 选课操作
local count =tonumber(redis.call("hget", KEYS[3], KEYS[4]))
if count and count > 0 then
    -- 课程人数减 一
    redis.call("hincrby", KEYS[3], KEYS[4], -1)
    -- 选课,添加到用户集合
    redis.call("sadd", KEYS[1], KEYS[2])
    -- 课程时间段设置为1
    redis.call("setbit", KEYS[5], KEYS[6], 1) 
    return 0
else
    -- 容量满了
    return 3
end
选课

这里的话由于没涉及到对mysql的操作就不需要使用mysql的事务了,就把执行lua作为整体的一个原子操作即可。


func SelectCourse(ctx *gin.Context) {
    var req request.SelectCourseReq
    if err := ctx.ShouldBind(&req); err != nil {
       handleRequestError(ctx, err)
       return
    }
    offset, err := local.CalOffset(req.CourseID)
    validateAndLogError(ctx, err, code.Fail, "获取课程时间失败")
    val, err := executeLuaScript(ctx, cache.RDB, redis.NewScript(lua.CourseSelectLuaScript), []string{
       fmt.Sprintf(keys.UserCourseSetKey, req.UserID),
       strconv.Itoa(int(req.CourseID)),
       fmt.Sprintf(keys.CourseHsetKey, req.CourseID),
       keys.CourseCapacityKey,
       fmt.Sprintf(keys.UserCourseScheduleBitMapKey, req.UserID),
       strconv.Itoa(offset),
    }, req.UserID, req.CourseID)
    if err != nil {
       logger.Logger.WithContext(ctx).Info("执行lua脚本失败", err)
       resp.Fail(ctx, code.Fail, code.Fail, code.FailMsg)
       return
    }


}

再提一嘴redis的位图大小是随着偏移量的增加而变大,这里偏移量这样15,那么2B就足以记录用户的选课时间了。这样一来我们可以代替了使用方案一带来的多集合开销,“时间空间双得到”。

  1. 优化二

之前我们对于获取课程时间都是从数据库里获取的,由于课程时间的固定性,且一旦确认了就不会去修改课程时间了,那么我们可以将课程时间写入到本地里,这样就能避免到数据库里反复查询了。即使数据库层面上也会对查询做优化,但是使用本地缓存的存储方案来减少对数据库的频繁访问总比走数据库更好。且查询时间表还涉及到了子查询的操作。

package local

import (
    "fmt"
    "select-course/demo4/src/models"
    "select-course/demo4/src/storage/database"
)

var CourseSchedule map[uint]*CourseScheduleModel

type CourseScheduleModel struct {
    Week     int
    Duration int
}

func init() {
    CourseSchedule = make(map[uint]*CourseScheduleModel)
    var courseList []*models.Course
    if err := database.Client.Model(&models.Course{}).Preload("Schedule").Find(&courseList).Error; err != nil {
       panic(err)
    }
    for _, course := range courseList {
       CourseSchedule[course.ID] = &CourseScheduleModel{
          Week:     int(course.Schedule.Week),
          Duration: int(course.Schedule.Duration),
       }
    }
}

抽离

随着选课项目的不断扩展,使其存在冗余和代码质量问题。为了解决这一问题我们对存在冗余的代码进行抽离封装与测试。

根据课程时间计算偏移量

}
func CalOffset(courseID uint) (offset int, err error) {
    model, ok := CourseSchedule[courseID]
    if !ok {
       return 0, fmt.Errorf("courseID %d not found", courseID)
    }
    offset = int(model.Week)*3 + model.Duration
    return offset, nil
}

等....

显藏问题

突然发现一个很低级的问题大家看看

4. 从内到外:深入选课系统的全面性能优化

在上章我们也说过消息的一致性,这很显然是在生产阶段造成的消息丢失现象。

如何解决呢?

方案一:事务机制

先发送在创建

类似与mysql和redis结合一样,我们可以先丢入到消息队列,然后在执行lua脚本,脚本执行错误就回滚。但是这里不能我们先需要校验库存操作才能进行放入消息队列,所以是行不通的。

先创建在发送

那我们先进行校验创建操作,在丢入消息队列。倒过来一看这岂不是跟之前的一样还是先操作了redis。其实在这里我们可以进行重试操作,当重试达到一定量时,我们就需要执行回滚了,在回滚时需要进行补偿操作删除创建,达到确保最终的一致。

方案二:发布确认

前提
  • 设置要求队列必须持久化
  • 设置要求消息必须持久化
  • 开启发布确认:channel.confirmSelect()
  1. 单个发布确认
  • 同步确认发布的方式,发布一条原地等待确认,啥时候等到确认啥时候发布下一条。
  1. 批量发布确认
  • 比单个发发布确认好,但是不安全出问题后无法定位哪个消息出了问题,需要通过特特殊手段进行记录。
  1. 异步发布确认
  • 在发布消息前定义一个监听器,这个监听器可以监听发布成功回调、发布失败回调。
  • 发消息的时候尽管发不用管确认的事情,确认逻辑由监听器处理。

confirm回调

当消息无法到达交换机时会进行回调confirm的回调函数

retruns回调

当消息无法路由到正确队列、路由匹配失效,队列不存在回调returns回调函数

go是如何进行声明回调
type Select struct {
    channel *amqp.Channel
    //  confirm
    confirmMsg chan amqp.Confirmation
    // return
    returnMsg chan amqp.Return
}

s.confirmMsg = s.channel.NotifyPublish(make(chan amqp.Confirmation))
s.returnMsg = s.channel.NotifyReturn(make(chan amqp.Return))
后台进行监听
go s.ListenConfirm() // 无法到达交换机、或者消息路由成功
go s.ListenReturns() // 无法路由到正确队列或者路由键规则匹配

func (s *Select) ListenConfirm() {
    for msg := range s.confirmMsg {
       // 无法达到交换机
  
    }
}

func (s *Select) ListenReturns() {
    // 无法路由到队列
    for msg := range s.returnMsg {
    
    }
}

这里需要进行注意发布消息时必须进行指定mandatory=true,这样才能在路由失败时进行错误响应。

实践

这里只进行考虑无法路由情况,当消息发布失败时,进行一定频率的重试机制。若重试超出限制,进行放入到死信队列进行补偿操作。

func (s *Select) ListenReturns() {
    // 无法路由到队列
    for msg := range s.returnMsg {
       // 重新发送,尝试3次
       err := retry.Do(func() error {
          err := s.channel.Publish(
             variable.SelectExchange,
             variable.SelectRoutingKey,
             true,
             false,
             amqp.Publishing{
                ContentType: "text/plain",
                Body:        msg.Body,
             },
          )
          return err
       }, retry.Attempts(3), retry.Delay(time.Millisecond*100))
       if err != nil {
          logger.Logger.Error("消息重新发送失败", err)
          // 丢入到死信队列,进行补偿操作
          err := s.channel.Publish(
             variable.DeadExchange,
             variable.DeadRoutingKey,
             true,
             false,
             amqp.Publishing{
                ContentType: "text/plain",
                Body:        msg.Body,
             },
          )
          if err != nil {
             logger.Logger.Error("消息发送失败", err)
          }
       }
    }
}

到这里你以为就解决了吗?其实我们还需要进行修改消费代码,因为引入了重试也会造成消息顺序的错误。代码与死信处理的代码一样,也需要进行判断消息是否失效。

小结

虽然上面尽可能的避免在发布阶段消息发布失败,在一定量的重试后,丢入到死信队列失败时也是还会造成消息丢失的,但发生的几率非常低。

总结

在这篇文章我们通过redis进行内存计算和本地缓存,一定量的避免了跟数据库打交道来。在编码的过程中我们又发现了一个显在的问题,如果消息发布失败造成消息丢失,那么就丢失了一条创建操作,这就造成了数据不一致的问题。为了解决这一问题我们通过发布确认机制进行尽可能的避免发布失败造成不一致的问题和减少消息发布失败。为什么不选择事务呢?对于事务更好的确保消息不丢失,但是对于该场景我们是先需要判断是否存在容量和时间冲突才能进行放入到消息队列。当放入消息队列失败时进行回滚,但redis的执行是原子的,不支持回滚操作。只能通过rabbitmq的回滚将消息重新放入队列或者放入死信队列。到这里就类似于发布确认的机制了,比起使用事务发布确认的机制更优于事务。

本期代码请见:github.com/bbz1024/sel…

若阁下感兴趣可以clone下来玩一玩。

bash
复制代码
git clone --branch demo4 https://github.com/bbz1024/select-course.git
转载自:https://juejin.cn/post/7379522106725384202
评论
请登录