likes
comments
collection
share

大规模异构数据迁移的探索与实践

作者站长头像
站长
· 阅读数 12

1.背景

近一年交易对系统的技术架构做了重大升级和改造,其中包括订单、运单、账单模型升级,升级过程中很重要的一部分是数据迁移。在数据迁移过程中面临如下挑战:

由于是交易的核心数据的迁移,并且迁移过程中业务并行运行,所以需要保障在迁移中和迁移后都不能影响业务。 数据量大:账单和订单数据规模庞大(订单、账单相关数据在百亿级以上),迁移过程需要高效处理大规模数据,同时也要高质量保障数据无丢失及准确性。 异构数据复杂:新老数据库表结构差异很大,在迁移中需要做大量且复杂的数据转换。 业务运行要稳:由于是交易的核心数据的迁移,并且迁移过程中业务并行运行,所以需要保障在迁移中和迁移后都不能影响业务。

大规模异构数据迁移的探索与实践

2.迁移工具选型

2.1 常见迁移工具

Sqoop:apache开源的一款在关系数据库和Hadoop之间传输数据的工具,它更偏向于关系型数据库与Hadoop间的迁移。 大规模异构数据迁移的探索与实践 yugong:愚公移山,阿里为去IOE开发的一款从Oracle到Mysql的数据迁移工具,它偏向单表间的数据迁移,扩展性相对较弱。

大规模异构数据迁移的探索与实践

Datax:阿里开源的一个被广泛使用的异构数据源离线同步工具,它提供MySQL、Oracle、PgSQL、HDFS、Hive、HBase等各种异构数据源之间高效同步数据的功能,它偏向于离线数据同步。

大规模异构数据迁移的探索与实践

2.2 现状分析

表分片:交易的数据库表是水平分表的,一张逻辑表对应若干分片表,因此迁移需要考虑到表分片的问题。

大规模异构数据迁移的探索与实践

自定义模型转换 :交易数据迁移中最为复杂、工作量最大的一块是数据模型的转换,无法用SQL来完成,需要编写大量的数据读取、转换、持久化的代码。 分布式任务调度:因为数据量庞大,为了提高数据迁移速率,需要拆分为多个任务并分配给多台机器并行执行。 灵活流程控制:迁移周期长,在迁移过程中会涉及代码改动,以适应不同的业务诉求和数据场景,需支持可延伸、可中断、可恢复。 任务编排:我们的数据是按时间段拆分顺序迁移,当同时切分多个时段的任务时,需要编排这些任务的执行顺序。 稳定性保障:数据迁移过程中是要与业务并行运行,需具备可限流、可降级、可熔断、可重试和告警能力,确保万无一失。

2.3 选型结果

工具对比:

大规模异构数据迁移的探索与实践

在面对交易数据迁移的这些特点时,业内常见的迁移工具都不能很好的支持,加之工具使用成本高、难以定制化等方面的考量,因此决定自建一个**适合交易**的数据迁移工具。

3. 自建迁移框架

架构图:

大规模异构数据迁移的探索与实践

3.1 角色

Supervisor:一个JVM实例,负责任务生成、拆分、分发、编排、管理等,supervisor_id为ip:port

Worker:JVM实例,负责具体任务的执行,执行完成后通知Supervisor,worker_id为group:uuid:ip:port

3.2 注册中心

Supervisor与Worker通过注册中心相互发现。

注册中心:通过Redis的Zset和pub/sub实现,Zset做角色的注册及发现,pub/sub做上线/下线的通知。 register:定期保持心跳,更新socre为当前毫秒时间戳。 discovery:定期去更新对方角色列表,使用zremrangebyscore移除过期死亡的角色。 pub/sub:当角色上下线时,会发送事件通知对方角色实时更新discovery列表。

大规模异构数据迁移的探索与实践

大规模异构数据迁移的探索与实践

3.3 核心流程

大规模异构数据迁移的探索与实践3.3.1 创建job

job是用来定义任务配置的表数据,通过job来指定何时触发做何事,使用方可通过Openapi接口来创建job

以下是job的一份配置样例数据(只列了一些主要字段)

