likes
comments
collection
share

Canal 结合 RocketMQ 实现数据的增量同步(一)

作者站长头像
站长
· 阅读数 14

前言

Canal 主要用途是基于 MySQL 数据库增量日志解析,提供增量数据订阅和消费,工作原理如下:

  • Canal 模拟 MySQL slave 的交互协议,伪装自己为 MySQL slave ,向 MySQL master 发送 dump 协议
  • MySQL master 收到 dump 请求,开始推送 binary log 给 slave (即 Canal )
  • Canal 解析 binary log 对象(原始为 byte 流)

Canal 结合 RocketMQ 实现数据的增量同步(一)

可以用于以下业务场景:

  • 数据库镜像
  • 数据库实时备份
  • 索引构建和实时维护(拆分异构索引、倒排索引等)
  • 业务 cache 刷新
  • 带业务逻辑的增量数据处理

当前的 Canal 支持源端 MySQL 版本包括 5.1.x , 5.5.x , 5.6.x , 5.7.x , 8.0.x。

Tips: 以上内容来自官网,详细内容建议先看一遍官方 wiki 文档。

Canal 使用

数据库配置

在正式安装 Canal 之前,首先需要对 MySQL(本文使用版本为 5.7.34) 的配置进行修改,同时我们可以新建一个测试用户和测试库来完成后续的步骤,首先是修改my.cnf配置文件,增加以下配置内容:

[mysqld]
# 开启 binlog
log-bin=mysql-bin
# 选择 ROW 模式
binlog-format=ROW
# 配置 MySQL replaction 需要定义,不要和 canal 的 slaveId 重复
server_id=1

然后通过root账号新建一个名为canal的用户和名为canal的库并授权给canal

-- 新建 canal 用户, 密码为 canal
CREATE USER canal IDENTIFIED BY 'canal';
-- 新建 canal 数据库并给 canal 用户授予权限
CREATE DATABASE canal CHARACTER SET utf8mb4;
GRANT SELECT, REPLICATION SLAVE, REPLICATION CLIENT ON *.* TO 'canal'@'%';
GRANT ALL PRIVILEGES ON canal.* TO 'canal'@'%';
FLUSH PRIVILEGES;

然后创建一个user表并添加几条数据用于后续测试:

CREATE TABLE `user` (
  `id` bigint(20) unsigned NOT NULL AUTO_INCREMENT COMMENT '用户 id',
  `username` varchar(50) DEFAULT NULL COMMENT '用户名',
  `password` varchar(50) DEFAULT NULL COMMENT '密码',
  `email` varchar(45) DEFAULT NULL COMMENT '邮箱',
  `phone` varchar(15) DEFAULT NULL COMMENT '手机号码',
  PRIMARY KEY (`id`)
) ENGINE=InnoDB AUTO_INCREMENT=6 DEFAULT CHARSET=utf8mb4;

INSERT INTO `canal`.`user` (`id`, `username`, `password`, `email`, `phone`) VALUES 
    (1, '姜磊', 'k0VP$l@ru', 't.wazmcs@qxfvsstyo.uy', '18145206808'),
    (2, '丁洋', '8pig73*dW', 'h.wsecj@wmlp.li', '19832458514'),
    (3, '邱秀兰', '5G)c@7RyV', 'c.afkrfcr@rnhewu.org.cn', '18656022523'),
    (4, '孔洋', 'KjvLG*BP', 'r.tbnmdyh@pzzuo.jo', '18674498531'),
    (5, '董霞', '%fqmhybp3', 'o.hnlu@hhyvqxbv.eg', '18192674843');

ps: 以上数据为 mock 生成,如有雷同,纯属巧合。

安装 Canal

下载

访问官网releases页面,下载并安装 1.1.5 版本:

mkdir /data/canal
cd /data/canal
wget https://github.com/alibaba/canal/releases/download/canal-1.1.5/canal.deployer-1.1.5.tar.gz
tar -zxvf canal.deployer-1.1.5.tar.gz
rm -rf canal.deployer-1.1.5.tar.gz

