2. 选课系统优化:Redis缓存带来的性能突破
回顾上文
在上文中我们完成一个简单的选课系统,但是单纯的使用Gin+Mysql是完全不够用的。
现存的问题
- 在这高并发的系统下,MySQL 的性能瓶颈是受限制的,涉及到事务的时候还可能会存在事务超时。
本文解决
在本文将引入Redis缓存来进行减轻Mysql压力,尽可能查询或修改时在Redis进行操作,最终在进行同步到Mysql。
能学到什么
- go操作redis
- 如何进行缓存预热
- 布隆过滤器 go-bloom
涉及技术
- Gin
- Mysql
- Redis
架构设计
暂时无法在飞书文档外展示此内容
需改进地方
从哪着手呢?
这里从选课逻辑进行着手。
- 课程和用户的ID是可以来说固定的,那么可以在项目启动时,加载课程ID到布隆过滤器里面。
- 在判断用户是否选择了该课程时,可以使用redis缓存的set数据结构来进行记录用户选的课程集合。
- 对于一个选课系统来说我们可以制定一个选课流程:凡事扣减库存操作在redis内存数据库操作(预扣减库存)
暂时无法在飞书文档外展示此内容
上布隆
在这里可以将user和course的主键加入到布隆过滤器里面,针对布隆的特性,存在是可能存在,但是不存在就一定不存在,且对可能存在的误判极其的低,那么我们只进行判断不存在就行了,对于不确定性还是需要加锁。但是这里也会有显著的性能提升。
var UserBloom *bloom.BloomFilter
var CourseBloom *bloom.BloomFilter
func InitializeBloom() error {
// 初始化操作
CourseBloom = bloom.NewWithEstimates(100000, 0.01)
UserBloom = bloom.NewWithEstimates(100000, 0.01)
var wg sync.WaitGroup
var err error
wg.Add(2)
go func() {
var userList []*models.User
if err = database.Client.Find(&userList).Error; err != nil {
return
}
for _, user := range userList {
// 加载id进入布隆过滤器
UserBloom.AddString(fmt.Sprintf("%d", user.ID))
}
}()
go func() {
var courseList []*models.Course
if err = database.Client.Find(&courseList).Error; err != nil {
return
}
for _, course := range courseList {
// 加载id进入布隆过滤器
CourseBloom.AddString(fmt.Sprintf("%d", course.ID))
}
}()
wg.Wait()
return err
}
预热到redis
这里模拟老师上传课程和设置容量
// 预热到redis
func TestPreheatMysql2Redis(t *testing.T) {
var course []models.Course
if err := database.Client.
Preload("Schedule").
Preload("Category").
Find(&course).Error; err != nil {
t.Error(err)
}
for _, v := range course {
if err := cache.RDB.HSet(
context.Background(),
fmt.Sprintf(keys.CourseHsetKey, v.ID),
keys.CourseCategoryIDKey, v.CategoryID,
keys.CourseScheduleDurationKey, uint(v.Schedule.Duration),
keys.CourseScheduleWeekKey, uint(v.Schedule.Week),
keys.CourseCapacityKey, v.Capacity,
).Err(); err != nil {
t.Error(err)
}
}
}
如何进行扣减库存呢?
由于在redis扣减课程,使用了redis的hset数据结构,可以通过increBy命令进行针对某个key的增减。
- 校验课程是否还存在容量
- 进行扣减
这里先预警一下,这里存在问题。具体问题见下
// 2.6. 扣减课程容量与创建课程操作
var err error
var capacityStr string
if capacityStr, err = cache.RDB.HGet(ctx, key, keys.CourseCapacityKey).Result(); err != nil {
logger.Logger.WithContext(ctx).Info("获取课程容量失败", err)
}
if capacity, _ := strconv.Atoi(capacityStr); capacity <= 0 {
logger.Logger.WithContext(ctx).Info("课程容量不足", err)
resp.Fail(ctx, code.Fail, code.CourseFull, code.CourseFullMsg)
return
}
// 3. 创建操作
// 3.1 扣减课程容量
if err := cache.RDB.HIncrBy(ctx, key, keys.CourseCapacityKey, -1).Err(); err != nil {
logger.Logger.WithContext(ctx).Info("扣减课程容量失败", err)
resp.DBError(ctx)
}
进行加入到用户课程集合
// 3.2 加入到用户课程集合
if err := cache.RDB.SAdd(ctx, fmt.Sprintf(keys.UserCourseSetKey, req.UserID), req.CourseID).Err(); err != nil {
logger.Logger.WithContext(ctx).Info("加入到用户课程集合失败", err)
resp.DBError(ctx)
}
然后我们可以进行异步写入到mysql里
// 异步写入到数据库
go Async2Mysql(ctx, &user, req.CourseID, offset)
以上操作存在很严重的问题。---- 临界问题
因为以上的对redis的操作并不是原子性的。
这里只针对原子错误,由于redis对于写操作都是单线程的,那么就避免了多线程环境下造成的临界问题。
其实redis也提供了事务机制,但是存在一些缺陷:非隔离性、操作失败无回滚、二阶提交等。所有这里就不使用redis的事务,那么怎么解决呢?
其实Redis还提供了原子操作,可以编写Lua脚本,且由于单线程执行(指写)可以像事务一样,且避免了并发问题,确保执行顺序和一致性等。
提示:其实上面的一系列操作也可以使用管道。如果对redis多次访问时,可以使用管道机制将多条命令一次性打包发送到redis中。但是管道并不是原子性的。
原子操作
lua脚本
CourseSelectLuaScript = `
-- 1. 用户是否已经选择了
if redis.call("sismember", KEYS[1], KEYS[2]) == 1 then
return 1
end
-- 2. 选课操作
local count =tonumber(redis.call("hget", KEYS[3], KEYS[4]))
if count and count > 0 then
-- 课程人数减 一
redis.call("hincrby", KEYS[3], KEYS[4], -1)
-- 选课,添加到用户集合
redis.call("sadd", KEYS[1], KEYS[2])
return 0
else
-- 容量满了
return 2
end
`
执行脚本
if err := database.Client.WithContext(ctx).Transaction(func(tx *gorm.DB) error {
// 1. 获取课程的week和duration
var course models.Course
if err := tx.Model(&models.Course{}).Preload("Schedule").First(&course, req.CourseID).Error; err != nil {
if errors.Is(err, gorm.ErrRecordNotFound) {
return err
}
return err
}
// 2. 获取用户的flag字段
var user models.User
//
if err := tx.Clauses(clause.Locking{Strength: "SHARE"}).Where("id = ?", req.UserID).First(&user).Error; err != nil {
if errors.Is(err, gorm.ErrRecordNotFound) {
return err
}
return err
}
/*
args:
key1 = 用户key
key2 = 课程id
key3 = 课程key
key4 = capacity key
return:
0 : 执行正常
1 : 用户是否已经选择了
2 : 课程满了
*/
// 3. 判断是否选存在时间冲突
offset := int(course.Schedule.Week*3) + int(course.Schedule.Duration)
if user.Flag.TestBit(offset) {
resp.Fail(ctx, code.Fail, code.CourseSelected, code.CourseSelectedMsg)
return errors.New("用户已经选过该课程")
}
// 4. 执行脚本进行扣减课程和创建
script := redis.NewScript(lua.CourseSelectLuaScript)
var val interface{}
var err2 error
if val, err2 = script.Run(ctx, cache.RDB, []string{
fmt.Sprintf(keys.UserCourseSetKey, req.UserID),
strconv.Itoa(int(req.CourseID)),
fmt.Sprintf(keys.CourseHsetKey, req.CourseID),
keys.CourseCapacityKey,
}, req.UserID, req.CourseID).Result(); err2 != nil {
logger.Logger.WithContext(ctx).Info("执行lua脚本失败", err2)
resp.Fail(ctx, code.Fail, code.CourseSelected, code.CourseSelectedMsg)
return err2
}
switch val.(int64) {
case lua.CourseSelectOK:
logger.Logger.WithContext(ctx).Info("选课成功")
go AsyncSelect2Mysql(ctx, req.UserID, req.CourseID)
case lua.CourseSelected:
logger.Logger.WithContext(ctx).Info("用户已经选择该门课程")
resp.Fail(ctx, code.Fail, code.CourseSelected, code.CourseSelectedMsg)
return errors.New("用户已经选择该门课程")
case lua.CourseFull:
logger.Logger.WithContext(ctx).Info("课程已满")
resp.Fail(ctx, code.Fail, code.CourseFull, code.CourseFullMsg)
return errors.New("课程已满")
default:
logger.Logger.WithContext(ctx).Info("未知错误")
return errors.New("未知错误")
}
// 5. 修改用户flag
user.Flag.SetBit(offset)
if err := tx.Save(&user).Error; err != nil {
return err
}
}); err != nil {
logger.Logger.WithContext(ctx).Info("事务失败", err)
resp.Fail(ctx, code.Fail, code.DBError, code.DBErrorMsg)
return
}
进行并发测试
测压
- mysql
- redis
结果呢?居然出现了超卖,出现了负库存,这必然是一个非常严重的bug。我不是使用了事务吗?为什么还是出现了错误呢?
代码细读一下
事务开始
- 获取课程的week和duration
这里没问题只是获取课程Schedule的时间段,是并发安全的可以确保一定不会修改 (安全)
// 1. 获取课程的week和duration
var course models.Course
if err := tx.Model(&models.Course{}).Preload("Schedule").First(&course, req.CourseID).Error; err != nil {
if errors.Is(err, gorm.ErrRecordNotFound) {
return err
}
return err
}
- 获取用户的flag字段
由于不能确保user的flag字段的并发安全,加了一把共享锁,读读共享,读写加锁。 (安全)
// 2. 获取用户的flag字段
var user models.User
if err := tx.Clauses(clause.Locking{Strength: "SHARE"}).Where("id = ?", req.UserID).First(&user).Error; err != nil {
if errors.Is(err, gorm.ErrRecordNotFound) {
return err
}
return err
}
- 判断是否选存在时间冲突。这里可以确保user.Flag字段的安全,由于在上一步加了共享锁,如果存在修改的话会阻塞到上一步。 (安全)
// 3. 判断是否选存在时间冲突
offset := int(course.Schedule.Week*3) + int(course.Schedule.Duration)
if user.Flag.TestBit(offset) {
resp.Fail(ctx, code.Fail, code.CourseSelected, code.CourseSelectedMsg)
return errors.New("用户已经选过该课程")
}
- 执行脚本进行扣减课程和创建,这里的话也可以确保安全,redis(原子+单线程写)。 (安全)
// 4. 执行脚本进行扣减课程和创建
script := redis.NewScript(lua.CourseSelectLuaScript)
var val interface{}
var err2 error
if val, err2 = script.Run(ctx, cache.RDB, []string{
fmt.Sprintf(keys.UserCourseSetKey, req.UserID),
strconv.Itoa(int(req.CourseID)),
fmt.Sprintf(keys.CourseHsetKey, req.CourseID),
keys.CourseCapacityKey,
}, req.UserID, req.CourseID).Result(); err2 != nil {
logger.Logger.WithContext(ctx).Info("执行lua脚本失败", err2)
resp.Fail(ctx, code.Fail, code.CourseSelected, code.CourseSelectedMsg)
return err2
}
switch val.(int64) {
case lua.CourseSelectOK:
logger.Logger.WithContext(ctx).Info("选课成功")
go AsyncSelect2Mysql(ctx, req.UserID, req.CourseID)
case lua.CourseSelected:
logger.Logger.WithContext(ctx).Info("用户已经选择该门课程")
resp.Fail(ctx, code.Fail, code.CourseSelected, code.CourseSelectedMsg)
return errors.New("用户已经选择该门课程")
case lua.CourseFull:
logger.Logger.WithContext(ctx).Info("课程已满")
resp.Fail(ctx, code.Fail, code.CourseFull, code.CourseFullMsg)
return errors.New("课程已满")
default:
logger.Logger.WithContext(ctx).Info("未知错误")
return errors.New("未知错误")
}
- 修改用户flag字段。这里和步揍3一样 (安全)
user.Flag.SetBit(offset)
if err := tx.Save(&user).Error; err != nil {
return err
}
return nil
事务结束
问题来了?
明明步揍都是安全的,只要存在error就会进行回滚操作,为什么会扣减库存为负数呢?
再来细看
怎么进行解决呢?
其实呢很简单,调换 步骤5 和 步骤4 的顺序。
因为对 步骤5 的不确定性,如果 步骤5 出现错误那么对于 步骤4 是不可逆的,就是这里存在了间隙。对于 步骤5 造成的对Mysql事务的回滚,但是不会对 步骤4 的任何影响。但是如果 步骤5 在 步骤4 先执行,那么即使 步骤5 的操作的不确定性,就不会对 步骤4 造成影响,步骤5 执行错误了回滚,步骤5 执行成功,即使 步骤4 执行错误也会进行撤回redis和回滚mysql。
if err := database.Client.WithContext(ctx).Transaction(func(tx *gorm.DB) error {
// 1. 获取课程的week和duration
var course models.Course
if err := tx.Model(&models.Course{}).Preload("Schedule").First(&course, req.CourseID).Error; err != nil {
if errors.Is(err, gorm.ErrRecordNotFound) {
return err
}
return err
}
// 2. 获取用户的flag字段
var user models.User
//
if err := tx.Clauses(clause.Locking{Strength: "SHARE"}).Where("id = ?", req.UserID).First(&user).Error; err != nil {
if errors.Is(err, gorm.ErrRecordNotFound) {
return err
}
return err
}
// 3. 判断是否选存在时间冲突
offset := int(course.Schedule.Week*3) + int(course.Schedule.Duration)
if user.Flag.TestBit(offset) {
resp.Fail(ctx, code.Fail, code.CourseSelected, code.CourseSelectedMsg)
return errors.New("用户已经选过该课程")
}
// 4. 修改用户flag
user.Flag.SetBit(offset)
if err := tx.Save(&user).Error; err != nil {
return err
}
// 5. 执行脚本进行扣减课程和创建
script := redis.NewScript(lua.CourseSelectLuaScript)
var val interface{}
var err2 error
if val, err2 = script.Run(ctx, cache.RDB, []string{
fmt.Sprintf(keys.UserCourseSetKey, req.UserID),
strconv.Itoa(int(req.CourseID)),
fmt.Sprintf(keys.CourseHsetKey, req.CourseID),
keys.CourseCapacityKey,
}, req.UserID, req.CourseID).Result(); err2 != nil {
logger.Logger.WithContext(ctx).Info("执行lua脚本失败", err2)
resp.Fail(ctx, code.Fail, code.CourseSelected, code.CourseSelectedMsg)
return err2
}
switch val.(int64) {
case lua.CourseSelectOK:
logger.Logger.WithContext(ctx).Info("选课成功")
go AsyncSelect2Mysql(ctx, req.UserID, req.CourseID)
case lua.CourseSelected:
logger.Logger.WithContext(ctx).Info("用户已经选择该门课程")
resp.Fail(ctx, code.Fail, code.CourseSelected, code.CourseSelectedMsg)
return errors.New("用户已经选择该门课程")
case lua.CourseFull:
logger.Logger.WithContext(ctx).Info("课程已满")
resp.Fail(ctx, code.Fail, code.CourseFull, code.CourseFullMsg)
return errors.New("课程已满")
default:
logger.Logger.WithContext(ctx).Info("未知错误")
return errors.New("未知错误")
}
return nil
}); err != nil {
logger.Logger.WithContext(ctx).Info("事务失败", err)
resp.Fail(ctx, code.Fail, code.DBError, code.DBErrorMsg)
return
}
再次进行测试
这里作者进行了多次测试,目前的话对于事务操作是没有出现什么错误的。但是呢?这里还是存在一个隐藏的Bug的。发现的友友可以在评论区留言。有关数据一致相关的。
进一步优化
多级缓存存储架构
暂时无法在飞书文档外展示此内容
业务分析
针对本项目其实很简单,只涉及到了两张表user表和course表,其中user表的flag字段记录着用户时刻表(也就是用户在星期几的那个时间段有课的记录),course表主要是牵扯到课程容量这个字段。
在本业务中user表和course表都是涉及到读多写多的场景,例如:用户选课都会涉及到flag字段和course字段的修改,针对course字段直接在Redis中完成自增或自减的操作,并在某一时刻同步到数据库。
为什么不使用二级缓存呢?
在缓存架构中二级缓存常用于读多写少的场景下,如果读写频率差不多少时不推荐使用二级缓存,因为涉及到二级缓存和数据库同步的问题,对一个热写的数据时不时都需要进行同步,且同步只能是最终的一致性,且涉及到反复的对数据库进行读写和回写,甚至还会造成缓存击穿和雪崩问题。总之弊远大于利。
总之,选择是否使用二级缓存,以及如何设计缓存策略,应基于具体的应用场景和性能需求综合考量。
所以呢?本文并没有什么改进了,下期见。
补充
针对于刚刚提到了二级缓存,其实对于步骤一,由于读取课程的week和duration的值是不会改变的,那么就可以在项目启动时可以把对应课程id的duration和week字段缓存到本地。在明白点来说也不必要缓存到二级缓存中,缓存到单独的一个map就行。至于map嘛,普通的map就行了,只要不涉及到并发读写就行,单纯的并发读是不会panic的。
总结
本期通过使用缓存对上期单独使用mysql遇到的性能瓶颈,做出了一定的优化,我们使用redis进行一个预扣减库存的操作,且在写单线程和原子的一系列操作下确保不会出现“超卖问题”,从而进行预扣课程容量的操作。本文还引入了布隆过滤器,针对于一些恶意的用户,构造非法参数。可以避免大量无效请求对redis造成雪崩的问题。至于缓存穿透是不会了的,因为这里只涉及到了一层,当redis访问不到时是直接返回响应的,并不会向常规的再去访问下一层DB。到了末尾我们通过代码层面上解决mysql事务里也能有效的撤回redis操作。从而避免了并发临界问题。
本期代码请见:github.com/bbz1024/sel…
若阁下感兴趣可以clone下来玩一玩。
bash
复制代码
git clone --branch demo2 https://github.com/bbz1024/select-course.git
转载自:https://juejin.cn/post/7378720599106977802