5:凉凉,怎么又是一致性问题!
回顾上文
在上文中我们通过一系列的手动进行优化,主要针对减少对数据库的操作优化,尽可能的往缓存走,从而进行优化响应速度,本来上期是最后一期的,但是在后期进行测试时又出现了一致性问题,接下来我们进行解决一致性问题。
能学到什么
-
如何进行排除问题
-
条件编译消除
问题重现
在上文末尾本人进行了一次测压总共进行了5次测压,前两次看着还是没问题的,可能是并发量不是很大,但是到了背后几次就出现了问题了。
这里redis的容量和mysql的容量是一致的,但是用户和课程的记录表(user_course)存在114条数据,这里显然是出现问题了,本应该是刚刚好100条的。那是不是消息队列没消费完成呢?还是消息执行失败丢入死信队列补偿去了。
可以看到队列里根本没有数据,也就是说消息是按要求执行完成了的。这里我心想是不是消费的代码逻辑存在问题呢?
定位可疑代码
当时在这里我可以确保在redis创建阶段是不存在问题的,因为只是进行了一个原子操作,即使发生了错误也不会对redis和数据库造成影响,那么有问题的代码就是处理消费的逻辑和发布消息造成丢失了。
处理消费
看到这里进行开启了一段mysql的事务,由于事务的特性,如果存在事务不成功,那么消息就会进行回滚和丢入到了死信队列。刚才我们也看了死信队列是不存在消息的,这里可以排除掉事务有过回滚的操作。我们出现的问题是课程和用户的消息表存在多条数据与redis的用户选择的课程不一致,那么我们看看创建课程的主要逻辑updateUserCourseState
方法
func (s *Select) Consumer() error {
results, err := SelectConsumer.channel.Consume(
variable.SelectQueue,
variable.SelectRoutingKey,
false, // 关闭自动应答
false,
false,
false,
nil,
)
if err != nil {
logger.Logger.Error("消息接收失败", err)
return err
}
for res := range results {
var msg *mqm.CourseReq
if err := json.Unmarshal(res.Body, &msg); err != nil {
logger.Logger.Error("消息反序列化失败", err)
res.Reject(false)
continue
}
if err = database.Client.Transaction(func(tx *gorm.DB) error {
if err := updateCourseCapacity(tx, msg, msg.Type == mqm.SelectType); err != nil {
return err
}
if err := updateUserFlag(tx, msg, msg.Type == mqm.SelectType); err != nil {
return err
}
if err := updateUserCourseState(tx, msg, msg.Type == mqm.SelectType); err != nil {
return err
}
return nil
}); err != nil {
logger.Logger.Error("事务失败", err)
res.Reject(false)
continue
}
// 消息确认
if err := res.Ack(false); err != nil {
logger.Logger.Error("消息确认失败", err)
}
}
return nil
}
创建记录操作
在updateUserCourseState方法里,我们进行对用户创建一条选课记录,如果不存在就进行创建,存在就更新。针对于选课操作我们只是简单的进行软删除isDeleted=true
,并且每次操作都进行记录操作的时间,以便于消息重试造成的顺序错乱和确保执行的操作是用户最后一次的操作。起初我还以为是未进行并发的控制造成的问题,但显然不是,我们这里确保了是一个消费者进行消费,可以说是串行化的。即使不进行加锁也能确保不会出现并发问题。那是不是sql逻辑错误了呢?
// 创建用户选课记录
func updateUserCourseState(tx *gorm.DB, msg *mqm.CourseReq, selectAction bool) error {
var userCourse models.UserCourse
if err := tx.Clauses(clause.Locking{Strength: "UPDATE"}).
Where("user_id=? and course_id=?", msg.UserID, msg.CourseID).
First(&userCourse).Error; err != nil {
if !errors.Is(err, gorm.ErrRecordNotFound) {
logger.Logger.Info("获取用户选课记录失败", err)
return err
}
// 不存在,则创建
if err := tx.Create(&models.UserCourse{
UserID: msg.UserID,
CourseID: msg.CourseID,
CreatedAt: msg.CreatedAt,
IsDeleted: !selectAction,
}).Error; err != nil {
logger.Logger.Info("创建选课记录失败", err)
return err
}
return nil
}
// 记录已存在,检查并可能更新
updateData := map[string]interface{}{"is_deleted": !selectAction}
if err := tx.Model(&userCourse).
Where("user_id=? and course_id=? and created_at < ?", msg.UserID, msg.CourseID, msg.CreatedAt).
Updates(updateData).Error; err != nil {
logger.Logger.Info("更新选课记录失败", err)
return err
}
return nil
}
SQL逻辑
于是我简单进行提取了选课和退课执行的SQL语句,在这里可以看出我们为了确保消息失败重新返回队列或补充造成的顺序问题。如果选课操作在我们进行记录每一次的操作时间,针对于退课操作我们进行通过消息时间和记录操作时间进行过滤msg.Create_At < record.Created_At
,如果消息时间小于数据库消息的时间的话那么该条消息我们就可以判定为失效的消息(过期消息) ,否则的话就进行更新数据库消息的操作时间。这里也不存在什么SQL逻辑错误问题。那是不是消息丢失了呢?
选课
退课
消息丢失
这里只剩下了消息丢失可疑,这里可以直接定位到消息生产阶段,于是针对于上文的发布进行了修改,这里使用了事务进行代替,为什么不是有发布确认机制呢?因为此时对rabbitmq的发布机制使用还未成熟(现学现卖)。
事务确保发布可靠
在这里进行使用rabbitmq的事务机制,如果事务执行失败进行进行重试策略,重试失败就放入死信队列。在处理也出现了问题。
func (s *Select) Product(msg *mqm.CourseReq) {
// 微妙 记录每条消息的时间,确保加入到死信队列后期执行消费的顺序
msg.CreatedAt = time.Now().UnixMicro()
bytes, err := json.Marshal(msg)
if err != nil {
logger.Logger.Error("消息序列化失败", err)
return
}
ch, err := mq.Client.Channel()
if err != nil {
logger.Logger.Error("获取channel失败", err)
return
}
if err := ch.Tx(); err != nil {
logger.Logger.Error("开启事务失败", err)
}
var cnt uint8
if err := retry.Do(func() error {
if err = s.channel.Publish(
variable.SelectExchange,
variable.SelectRoutingKey,
true,
false, amqp.Publishing{
ContentType: "text/plain",
Body: bytes,
}); err != nil {
cnt++
logger.Logger.Errorf("消息发送失败,尝试次数: %d", cnt)
return err
}
return nil
}, retry.Attempts(3), retry.Delay(time.Millisecond*100)); err != nil {
logger.Logger.Error("尝试消息发送失败", err)
if err := ch.TxRollback(); err != nil {
logger.Logger.Error("事务回滚失败", err)
}
return
}
if err := ch.TxCommit(); err != nil {
logger.Logger.Error("事务提交失败", err)
}
ch.Close()
}
事务的使用
在go操作rabbitmq事务时必须是一个信道(channel)对应一个事务,否则会抛异常,且每次事务完毕时需要关闭执行事务的信道(channel),如果不进行手动断开会造成连接过多的异常
// 获取一个信道channel
ch, err := mq.Client.Channel()
// 开启事务
if err := ch.Tx(); err != nil {
logger.Logger.Error("开启事务失败", err)
};
// 事务回滚
if err := ch.TxRollback(); err != nil {
logger.Logger.Error("事务回滚失败", err)
}
// 提交事务
if err := ch.TxCommit(); err != nil {
logger.Logger.Error("事务提交失败", err)
}
// 断开信道
ch.Close()
过多的创建事务造成事务空间id不足问题,
测试
由于进行修改了代码重新进行测试一次,但是在这里还是出现了问题,死信队列也不存在消息。这里显然也不是生产丢失造成的问题了。于是这里就重新进行改回了发布确认机制
发布确认机制
在上文中使用发布确认机制存在问题。
纠正一下:
amqp.Confirmation
:当无法到达交换机时会进行发送ack包进行确认接受。amqp.Return
:当无法路由到队列,但是消息到达交换机,如果设置了死信队列就没必要写了。且这里存在消息的重复发送。
这里显然前者是消息都没进行持久化是生产阶段异常,后者是消息已经持久化在交换机上了,是由于无法正确路由到对应队列造成的异常。所有发布确认只需要进行考虑
amqp.Confirmation
的处理逻辑即可。
结构体消息
这里可以看到Confirmation只包含了消息的tag和ack两个字段都没有消息体,那么我们该如何进行处理呢?在这里我们可以在发布阶段使用哈希表进行记录每个DeliveryTag
对应的消息体,这样就可以处理之后的逻辑了。
type Confirmation struct {
DeliveryTag uint64 Confirm mode
Ack bool
}
发布消息
在这里我们进行每发送一条消息进行将消息的Deliverytag
对应的消息体记录在哈希表里,当发生ack==false
时,我们就行重试操作。在这里可以看到为什么使用cnt
呢?不是Deliverytag
呢?由于在发布阶段是不存在Deliverytag
的。且Deliverytag字段是每次发送一条消息自增的,于是我们就使用了cnt
字段进行解决生产阶段不能设置Deliverytag
的问题,且cnt
是并发安全的
Product
func (s *Select) Product(msg *mqm.CourseReq) {
s.cnt.Add(1)
bytes, err := json.Marshal(msg)
if err != nil {
logger.Logger.Error("消息序列化失败", err)
return
}
if TestMsgLose {
cache.RDB.HIncrBy(context.Background(), "record", "produce-total", 1)
}
var cnt uint8
if err := retry.Do(func() error {
if err = s.channel.Publish(
variable.SelectExchange,
variable.SelectRoutingKey,
true,
false, amqp.Publishing{
DeliveryMode: amqp.Persistent,
ContentType: "application/json",
Body: bytes,
}); err != nil {
cnt++
logger.Logger.Warningf("消息发送失败,尝试次数: %d", cnt)
return err
}
// 记录消息,以便于消息确认
s.unconfirmedMessages.Store(s.cnt.Load(), UnconfirmedMessage{
Body: bytes,
Exchange: variable.SelectExchange,
RoutingKey: variable.SelectRoutingKey,
})
return nil
}, retry.Attempts(3), retry.Delay(time.Millisecond*100)); err != nil {
logger.Logger.Error("尝试消息发送失败", err)
}
}
ListenConfirm
func (s *Select) ListenConfirm() {
for msg := range s.confirmMsg {
if !msg.Ack {
val, ok := s.unconfirmedMessages.Load(msg.DeliveryTag)
if !ok {
logger.Logger.Error("消息确认失败", msg)
continue
}
data := val.(UnconfirmedMessage)
// 重新发送,尝试3次
var cnt int
if err := retry.Do(func() error {
if err := s.channel.Publish(
variable.SelectExchange,
variable.SelectRoutingKey,
true,
false,
amqp.Publishing{
ContentType: "text/plain",
Body: data.Body,
},
); err != nil {
cnt++
logger.Logger.Error("消息发送失败%d次", cnt)
return err
}
return nil
}, retry.Attempts(3), retry.Delay(time.Millisecond*100)); err != nil {
logger.Logger.Error("消息重新发送失败", err)
// 丢入到死信队列,进行补偿操作
if err := s.channel.Publish(
variable.DeadExchange,
variable.DeadRoutingKey,
true,
false,
amqp.Publishing{
ContentType: "text/plain",
Body: data.Body,
},
); err != nil {
logger.Logger.Error("消息发送失败", err)
}
}
}
s.unconfirmedMessages.Delete(msg.DeliveryTag)
}
}
但是到这里我们还是没有进行解决问题,既不是处理消费逻辑问题,也不是消息丢失问题。那么是不是在消费阶段发送了错误造成丢失了呢?由于是进行并发测试,并不是单个测试,无法使用断点进行调试操作。于是为了证明是不是消费阶段发生的异常,这里使用redis作为计数器,每当进行处理一个消息进行计算与消息总生产量进行校验。
进一步深究
这里进行了设置了9个计数
查看计数器
这里可以看到生产消息总量produce-total
和事务执行成功transaction-success
,消费成功consumer-success
都是一致的,且也没有出现消费失败consumer-fail
和事务失败transaction-fail
的计数,那么可以确定消费阶段也未发生异常。在这里有读者可能发现设置这么多计数器要是放在线上会不会造成一些多余的浪费呢?这里可以说显然不会,因为TestMsgLose
是一个布尔类型常量,当进行预编译时会进行条件编译消除优化。
到这里也是临近崩溃状态了/(ㄒoㄒ)/~~,难道是消息创建时间相同了?
于是进行修改了顺序序列
顺序序列
在生产阶段这里也使用了自增id作为顺序序列,结果还是一样的问题。
s.cnt.Add(1)
msg.CreatedAt = s.cnt.Load()
消除问题
问题分析
躺在床上准备睡觉时,对整个执行流程梳理了一遍。于是就发现了可疑点,我急忙起来验证了我的猜想。接下来来看看我是如何发现这个问题的。
其实我们这个执行的操作很精简,就是进行redis预扣减mysql做最终存储。
但我们并没考虑到若是在redis扣减完成后发生阻塞的情况。
画图
正常情况
阻塞
那么如何在redis阶段生成create_at呢?
当我们执行完lua脚本时可以进行返回每个消息的序列,如果进行传递到消息队列里,通过redis进行生成,进行记录一个消息序列,每当执行选课或者退课时序列递增操作,若是操作成功就进行返回递增后的序列。主这样就能确保消息的执行和create_at的创建顺序一致了。
Lua脚本
创建成功后进行对course_sequence消息通过incr
进行自增操作,并且返回结果作为该消息的create_at。
CourseSelectLuaScript = `
-- 1. 用户是否已经选择了
if redis.call("sismember", KEYS[1], KEYS[2]) == 1 then
return -3
end
-- 2. 是否存在时间冲突 判断某个时间段是否为1
local bitmap = tonumber(redis.call("getbit", KEYS[5], KEYS[6]))
if bitmap and bitmap == 1 then
return -2
end
-- 3. 选课操作
local count =tonumber(redis.call("hget", KEYS[3], KEYS[4]))
if count and count > 0 then
-- 课程人数减 一
redis.call("hincrby", KEYS[3], KEYS[4], -1)
-- 选课,添加到用户集合
redis.call("sadd", KEYS[1], KEYS[2])
-- 课程时间段设置为1
redis.call("setbit", KEYS[5], KEYS[6], 1)
-- 课程序列号加一
local sequence=tonumber(redis.call("incr", KEYS[7]))
return sequence
else
-- 容量满了
return -4
end
`
传入序列值
res := val.(int64) // 序列值,如果为负的话就说明并非序列值,直接返回错误结果。
switch {
case res >= lua.CourseOptOK:
logger.Logger.WithContext(ctx).Info("选课成功")
consumer.SelectConsumer.Product(&mqm.CourseReq{
UserID: req.UserID, CourseID: req.CourseID, Type: mqm.SelectType,
CreatedAt: res,
})
resp.Success(ctx, nil)
case res == lua.CourseSelected:
logger.Logger.WithContext(ctx).Info("用户已经选择该门课程")
resp.Fail(ctx, code.Fail, code.CourseSelected, code.CourseSelectedMsg)
case res == lua.CourseFull:
logger.Logger.WithContext(ctx).Info("课程已满")
resp.Fail(ctx, code.Fail, code.CourseFull, code.CourseFullMsg)
case res == lua.CourseTimeConflict:
logger.Logger.WithContext(ctx).Info("课程时间冲突")
resp.Fail(ctx, code.Fail, code.CourseTimeConflict, code.CourseTimeConflictMsg)
default:
logger.Logger.WithContext(ctx).Warning("未知错误")
resp.Fail(ctx, code.Fail, code.Fail, code.FailMsg)
}
测试
模拟并发请求
一致性校验
func TestValidDataConsistency(t *testing.T) {
var userCourse []models.UserCourse
if err := database.Client.
Where("is_deleted=?", false).
Order("user_id").
Find(&userCourse).
Error; err != nil {
t.Error(err)
}
for i := 0; i < len(userCourse); i++ {
// 判断用户是否在set里
if !cache.RDB.SIsMember(
context.Background(),
fmt.Sprintf(keys.UserCourseSetKey, userCourse[i].UserID),
userCourse[i].CourseID).Val() {
fmt.Println(userCourse[i].UserID, userCourse[i].CourseID)
t.Error("数据不一致")
}
}
}
执行成功,自此一个选课的demo,经过多轮技术的选型与优化,从而实现并发量还行,容错率高的选课demo,这难道还不足以碾压刚刚上线“一坤”年的校园选课系统吗?不行还请见下期《从单体到微服务:选课系统的架构演进之路》
总结
在本文由于上期又发现了数据的不一致问题,经过层层排除最终找到了问题的根源,于是我们通过在redis阶段创建序列号,以确保每条消息的执行顺序时间,最终解决了这个问题。自此一个选课demo单体架构完成。接下来我们进行现学现卖的手法使用微服务架构代替,直到各路大神满意为止,足以捏杀校园loubi选课系统。请见下期《从单体到微服务:选课系统的架构演进之路》。
本期代码请见:github.com/bbz1024/sel…
若阁下感兴趣可以clone下来玩一玩。
git clone --branch demo4 https://github.com/bbz1024/select-course.git
转载自:https://juejin.cn/post/7379861461328232460