修改配置

修改conf/example/instance.properties的以下配置为自己的数据库连接信息:

# 不能和 mysql 的 server_id 重复
canal.instance.mysql.slaveId=2

# position info
canal.instance.master.address=192.168.3.13:3306

# username/password
canal.instance.dbUsername=canal
canal.instance.dbPassword=canal

启动

sh bin/startup.sh

防火墙设置

firewall-cmd --zone=public --add-port=11111/tcp --permanent
firewall-cmd --reload

测试

编写以下测试代码进行连接测试,完整代码已上传到GitHub

/**
 * Canal 监听
 *
 * @author zjw
 * @date 2022-01-27
 */
@Slf4j
@Component
@RequiredArgsConstructor
public class CanalListener {

    private final CanalConfig canalConfig;

    @PostConstruct
    private void init() {
        listen();
    }

    /**
     * 监听 Canal 增量数据
     */
    private void listen() {
        new Thread(() -> {
            // 连接到 Canal
            CanalConnector connector = connect();
            try {
                // 循环获取数据库变更信息
                while (true) {
                    Message message = connector.getWithoutAck(canalConfig.getBatchSize());
                    long batchId = message.getId();
                    List<CanalEntry.Entry> entryList = message.getEntries();
                    if (batchId == -1 || entryList.isEmpty()) {
                        Util.sleep(500);
                    } else {
                        // 存在变更信息则打印
                        entryList.forEach(this::printEntry);
                    }
                    connector.ack(batchId);
                }
            } finally {
                connector.disconnect();
            }
        }).start();
    }

    /**
      * 连接到 Canal
      *
      * @return 连接信息
      */
    private CanalConnector connect() {
        CanalConnector connector = CanalConnectors.newSingleConnector(
                new InetSocketAddress(canalConfig.getHost(), canalConfig.getPort()),
                canalConfig.getTopic(),
                canalConfig.getUsername(),
                canalConfig.getPassword()
        );
        connector.connect();
        connector.subscribe();
        connector.rollback();
        return connector;
    }

    /**
     * 打印变更实体信息
     *
	 * @param entry 实体信息
     */
    @SneakyThrows
    private void printEntry(CanalEntry.Entry entry) {
        CanalEntry.EntryType entryType = entry.getEntryType();
        if (entryType == CanalEntry.EntryType.TRANSACTIONBEGIN 
            	|| entryType == CanalEntry.EntryType.TRANSACTIONEND) {
            return;
        }
        String lineSeparator = System.lineSeparator();
        StringBuilder info = new StringBuilder(lineSeparator);
        info.append("==========数据变更信息==========").append(lineSeparator);
        CanalEntry.Header header = entry.getHeader();
        info.append(String.format(
            "数据库.表名: %s.%s%n", header.getSchemaName(), header.getTableName()));
        info.append(String.format("操作类型: %s%n", header.getEventType()));
        CanalEntry.RowChange rowChange = CanalEntry.RowChange.parseFrom(entry.getStoreValue());
        for (CanalEntry.RowData rowData : rowChange.getRowDatasList()) {
            CanalEntry.EventType eventType = rowChange.getEventType();
            if (eventType == CanalEntry.EventType.DELETE) {
                info.append(String.format(
                    "delete: %s%n", getDataInfo(rowData.getBeforeColumnsList())));
            } else if (eventType == CanalEntry.EventType.INSERT) {
                info.append(String.format(
                    "insert: %s%n", getDataInfo(rowData.getAfterColumnsList())));
            } else {
                info.append(String.format(
                    "update: %s%n", getDataInfo(rowData.getAfterColumnsList())));
            }
        }
        log.info(info.toString());
    }

    /**
     * 获取行更改信息
     *
	 * @param columns 行列表
     */
    private String getDataInfo(List<CanalEntry.Column> columns) {
        return JSON.toJSONString(
                columns.stream()
                        .collect(Collectors.toMap(
                            CanalEntry.Column::getName, CanalEntry.Column::getValue))
        );
    }

}