{
    "jobGroup": "test",
    "jobName": "XXX数据迁移【20210101~20210331】",
    "jobHandler": "xxx.xxxxxx.xxx.xxx.xxx.DataMigrateJobHandler",
    "jobParam": "{\"batch_size\":200,\"parallelism\":50,\"sharding_number\":1024,\"from_create_time\":\"2021-01-01\",\"to_create_time\":\"2021-03-31\"}",
    "triggerType": 2,
    "triggerConf": "2022-08-21 21:40:00",
    "routeStrategy": 1
}

jobHandler:由使用方编写的自己的任务处理器

triggerType:触发器类型,具体的值在triggerConf字段中配置,支持多种类型

1:Cron表达式,如0 0 23 * * ?

2:指定时间触发一次,如2022-08-21 21:40:00

3:任务依赖,配置父jobId,当父任务执行完成会触发执行子任务(多个父jobId以逗号分隔)

routeStrategy:分发任务给Worker的路由策略

1:轮询

2:随机

3:一致性哈希

4:本地优先,当一个JVM同时是Supervisor和Worker时,Supervisor会优先分发给本地的Worker

3.3.2 生成任务

Supervisor会定时去扫描job表,获取1分钟内将要触发执行的job数据,然后生成instance以及拆分为多个task。

大规模异构数据迁移的探索与实践3.3.3 分发任务

Supervisor会按Job配置的路由策略把task分发给Worker执行。分发方式使用Redis List数据结构实现。分发后如果Worker宕机且未执行,Supervisor有一个定时任务会扫描未执行的task重新分发。

worker_key: tasks.dispatch.group:uuid:ip:port任务分发redis.rPush({worker_key}, {task_data})

大规模异构数据迁移的探索与实践3.3.4 接收任务

Worker有一个定时任务从redis list中接收task,然后放入时间轮中。当到达触发时间后,会把task提交到自定义的线程池WorkerThreadPool中,交由WorkerThread线程执行。

Redis lua脚本

local ret = redis.call('lrange', {worker_key}, 0, {batch_size}-1);
redis.call('ltrim', {worker_key}, {batch_size}, -1);
return ret;

任务的流转过程

大规模异构数据迁移的探索与实践3.3.5 执行任务

WorkerThread会创建JobHandler对象并执行JobHandler#execute方法,execute方法就是使用方的具体业务代码。

大规模异构数据迁移的探索与实践

3.4 Checkpoint

任务执行期间可以使用Checkpoint来保存task的执行快照,以便在宕机/重启/暂停后的恢复时能继续执行,Checkpoint有以下一些使用场景:

1.如果是在循环体中处理业务,可每隔一定周期调用checkpoint保存执行快照。 2.发生异常时,在中断任务前调用checkpoint保存异常前的执行快照。 3.可以在业务处理完的最后调用checkpoint保存最终的执行结果数据。

3.5 暂停/恢复

暂停:客户端可以调用openapi接口来暂停正在执行的任务,暂停时可使用Checkpoint保存任务当前的执行快照。

大规模异构数据迁移的探索与实践

恢复:客户端可以调用openapi接口来恢复已被暂停的任务,恢复时先读取暂停前的执行快照,然后从上一次的执行快照中恢复继续执行。

大规模异构数据迁移的探索与实践

3.6 限流

在数据迁移过程中,为保障平稳运行不影响业务,需要对迁移速率做限流:根据DB资源情况调整流量按业务高低峰分时段限流异常时自动限流压测时手动限流等场景。

{
    "flow_limit":{
        "07": 100,
        "12": 50,  // 中午12点后,限制每个POD每秒最多迁移50个订单
        "20": 200,
        "22": 500,
        "00": 700
    }
}

常见的限流算法有漏桶算法、令牌桶算法等,迁移框架是基于Guava库的令牌桶算法来做的二次封装。

漏桶算法:当请求进入漏桶中,漏桶以一定速度响应,请求速度过大直接溢出丢弃,速率平滑。缺点是无法很好的处理突发流量

大规模异构数据迁移的探索与实践

令牌桶算法:请求获取令牌,当没有令牌可拿时阻塞或者拒绝服务,允许一定程度的突发调用。

大规模异构数据迁移的探索与实践

3.7 错误率熔断

通过bit set来统计最近N条数据迁移的错误率,当错误率高于指定的阈值则自动暂停任务。

1.创建一个指定长度(如10240位)的bit set,初始化全为1(成功)。 2.每批次迁移N条数据,对应迁移成功或失败的结果写入bit set。 3.统计迁移失败的比率,如果大于阈值,则直接自动暂停任务。 大规模异构数据迁移的探索与实践

