likes
comments
collection
share

一次云迁移比对db数据的总结

作者站长头像
站长
· 阅读数 19

背景

由于某种原因,要做云迁移,即需要将数据从一个云 A 迁移到云 B。目前方案是在不影响现有业务的前提下,用商家现有的工具对数据进行实时同步,待真正迁移前一天,发布公告,然后找一个晚上将原有云 A 的所有东西迁到云 B。

其中最重要的就是迁移前后数据的一致性,这个影响比较大。所以在使用商家工具同步数据后的情况下,自己仍需要写个工具来比对迁移前后数据的一致性。

今天这篇文章主要是给大家总结下,我在写这个工具(脚本)的过程中踩过的坑,以及有哪些地方值得注意和学习的。

写工具前的思考

工欲善其事,必先利其器。

我平时在做一些需求前,都会把不清楚或有疑惑的地方先列出来,然后在脑海里先简单想出一些可行的解决方案,最后带着这些问题和方案找 leader 沟通,再确定最终方案。

同样,在写该比对工具前,需要确认和考虑的问题,有以下这些:

  • 比对的是什么数据?是所有还是只比对一些关键的数据即可?
  • 数据量有多少?需求是要在多少分钟内比对完?
  • 比对完,以什么方式输出不一致的数据?
  • 比对脚本在什么时间节点执行?
  • ......

一些说明

  • 比对的是一些核心业务的数据,包括:订单、支付、账单等数据
  • 百万级别的数据量,共涉及 3 个库,5 千张表,要求 15 分钟左右比对完
  • 对于不一致的数据,先放到带 buffer 的 channel 中,最后以 excel 文件方式输出
  • 比对脚本在迁移期间,db 数据同步完后再执行

考虑到对执行效率、性能以及并发度要求较高,因此选择用 Go 语言来开发此比对脚本。

踩过的坑

并发使用同一数据库局柄

并发使用同一数据库连接会产生争抢的问题,导致报错。

解决方案:

方法一:使用连接池(推荐)

可用封装好的,也可自己实现。用 channel 简单实现如下:

func init() {
  for i := 0; i < DB_POOL_CH; i++ {
  dbPoolMap := make(map[string]*mysqlclient.MysqlDBPool)
  dbPool, err := mysqlclient.NewDBPool(g_Config.User, g_Config.Pwd, g_Config.DBHost, "md_deal""utf8"330660)
  if err != nil {
   md_log.Errorf(-1nil"mysqlclient.InitDBPoolBySet failed. err:%v", err)
   return err
  }
  txDBPool, err := mysqlclient.NewDBPool(g_Config.User, g_Config.Pwd, g_Config.TXDBHost, "md_deal""utf8"330660)
  if err != nil {
   md_log.Errorf(-1nil"mysqlclient.InitDBPoolBySet failed. err:%v", err)
   return err
  }
  dbPoolMap["dbPool"] = dbPool
  dbPoolMap["txDBPool"] = txDBPool
  dbPoolCh <- dbPoolMap
 }

 md_log.Debugf("init db conn poll success")
 fmt.Println("init db conn poll success")
}

方法二:使用同一个数据库连接(不推荐)

该方法并发调用会有问题,示例代码如下:

func init() {
   // 初始化 txDB 连接
 g_TXDBPool, err = mysqlclient.NewDBPool(g_Config.User, g_Config.Pwd, g_Config.TXDBHost, "md_deal""utf8"330610)
 if err != nil {
  md_log.Errorf(-1nil"mysqlclient.NewDBPool failed. err:%v", err)
  return err
 }
}

func doQuery() {
  dbPool := g_TXDBPool.GetOpertBySet(DealSet, 0)
  oRs, err := dbPool.Query(dealSql)
 if err != nil {
  md_log.Errorf(-1nil"查询deal数据失败, err:%v, tableName:%v, dealSql:%v", err, tableName, dealSql)
  return
 }
 defer oRs.Close()
}

过多的开启 Goroutine

开始的思路是:一张表开一个协程,5 千张表就有 5 千个协程序,然后一张表的数据又按时间一个月开一个协程,表中的数据算 07 年开始,即一共开了大概(2022-2007)125000=九十万的协程。

如果对于单独部署的机器来说,这不算很多,但是部署执行该脚本的机器上也有在跑其他业务的程序,导致 cpu 占用飙升,以及短时间内有大量处于 time_wait 状态的连接,另外也偶尔会报 buffer busy 的情况。

得出的结论是:如果我们迅速的开启 goroutine (不控制并发的 goroutine 数量  )的话,会在短时间内占据操作系统的资源(CPU、内存、文件描述符等)。

  • CPU 使用率浮动上涨
  • Memory 占用不断上涨
  • 主进程崩溃(被杀掉了)

这些资源实际上是所有用户态程序共享的资源,所以大批的 goroutine 开启最终引发的问题不仅仅是自身,还会影响到其他运行的程序。

