基于 canal 实现 Mysql Binlog 订阅和解析
本篇主要记录如何使用 canal 实现双机房数据相互同步的实现思路,主要包括 canal 的部署和 canal 解析 binlog。
canal 基本介绍
应用场景
早期阿里巴巴因为杭州和美国双机房部署,存在跨机房同步的业务需求,实现方式主要是基于业务 trigger 获取增量变更。从 2010 年开始,业务逐步尝试数据库日志解析获取增量变更进行同步,由此衍生出了大量的数据库增量订阅和消费业务,这些业务主要包括:
- 数据库镜像
- 数据库实时备份
- 索引构建和实时维护(拆分异构索引、倒排索引等)
- 业务 cache 刷新
- 带业务逻辑的增量数据处理
基本原理
canal 也是在这种业务场景下诞生的。canal 的基本原理是 canal 通过模拟 mysql slave 的交互协议,将自己伪装为 mysql slave,然后向 mysql master 发送 dump 协议,mysql master 收到 dump 请求,开始推送 binlog 给 slave (也就是canal),canal 解析 binlog 对象(原始为byte流)获取到数据。
canal 中的角色
- Canal Admin:Canal Admin 设计上是为 canal 提供整体配置管理、节点运维等面向运维的功能,提供相对友好的 WebUI 操作界面,方便更多用户快速和安全的操作;canal admin 是在 1.1.4 版本才提供的。
- Canal Server(canal-deployer):代表一个 canal 运行实例,对应于一个 jvm,1个 server 对应 n 个 instance。
- Canal Instance:代表了一个实际运行的数据队列,包括了 EventPaser,EventSink,EventStore,metaManager等组件
- eventParser (数据源接入,模拟slave协议和master进行交互,协议解析)
- eventSink (Parser和Store链接器,进行数据过滤,加工,分发的工作)
- eventStore (数据存储)
- metaManager (增量订阅&消费信息管理器)
canal 使用的前置条件
前面提到 Canal 的原理是将自己伪装成 mysql slave 来实现向 mysql master 发送 dump 请求来获取数据的;因此如果需要使用 Canal,则对应的 mysql 需要开启 binlog。binlog 数据格式部分,mysql binlog 按照生成的方式,主要分为:statement-based、row-based、mixed;考虑目前 canal 支持所有模式的增量订阅,但配合同步时,因为statement 只有 sql(时间函数导致数据不一致等问题),没有数据,无法获取原始的变更日志,所以一般建议为ROW模式。
show variables like 'binlog_format';
+---------------+-------+
| Variable_name | Value |
+---------------+-------+
| binlog_format | ROW |
+---------------+-------+
关于 mysql 开启 binlog 以及设置 binlog 格式,可以参考 canal 文档 或者查找其他网络技术文章。除了开启 binlog 和设置其格式之外,还需要提供一个用于同步的账号给 canal;
CREATE USER canal IDENTIFIED BY 'canal';
GRANT SELECT, REPLICATION SLAVE, REPLICATION CLIENT ON *.* TO 'canal'@'%';
-- GRANT ALL PRIVILEGES ON *.* TO 'canal'@'%' ;
FLUSH PRIVILEGES;
canal 部署
笔者项目中使用的版本是 v.1.1.7。主要部署 canal-admin 和 canal-deployer 两个组件。canal-deployer 可以单独使用;如果需要使用 canal-admin 则建议先部署 canal-admin。
canal-admin 部署
- 下载和解压
#进入安装包
cd /home/middleware/canal
#下载admin安装包
wget https://github.com/alibaba/canal/releases/download/canal-1.1.7/canal.admin-1.1.7.tar.gz
#创建admin安装目录
mkdir canal-admin
#解压到admin目录
tar -zxvf canal.admin-1.1.7.tar.gz -C ./canal-admin
- 配置修改
# 进入canal-admin目录
cd canal-admin
# 打开配置文件
vim conf/application.yml
# 修改配置内容后启动admin服务
bin/startup.sh
- 配置文件
server:
port: 8089 # canal-admin 控制台访问的默认端口
spring:
jackson:
date-format: yyyy-MM-dd HH:mm:ss
time-zone: GMT+8
# admin控制台数据库
spring.datasource:
address: {你的数据库ip地址}:{你的数据库端口} # 如:192.168.0.110:3306
database: canal_manager # canal_admin 自己的数据库
# 需要对 canal 用户授权 canal_manager 的访问,也可以重建一个mysql 账户,不复用 canal 的
username: canal
password: canal
driver-class-name: com.mysql.jdbc.Driver
# 增加 &allowPublicKeyRetrieval=true;笔者在部署时遇到 连接mysql时报错:Public Key Retrieval is not allowed(不允许公钥检索)
url: jdbc:mysql://${spring.datasource.address}/${spring.datasource.database}?useUnicode=true&characterEncoding=UTF-8&useSSL=false&allowPublicKeyRetrieval=true
hikari:
maximum-pool-size: 30
minimum-idle: 1
# canal-deployer连接admin的访问账号,这里密码为名问。canal-deployer中为密文
canal:
adminUser: admin
adminPasswd: 123456 # 官方文档是 admin,但是部署之后使用 admin 密码时会提示报错,默认就是 123456
- 启动之后可以直接通过地址进行访问(需要关注下 linux 端口的访问限制,需要开启防火墙的端口访问策略),界面如下:
安装 canal-deployer
- 下载和解压
#进入安装包
cd /home/middleware/canal
#下载admin安装包
wget https://github.com/alibaba/canal/releases/download/canal-1.1.7/canal.deployer-1.1.7.tar.gz
#创建admin安装目录
mkdir canal-deployer
#解压到admin目录
tar -zxvf canal.deployer-1.1.7.tar.gz -C ./canal-deployer
-
默认下载之后,canal-deployer 包括两个配置文件,canal_local.properties 和 canal.properties;这里以最简单的 local 方式给出案例。关于这两个配置文件,官方文档也有解释:github.com/alibaba/can…
-
修改 canal_local.properties 文件
# register ip 本服务在 admin 中的显示 ip,名称也是用这个
# 这里主要是考虑到服务器的多网卡问题,笔者在部署时在没有指定这个 ip 时,默认给了一个内网地址
canal.register.ip = 172.16.0.101
# canal admin config
# canal-admin 的访问地址
canal.admin.manager = 172.16.0.101:8089
# canal-deployer 的控制端口
canal.admin.port = 11110
# canal-admin 的访问账号
canal.admin.user = admin
# canal-admin 的访问密码,与 adminPasswd 对应
canal.admin.passwd = 4ACFE3202A5FF5CF467898FC58AAB1D615029441
# admin auto register 集群配置
# 启动后自动注册到 admin 中
canal.admin.register.auto = true
# 集群名称 ,这里是单点可空着即可
canal.admin.register.cluster =
# 节点名称名称,里是单点可空着即可
canal.admin.register.name =
- 启动 canal-deployer
# 启动服务以 canal_local.properties 配置方式
sh bin/startup.sh local
# 查看启动日志
tail -f logs/canal/canal.log
- 刷新 admin ,可以看到当前 deployer server 已经被注册到 admin 了
canal 应用
在完成 canal-admin 和 canal-deployer 部署之后,则可以创建 canal instance 实例;在 canal 中,实例化是指创建一个 canal 实例,用于连接到数据库并捕获数据库变更数据。canal 实例可以订阅一个或多个数据库实例,并实时获取数据变更事件。通过实例化,我们可以轻松地监控数据库的变化,并在数据发生变更时采取相应的操作。
这里的载入模版点击之后会生成一份默认的配置文件,也可以参考下载的压缩包中解压之后的 conf/example/instance.properties 配置文件,大致如下(下面是以 TCP mode 为例 的,载入模版默认生成的也是基于 TCP 的模版):
#################################################
## mysql serverId , v1.0.26+ will autoGen
# canal.instance.mysql.slaveId=0
# enable gtid use true/false
canal.instance.gtidon=true
# position info
canal.instance.master.address=172.16.0.101:3306
#mysql起始的binlog文件,默认最新数据
canal.instance.master.journal.name=
#mysql起始的binlog偏移量,只会在配置binlog文件中寻找
canal.instance.master.position=
#mysql起始的binlog时间戳,只会在配置binlog文件中寻找
canal.instance.master.timestamp=
#ysql起始的binlog的gtid,只会在配置binlog文件中寻找
canal.instance.master.gtid=
# rds oss binlog
canal.instance.rds.accesskey=
canal.instance.rds.secretkey=
canal.instance.rds.instanceId=
# table meta tsdb info
canal.instance.tsdb.enable=true
#canal.instance.tsdb.url=jdbc:mysql://127.0.0.1:3306/canal_tsdb
#canal.instance.tsdb.dbUsername=canal
#canal.instance.tsdb.dbPassword=canal
#canal.instance.standby.address =
#canal.instance.standby.journal.name =
#canal.instance.standby.position =
#canal.instance.standby.timestamp =
#canal.instance.standby.gtid=
# mysql连接用户名和密码
canal.instance.dbUsername=canal
canal.instance.dbPassword=canal
canal.instance.connectionCharset = UTF-8
# enable druid Decrypt database password
canal.instance.enableDruid=false
#canal.instance.pwdPublicKey=MFwwDQYJKoZIhvcNAQEBBQADSwAwSAJBALK4BUxdDltRRE5/zXpVEVPUgunvscYFtEip3pmLlhrWpacX7y7GCMo2/JM6LeHmiiNdH1FWgGCpUfircSwlWKUCAwEAAQ==
# 匹配table表达式,需要处理的表
canal.instance.filter.regex=blockchain\..*
# 匹配过滤table表达式,不需要处理的表
canal.instance.filter.black.regex=
# 匹配table字段表达式,指定传递字段,不指定全传。
#(format: schema1.tableName1:field1/field2,schema2.tableName2:field1/field2)
#canal.instance.filter.field=test1.t_product:id/subject/keywords,test2.t_company:id
/name/contact/ch
# 匹配过滤table字段表达式,不传递的字段,canal.instance.filter.field为空时生效
@(format: schema1.tableName1:field1/field2,schema2.tableName2:field1/field2)
#canal.instance.filter.black.field=test1.t_product:subject/product_image,test2.t_company:id/name/contact/ch
# mq消息配置
# mq topic
canal.mq.topic=example
# 动态topic配置,topic为表名
#canal.mq.dynamicTopic=mytest1.user,mytest2\\..*,.*\\..*
# mq分区
canal.mq.partition=0
# hash分区数量 ,为空默认为1个分区
#canal.mq.partitionsNum=3
# hash分区主键,没有冒号就使用表名进行分区。有冒号使用字段进行分区。
#canal.mq.partitionHash=test.table:id^name,.*\\..*
#################################################
关于更多 canal 的配置,可以参考:github.com/alibaba/can…
然后操作中选择启动即可。
使用 canal-spring-boot-starter 来订阅数据变更
canal-spring-boot-starter 的 github 地址是:github.com/NormanGylle…
从 readme 来看,接入比较简单,但是文档中的 1.2.6 版本在 maven 中央仓库是没有,最新是 1.2.1 版本;所以建议使用的时候可以自己拉代码进行构建。
- 表结构
+-------------+--------------+------+-----+---------+----------------+
| Field | Type | Null | Key | Default | Extra |
+-------------+--------------+------+-----+---------+----------------+
| id | bigint | NO | PRI | <null> | auto_increment |
| create_time | datetime | YES | | <null> | |
| update_time | datetime | YES | | <null> | |
| group_id | varchar(255) | YES | | <null> | |
| name | varchar(255) | YES | | <null> | |
+-------------+--------------+------+-----+---------+----------------+
- 对应的 java 类
@Data
public class MemberGroupEntity {
private Long id;
private String group_id;
private String name;
private Date create_time;
private Date update_time;
}
- 实现 EntryHandler 接口
/** 表名 member_group */
@CanalTable(value = "member_group")
@Component
public class MemberGroupHandler implements EntryHandler<MemberGroupEntity> {
@Override
public void insert(MemberGroupEntity memberGroupEntity) {
System.out.println("insert memberGroupEntity: " + memberGroupEntity);
}
/**
* 对于更新操作来讲,before 中的属性只包含变更的属性,after 包含所有属性,通过对比可发现那些属性更新了
*
* @param before before
* @param after after
*/
@Override
public void update(MemberGroupEntity before, MemberGroupEntity after) {
System.out.println("update before memberGroupEntity: " + before);
System.out.println("update after memberGroupEntity: " + after);
}
}
测试将 name 从 group2 修改为 group1 ,得到的日志输出如下:
update before memberGroupEntity: MemberGroupEntity(id=null, group_id=null, name=group2, create_time=null, update_time=null)
update after memberGroupEntity: MemberGroupEntity(id=1, group_id=1, name=group1, create_time=Mon Mar 19 14:37:08 CST 2018, update_time=Mon Mar 19 14:37:10 CST 2018)
可以看到 对于更新操作来讲,before 中的属性只包含变更的属性,after 包含所有属性,通过对比可发现那些属性更新了
使用 canal 原生的客户端进行订阅
可以参考这里:github.com/alibaba/can…
这里仅给出关于 CanalConnector 创建连接的部分:
// 创建链接
CanalConnector connector = CanalConnectors.newSingleConnector(new InetSocketAddress(AddressUtils.getHostIp(),
11111), "example", "", "");
- AddressUtils.getHostIp():实际需要换成你的 canal deployer 的地址
- example 换成你的 instance 的名字,比如前面笔者新建的
blockchain-sync
其他的如端口、用户名和密码,按照你项目中的情况实际配置即可。测试情况:将 group1 再修改为 group2,输出如下:
================> binlog[mysql-bin.000001:80145] , name[blockchain,member_group] , eventType : UPDATE
-------> before
id : 1 update=false
create_time : 2018-03-19 14:37:08 update=false
update_time : 2018-03-19 14:37:10 update=false
group_id : 1 update=false
name : group1 update=false
-------> after
id : 1 update=false
create_time : 2018-03-19 14:37:08 update=false
update_time : 2018-03-19 14:37:10 update=false
group_id : 1 update=false
name : group2 update=true
可以看到的是,canal 原生的客户端在执行更新前后都可以拿到完整的行数据,这一点有别于使用 canal-spring-boot-starter
的方式。
使用集群模式
在新版本中集群模式主要依赖在 canal-admin 上来配置;笔者在部署 集群模式时也是踩了相当多的坑,这块主要原因还是在于国内的开源项目多多少少在文档上的重视程度是不够的。此外,canal-spring-boot-starter 这个 canal 的客户端因为本身就是非官方的,从代码提交来看,已经有3 年左右没有在维护了,笔者在基于 cluster 模式使用时,也踩了一些坑,并在基于本地代码做了修改之后重新构建才得以解决。
canal 集群模式依赖 zookeeper 来作为协调者,因此需要先部署 zookeeper,这里不在赘述。关于如何基于 canal-admin 配置集群可以参考这篇文档:【Canal】Canal Admin搭建Canal Server集群 ,内容比较简单。这里笔者主要抛出几个遇到的问题。
-
Unique expecting 0 or 1 results but got [2]:如果你是新建集群,请直接将 canal_manager 中的 canal_config 表清空,这里的问题应该是 canal 在读取数据库配置时存在多条记录;笔者在测试时,当保存主配置时出现过 network error 的报错,浏览器端看应该是跨域问题(很奇怪的跨域问题),从而导致了表单的重复提交。进而导致了查询时出现多条记录的问题。
-
auth :admin is failed 这个问题主要是 canal-admin 部署时,不要去修改 application.yaml 文件中的默认账户和密码(admin/admin),笔者是将其修改为(admin/123456) 之后,导致我在启动 canal-deployer 时会出现这个问题。
-
canal-spring-boot-starter 无法连接 zookeeper 的问题(ClusterCanalConnector : failed to connect to /xxxx:2181):笔者分析了 canal 原生的客户端 cluster api 和 canal-spring-boot-starter 中的实现时候,发现 canal-spring-boot-starter 中在创建 CanalConnector 时底层使用了 SimpleNodeAccessStrategy 这个实现,而不是 ClusterNodeAccessStrategy。详见下面两个图:
图 1(右侧是笔者更换了重载的方法)
图2
总结
笔者的项目中没有基于消息机制,而是完全基于 tcp 的方式;主要考虑是不想引入额外的中间件组件。从整体使用情况来看,canal 能够很好的解决我们实际生产中的一些问题。整体的上手难度,如果完全基于官方文档的话,可能会有一些蹩脚;这也是笔者想将这篇文章分享出来的主要原因,希望读者少踩坑。
那针对文中提到的几个问题,笔者也将进一步深入研究,如果合适的话,也会积极提交相关 PR 进行修复和优化;社区需要每个人的贡献才能走的更好。
转载自:https://juejin.cn/post/7376425155382263848