然后修改id5的用户的密码由%fqmhybp3变为%fqmhybp4,控制台会打印以下信息:

Canal 结合 RocketMQ 实现数据的增量同步(一)

可以发现我们顺利地获取到了数据的变更信息,通过变更信息,我们就可以执行数据的同步修改、刷新缓存等操作了。

Dokcer 安装 Canal

确保本机已安装 Docker 和 Dokcer Compose,然后创建docker-compose.yml文件:

version: '3'
services:
  canal:
    container_name: canal_latest
    image: canal/canal-server
    restart: always
    ports:
      - 11111:11111
    environment:
      - canal.instance.mysql.slaveId=2
      - canal.instance.master.address=192.168.3.13:3306
      - canal.instance.dbUsername=canal
      - canal.instance.dbPassword=canal
    volumes:                                
      - ./conf:/admin/canal-server/conf
      - ./logs:/admin/canal-server/logs

然后在当前目录执行docker-compose up -d即可,后续连接步骤参考上节即可。

RocketMQ 配置

安装 RocketMQ

完成了 Canal 的基本安装及使用介绍后,下面再来简要介绍 RocketMQ 的安装,这里只展示使用 Docker 的安装方式,首先创建docker-compose.yml文件:

version: '3.5'
services:
  rmqnamesrv:
    image: foxiswho/rocketmq:server
    container_name: rmqnamesrv
    ports:
      - 9876:9876
    networks:
      rmq:
        aliases:
          - rmqnamesrv
  rmqbroker:
    image: foxiswho/rocketmq:broker
    container_name: rmqbroker
    ports:
      - 10909:10909
      - 10911:10911
    volumes:
      - ./conf/broker.conf:/etc/rocketmq/broker.conf
    environment:
      NAMESRV_ADDR: "rmqnamesrv:9876"
      JAVA_OPTS: " -Duser.home=/opt"
      JAVA_OPT_EXT: "-server -Xms128m -Xmx128m -Xmn128m"
    command: mqbroker -c /etc/rocketmq/broker.conf
    depends_on:
      - rmqnamesrv
    networks:
      rmq:
        aliases:
          - rmqbroker
  rmqconsole:
    image: styletang/rocketmq-console-ng
    container_name: rmqconsole
    ports:
      - 8076:8080
    environment:
      JAVA_OPTS: "-Drocketmq.namesrv.addr=rmqnamesrv:9876 -Dcom.rocketmq.sendMessageWithVIPChannel=false"
    depends_on:
      - rmqnamesrv
    networks:
      rmq:
        aliases:
          - rmqconsole
networks:
  rmq:
    name: rmq
    driver: bridge

然后创建conf文件夹与docker-compose.yml文件同级,并在conf文件夹中新建broker.conf文件,内容如下:

brokerClusterName=DefaultCluster
brokerName=broker-a
brokerId=0
# 修改为主机 IP
brokerIP1=192.168.3.13
defaultTopicQueueNums=4
autoCreateTopicEnable=true
autoCreateSubscriptionGroup=true
listenPort=10911
deleteWhen=04
fileReservedTime=120
mapedFileSizeCommitLog=1073741824
mapedFileSizeConsumeQueue=300000
diskMaxUsedSpaceRatio=88
maxMessageSize=65536
brokerRole=ASYNC_MASTER
flushDiskType=ASYNC_FLUSH

然后执行docker-compose up -d即可成功启动。

防火墙设置

firewall-cmd --zone=public --add-port=8076/tcp --permanent
firewall-cmd --zone=public --add-port=9876/tcp --permanent
firewall-cmd --reload

本地访问192.168.3.13:8076测试:

Canal 结合 RocketMQ 实现数据的增量同步(一)

Canal 配置 RocketMQ

修改配置

修改conf/canal.properties中的以下配置:

# tcp, kafka, rocketMQ, rabbitMQ
canal.serverMode = rocketMQ

rocketmq.producer.group = canal_group
rocketmq.namesrv.addr = 192.168.3.13:9876

