一次云迁移比对db数据的总结
背景
由于某种原因,要做云迁移,即需要将数据从一个云 A 迁移到云 B。目前方案是在不影响现有业务的前提下,用商家现有的工具对数据进行实时同步,待真正迁移前一天,发布公告,然后找一个晚上将原有云 A 的所有东西迁到云 B。
其中最重要的就是迁移前后数据的一致性,这个影响比较大。所以在使用商家工具同步数据后的情况下,自己仍需要写个工具来比对迁移前后数据的一致性。
今天这篇文章主要是给大家总结下,我在写这个工具(脚本)的过程中踩过的坑,以及有哪些地方值得注意和学习的。
写工具前的思考
工欲善其事,必先利其器。
我平时在做一些需求前,都会把不清楚或有疑惑的地方先列出来,然后在脑海里先简单想出一些可行的解决方案,最后带着这些问题和方案找 leader 沟通,再确定最终方案。
同样,在写该比对工具前,需要确认和考虑的问题,有以下这些:
- 比对的是什么数据?是所有还是只比对一些关键的数据即可?
- 数据量有多少?需求是要在多少分钟内比对完?
- 比对完,以什么方式输出不一致的数据?
- 比对脚本在什么时间节点执行?
- ......
一些说明
- 比对的是一些核心业务的数据,包括:订单、支付、账单等数据
- 百万级别的数据量,共涉及 3 个库,5 千张表,要求 15 分钟左右比对完
- 对于不一致的数据,先放到带 buffer 的 channel 中,最后以 excel 文件方式输出
- 比对脚本在迁移期间,db 数据同步完后再执行
考虑到对执行效率、性能以及并发度要求较高,因此选择用 Go 语言来开发此比对脚本。
踩过的坑
并发使用同一数据库局柄
并发使用同一数据库连接会产生争抢的问题,导致报错。
解决方案:
方法一:使用连接池(推荐)
可用封装好的,也可自己实现。用 channel 简单实现如下:
func init() {
for i := 0; i < DB_POOL_CH; i++ {
dbPoolMap := make(map[string]*mysqlclient.MysqlDBPool)
dbPool, err := mysqlclient.NewDBPool(g_Config.User, g_Config.Pwd, g_Config.DBHost, "md_deal", "utf8", 3306, 60)
if err != nil {
md_log.Errorf(-1, nil, "mysqlclient.InitDBPoolBySet failed. err:%v", err)
return err
}
txDBPool, err := mysqlclient.NewDBPool(g_Config.User, g_Config.Pwd, g_Config.TXDBHost, "md_deal", "utf8", 3306, 60)
if err != nil {
md_log.Errorf(-1, nil, "mysqlclient.InitDBPoolBySet failed. err:%v", err)
return err
}
dbPoolMap["dbPool"] = dbPool
dbPoolMap["txDBPool"] = txDBPool
dbPoolCh <- dbPoolMap
}
md_log.Debugf("init db conn poll success")
fmt.Println("init db conn poll success")
}
方法二:使用同一个数据库连接(不推荐)
该方法并发调用会有问题,示例代码如下:
func init() {
// 初始化 txDB 连接
g_TXDBPool, err = mysqlclient.NewDBPool(g_Config.User, g_Config.Pwd, g_Config.TXDBHost, "md_deal", "utf8", 3306, 10)
if err != nil {
md_log.Errorf(-1, nil, "mysqlclient.NewDBPool failed. err:%v", err)
return err
}
}
func doQuery() {
dbPool := g_TXDBPool.GetOpertBySet(DealSet, 0)
oRs, err := dbPool.Query(dealSql)
if err != nil {
md_log.Errorf(-1, nil, "查询deal数据失败, err:%v, tableName:%v, dealSql:%v", err, tableName, dealSql)
return
}
defer oRs.Close()
}
过多的开启 Goroutine
开始的思路是:一张表开一个协程,5 千张表就有 5 千个协程序,然后一张表的数据又按时间一个月开一个协程,表中的数据算 07 年开始,即一共开了大概(2022-2007)125000=九十万的协程。
如果对于单独部署的机器来说,这不算很多,但是部署执行该脚本的机器上也有在跑其他业务的程序,导致 cpu 占用飙升,以及短时间内有大量处于 time_wait
状态的连接,另外也偶尔会报 buffer busy
的情况。
得出的结论是:如果我们迅速的开启 goroutine (不控制并发的 goroutine 数量 )的话,会在短时间内占据操作系统的资源(CPU、内存、文件描述符等)。
- CPU 使用率浮动上涨
- Memory 占用不断上涨
- 主进程崩溃(被杀掉了)
这些资源实际上是所有用户态程序共享的资源,所以大批的 goroutine 开启最终引发的问题不仅仅是自身,还会影响到其他运行的程序。
所以我们在平时开发中编写代码的时候,需要注意代码中开启的 goroutine 数量,以及开启的 goroutine 是否可以控制,必须要重视的问题。
解决方案:
用带缓冲的 channel 做+sync 来限制 Goroutine 开启的数量
参考我之前写过的一篇文章:juejin.cn/post/701728…
多层循环嵌套查数据
所谓比对数据就是比对两个 db 的数据,即先从一个 db 中查出数据,然后根据主键再去查另一个 db 的数据,最后比对每一个字段数据是否一致。
这其中涉及循环嵌套查数据库,发现这样做效率比较低。
解决方案:
改为批量查询,然后并发比对数据。参考代码如下:
func DoCompareLogic(start, end time.Time, tableName string) {
startTime, endTime := start.Format(Layout), end.Format(Layout)
md_log.Keyf("[Starting compare logic] - tableName:%v, startTime:%s, endTime:%s \n", tableName, startTime, endTime)
dbPoolMap := <-dbPoolCh
defer func() {
dbPoolCh <- dbPoolMap
}()
dbPool := dbPoolMap["dbPool"]
txDBPool := dbPoolMap["txDBPool"]
dealSql := fmt.Sprintf("select * from %s where Flast_update_time>='%v' and Flast_update_time<'%v'", tableName, startTime, endTime)
oRs, err := dbPool.Query(dealSql)
if err != nil {
md_log.Errorf(-1, nil, "查询deal数据失败, err:%v, tableName:%v, dealSql:%v", err, tableName, dealSql)
return
}
defer oRs.Close()
var txDealSql string
var strDealId, pkName string
primaryIds := make([]int64, 0)
dealDataMap := make(map[int64]map[string]interface{})
//dealDataSlice := make([]map[string]interface{}, 0)
for oRs.Next() {
dealMap := GetDealDataMap(oRs, tableInfoMap)
dealId := oRs.GetInt64("Fdeal_id")
buyerId := oRs.GetInt64("Fbuyer_id")
strDealId = fmt.Sprintf("%08d%04d%04d", dealId, buyerId%1000, oRs.GetInt64("Fseller_id")%1000)
var primaryKey int64
if strings.HasPrefix(tableName, "t_deal") {
primaryKey = dealId
primaryIds = append(primaryIds, dealId)
pkName = "Fdeal_id"
} else if strings.HasPrefix(tableName, "t_recv") {
recvId := oRs.GetInt64("Frecv_fee_id")
primaryKey = recvId
primaryIds = append(primaryIds, recvId)
pkName = "Frecv_fee_id"
} else if strings.HasPrefix(tableName, "t_aftersale") {
afterSaleId := oRs.GetInt64("Faftersale_id")
primaryKey = afterSaleId
primaryIds = append(primaryIds, afterSaleId)
pkName = "Faftersale_id"
} else {
continue
}
dealMap["primaryKey"] = primaryKey
dealMap["strDealId"] = strDealId
dealDataMap[primaryKey] = dealMap
//dealDataSlice = append(dealDataSlice, dealDataMap)
}
if len(primaryIds) == 0 {
return
}
a, _ := json.Marshal(primaryIds)
b := strings.ReplaceAll(string(a), "[", "(")
pIds := strings.ReplaceAll(b, "]", ")")
txDealSql = fmt.Sprintf("SELECT * FROM %v WHERE %v IN %v ", tableName, pkName, pIds)
txRet, err := txDBPool.Query(txDealSql)
if err != nil {
md_log.Errorf(-1, nil, "查询 txDeal 数据失败, tableName:%v, txDealSql:%v, err:%v", tableName, txDealSql, err)
return
}
txHasRecord := false
for txRet.Next() {
CompareField(tableInfoMap, dealDataMap, txRet, pkName, tableName)
txHasRecord= true
}
if !txHasRecord {
difMap := make(map[string]interface{})
md_log.Debugf("查询 txDeal 数据 empty, tableName:%v, strDealId:%v", tableName, strDealId)
difMap["primaryKey"] = string(a)
difMap["dealId"] = strDealId
difMap["tableName"] = tableName
difMap["desc"] = fmt.Sprintf("txCloud no record")
difCh <- difMap
}
txRet.Close()
md_log.Keyf("[End compare logic] - tableName:%v, startTime:%s, endTime:%s \n", tableName, startTime, endTime)
return
}
总结
- 不能并发操作同一个数据库连接
- 不能无限制的开启 Goroutine
- 嵌套循环时,看下时间复杂度,适当做下优化
- ...
比对工具代码路径:
转载自:https://juejin.cn/post/7149921591458512933