所以我们在平时开发中编写代码的时候,需要注意代码中开启的 goroutine 数量,以及开启的 goroutine 是否可以控制,必须要重视的问题。

解决方案:

用带缓冲的 channel 做+sync 来限制 Goroutine 开启的数量

参考我之前写过的一篇文章:juejin.cn/post/701728…

多层循环嵌套查数据

所谓比对数据就是比对两个 db 的数据,即先从一个 db 中查出数据,然后根据主键再去查另一个 db 的数据,最后比对每一个字段数据是否一致。

这其中涉及循环嵌套查数据库,发现这样做效率比较低。

解决方案:

改为批量查询,然后并发比对数据。参考代码如下:

func DoCompareLogic(start, end time.Time, tableName string) {
 startTime, endTime := start.Format(Layout), end.Format(Layout)
 md_log.Keyf("[Starting compare logic] - tableName:%v, startTime:%s, endTime:%s \n", tableName, startTime, endTime)

 dbPoolMap := <-dbPoolCh
 defer func() {
  dbPoolCh <- dbPoolMap
 }()

 dbPool := dbPoolMap["dbPool"]
 txDBPool := dbPoolMap["txDBPool"]

 dealSql := fmt.Sprintf("select * from %s where Flast_update_time>='%v' and Flast_update_time<'%v'", tableName, startTime, endTime)
 oRs, err := dbPool.Query(dealSql)
 if err != nil {
  md_log.Errorf(-1nil"查询deal数据失败, err:%v, tableName:%v, dealSql:%v", err, tableName, dealSql)
  return
 }
 defer oRs.Close()

 var txDealSql string
 var strDealId, pkName string
 primaryIds := make([]int640)

 dealDataMap := make(map[int64]map[string]interface{})
 //dealDataSlice := make([]map[string]interface{}, 0)
 for oRs.Next() {

  dealMap := GetDealDataMap(oRs, tableInfoMap)

  dealId := oRs.GetInt64("Fdeal_id")
  buyerId := oRs.GetInt64("Fbuyer_id")
  strDealId = fmt.Sprintf("%08d%04d%04d", dealId, buyerId%1000, oRs.GetInt64("Fseller_id")%1000)

  var primaryKey int64
  if strings.HasPrefix(tableName, "t_deal") {
   primaryKey = dealId
   primaryIds = append(primaryIds, dealId)
   pkName = "Fdeal_id"
  } else if strings.HasPrefix(tableName, "t_recv") {
   recvId := oRs.GetInt64("Frecv_fee_id")
   primaryKey = recvId
   primaryIds = append(primaryIds, recvId)
   pkName = "Frecv_fee_id"
  } else if strings.HasPrefix(tableName, "t_aftersale") {
   afterSaleId := oRs.GetInt64("Faftersale_id")
   primaryKey = afterSaleId
   primaryIds = append(primaryIds, afterSaleId)
   pkName = "Faftersale_id"
  } else {
   continue
  }
  dealMap["primaryKey"] = primaryKey
  dealMap["strDealId"] = strDealId
  dealDataMap[primaryKey] = dealMap
  //dealDataSlice = append(dealDataSlice, dealDataMap)
 }

 if len(primaryIds) == 0 {
  return
 }
 a, _ := json.Marshal(primaryIds)
 b := strings.ReplaceAll(string(a), "[""(")
 pIds := strings.ReplaceAll(b, "]"")")
 txDealSql = fmt.Sprintf("SELECT * FROM %v WHERE %v IN %v ", tableName, pkName, pIds)

 txRet, err := txDBPool.Query(txDealSql)
 if err != nil {
  md_log.Errorf(-1nil"查询 txDeal 数据失败, tableName:%v, txDealSql:%v, err:%v", tableName, txDealSql, err)
  return
 }

 txHasRecord := false
 for txRet.Next() {
  CompareField(tableInfoMap, dealDataMap, txRet, pkName, tableName)
  txHasRecord= true
 }
 if !txHasRecord {
  difMap := make(map[string]interface{})
  md_log.Debugf("查询 txDeal 数据 empty, tableName:%v, strDealId:%v", tableName, strDealId)
  difMap["primaryKey"] = string(a)
  difMap["dealId"] = strDealId
  difMap["tableName"] = tableName
  difMap["desc"] = fmt.Sprintf("txCloud no record")
  difCh <- difMap
 }
 txRet.Close()


 md_log.Keyf("[End compare logic] - tableName:%v, startTime:%s, endTime:%s \n", tableName, startTime, endTime)

 return
}

总结

  • 不能并发操作同一个数据库连接
  • 不能无限制的开启 Goroutine
  • 嵌套循环时,看下时间复杂度,适当做下优化
  • ...

比对工具代码路径:

github.com/Scoefield/g…

转载自:https://juejin.cn/post/7149921591458512933
评论
请登录