修改conf/example/instance.properties的以下配置:

# mq config
canal.mq.topic=canal_topic

ps: 以上配置基于前文配置。

然后执行sh bin/restart.sh即可应用修改后的配置。

测试

这里把id5的用户名修改为canal,然后在 RocketMQ 的管理界面查看消息:

Canal 结合 RocketMQ 实现数据的增量同步(一)

可以发现变更信息已经顺利发送到 RocketMQ 中了。

编码测试

下面再通过实际编码进行测试,完整代码及配置访问GitHub即可:

/**
 * 用户表 Canal 变更 RocketMQ 监听器
 *
 * @author zjw
 * @date 2022-01-30
 */
@Slf4j
@Component
@RocketMQMessageListener(
        topic = "canal_topic",
        consumerGroup = "canal_group"
)
public class UserCanalListener implements RocketMQListener<CanalMessage<User>> {

    @Override
    public void onMessage(CanalMessage<User> message) {
        String lineSeparator = System.lineSeparator();
        StringBuilder info = new StringBuilder(lineSeparator);
        info.append("==========数据变更信息==========").append(lineSeparator);
        info.append(String.format(
            "数据库.表名: %s.%s%n", message.getDatabase(), message.getTable()));
        info.append(String.format("操作类型: %s%n", message.getType()));
        message.getData().forEach(user -> info.append(user).append(lineSeparator));
        log.info(info.toString());
    }

}

id5的用户名恢复,然后查看控制台打印信息: Canal 结合 RocketMQ 实现数据的增量同步(一)

配置 Canal 控制台

下载

最后再来简要介绍一下 Canal Admin 的安装及使用,访问官网releases页面,下载并安装 1.1.5 版本:

mkdir /data/canal-admin
cd /data/canal-admin
wget https://github.com/alibaba/canal/releases/download/canal-1.1.5/canal.admin-1.1.5.tar.gz
tar -zxvf canal.admin-1.1.5.tar.gz
rm -rf canal.admin-1.1.5.tar.gz

修改配置

执行vi conf/application.yml,修改其中的数据库配置:

server:
  port: 8089
spring:
  jackson:
    date-format: yyyy-MM-dd HH:mm:ss
    time-zone: GMT+8

spring.datasource:
  address: 192.168.3.13:3306
  database: canal_manager
  username: canal
  password: canal
  driver-class-name: com.mysql.jdbc.Driver
  url: jdbc:mysql://${spring.datasource.address}/${spring.datasource.database}?useUnicode=true&characterEncoding=UTF-8&useSSL=false
  hikari:
    maximum-pool-size: 30
    minimum-idle: 1

canal:
  adminUser: admin
  adminPasswd: admin

然后将conf/canal_manager.sql导入到数据库中(使用 root 用户导入并授权给 canal 用户),其中授权语句如下:

GRANT ALL PRIVILEGES ON canal_manager.* TO 'canal'@'%';
FLUSH PRIVILEGES;

启动

执行以下命令启动控制台:

sh bin/start.sh

防火墙设置

firewall-cmd --zone=public --add-port=8089/tcp --permanent
firewall-cmd --reload

修改 Canal Server 配置

然后修改之前下载的 Canal 中conf/canal.properties中的如下配置:

# canal admin config
canal.admin.manager = 192.168.3.13:8089

然后重启 Canal:

sh bin/restart.sh

然后在本地登录访问192.168.3.13:8089(记得修改为自己的 ip)进行登录,注意这里的密码和 Canal Admin 配置中的密码不是一个东西,这里的密码为123456

Canal 结合 RocketMQ 实现数据的增量同步(一)

之后便可以进行 Canal Server 的管理了:

Canal 结合 RocketMQ 实现数据的增量同步(一)

总结

本文简单介绍了关于 Canal 的安装使用及结合 RocketMQ 的基本使用方式,在后续将会以一些实际应用案例(例如在数据库表的数据变更时,执行更新缓存相关操作,就不再需要在代码有很多分散的缓存更新操作了)再来介绍 Canal 的使用。