4. 迁移实践

方案图:

大规模异构数据迁移的探索与实践

4.1 迁移流程

数据迁移流程分为4个步骤:

源数据扫描:基于各个业务的基表的物理分片进行源数据扫描,可按照时间范围、业务类型等条件筛选。 数据模型转换:根据各个业务需求,将源数据结构转换为目标数据的结构。 目标数据持久化:将转换好的目标数据,持久化到目标数据库中。 结果数据核对:持久化目标数据后,对目标数据进行数据质量核对。

大规模异构数据迁移的探索与实践

4.2 迁移保障

4.2.1 失败重试

当迁移的任务出现异常时,会立即进行一次重试,重试失败后, 我们会将失败的任务数据记录到重试表中,当达到最大重试次数后仍失败时,系统将触发告警,通知相关人员介入处理。

我们通过SQL语法特性来支持数据的可重复迁移

INSERT INTO target_table (...) VALUES (...) ON DUPLICATE KEY UPDATE ...

4.2.2 数据核对

为了确保数据迁移的准确性,我们使用了以下核对机制:

**迁移前离线核对:**在双写后,我们通过大禹核对新老库数据是否一致,保障双写准确性以及为后续迁移核对做铺垫。 **迁移中实时核对:**在迁移过程中,我们通过调用业务新老查询接口,将迁移数据重新转换为源数据的异源同构数据后,进行字段核对。 **迁移后离线核对:**在迁移完成后,我们通过数仓抽取全量的数据,在大禹进行全量数据的核对,确保数据的一致性。 迁移后流量回放:在迁移完成后,我们将从线上录制真实的旧链路请求,并回放到新链路的对应接口中,对比新旧链路结果的差异,确保差异都是预期内的,以确认线上查询请求的结果符合预期。

大规模异构数据迁移的探索与实践

4.3 迁移调控

为了确保在迁移过程中不影响业务系统,我们通过以下方法进行调控:

**任务中断恢复:**异常情况下,可中断迁移任务以避免进一步影响业务流程。在问题解决后,可以重新启动新的迁移任务或恢复中断的任务,确保迁移流程的稳定性和连续性。 **速率动态调整:**对于迁移过程中的性能指标和异常情况,可以通过实时配置,调整读取源数据的速率,降低迁移对业务系统的影响。 **错误率阈值熔断:**可以设定迁移错误率的阈值,如果错误率超过阈值,将终止迁移任务,待问题确认或修复后再重新进行迁移。在错误率未达到阈值之前,迁移错误不会影响整个迁移任务的执行。

大规模异构数据迁移的探索与实践

4.4 迁移策略

通过配置多个任务,指定任务的执行优先级,指定任务迁移数据的时间、业务类型等限制范围,按照下面的策略进行数据迁移:

**测试数据:**迁移测试地区全量数据,确认测试地区数据迁移无问题。 **早期数据:**逐步迁移最早半年10%的数据,用于灰度观察及验证,确认早期数据迁移无问题。 **近期数据:**逐步迁移最近半年10%的数据,用于灰度观察及验证,确认近期数据迁移无问题。 **全量数据:**从前往后按时间段分多批次迁移所有数据。

大规模异构数据迁移的探索与实践

4.5 降级预案

当迁移后如果发现数据有问题时,我们需要有快速的方案做降级止血,防止问题进一步扩大影响,我们提供了两种方案降级:

**打标降级:**在迁移后可批量或按时间范围对订单的双写标记置为false,使订单沿用旧链路逻辑操作。 **删除降级:**在迁移后通过接口能批量删除订单新库数据及打标信息,使订单沿用旧链路逻辑操作。

5、总结与展望

通过本次大规模数据迁移探索与实践,我们沉淀了一套通用、高效、可靠的迁移工具及方案。尽管不同的迁移任务可能在复杂程度和侧重点上有所差异,但总体的迁移诉求及过程基本相似。通过这套工具及方案的实施,能高效的完成大规模数据迁移及保障迁移中数据的完整性、准确性。

在未来我们重点会去探索迁移工具平台化,当前主要是在面对不同任务需要针对性的编码,无法快速高效的复用到其他团队,未来我们将建成平台,让不同业务团队可以通过平台动态配置化的能力来自动完成迁移。