likes
comments
collection
share

Spring Boot + Canal 实现数据库实时监控

作者站长头像
站长
· 阅读数 30

Canal 工作原理

Spring Boot + Canal 实现数据库实时监控
  1. Canal 模拟 MySQL slave 的交互协议,伪装自己为 MySQL slave,向 MySQL master 发送 dump 协议
  2. MySQL master 收到 dump 请求,开始推送 binary log 给 slave(也就是 Canal)
  3. Canal 解析 binary log 对象(原始为 byte 流)

MySQL 打开 binlog 模式

在 MySQL 配置文件 my.cnf 设置如下信息:

[mysqld]
# 打开binlog
log-bin=mysql-bin
# 选择ROW(行)模式
binlog-format=ROW
# 配置MySQL replaction需要定义,不要和canal的slaveId重复
server_id=1

改了配置文件之后,重启 MySQL,使用命令查看是否打开 binlog 模式:

SHOW VARIABLES LIKE 'log_bin';

Spring Boot + Canal 实现数据库实时监控

查看 binlog 日志文件列表:SHOW BINARY LOGS;

查看当前正在写入的 binlog 文件:SHOW MASTER STATUS; 记录文件名 File 和 Position 值

启动 Canal 服务

  1. 下载地址(canal.deployer-1.1.5.tar.gz):github.com/alibaba/can…

  2. 参数配置

详细配置可以参考:

blog.csdn.net/zhou12314/a…

www.cnblogs.com/zpan2019/p/…

1)canal.deployer-1.1.5\conf\canal.properties 进行全局配置,可以修改 IP、端口号或实例

# 默认有一个 example,需要增加实例的可以配置
canal.destinations = example

2)canal.deployer-1.1.5\conf\example\instance.properties 进行局部实例配置,可以修改数据库账号和密码、数据库表名、binlog 文件名和 position 等

# 没有改变的就没有贴出来,注意 MySQL 的用户名和密码
canal.instance.master.address=192.168.58.131:3306
# username/password
canal.instance.dbUsername=test
canal.instance.dbPassword=liubihao
canal.instance.connectionCharset = UTF-8
# enable druid Decrypt database password
canal.instance.enableDruid=false
# table regex
canal.instance.filter.regex=.*\\..*
# table black regex
canal.instance.filter.black.regex=
mysql 数据解析关注的表,Perl正则表达式.
多个正则之间以逗号(,)分隔,转义符需要双斜杠(\\) 
常见例子:
1.  所有表:.*   or  .*\\..*
2.  canal schema下所有表: canal\\..*
3.  canal下的以canal打头的表:canal\\.canal.*
4.  canal schema下的一张表:canal.test1
5.  多个规则组合使用:canal\\..*,mysql.test1,mysql.test2 (逗号分隔)
  1. bin/start.sh 启动服务(Windows 系统为 bin/start.bat
Spring Boot + Canal 实现数据库实时监控

后端进行相应配置

修改 pom.xml

<dependency>
  <groupId>com.alibaba.otter</groupId>
  <artifactId>canal.client</artifactId>
  <version>1.1.4</version>
</dependency>

新建 CanalClient.java

package org.nwpu.atcss.util;

import com.alibaba.otter.canal.client.CanalConnector;
import com.alibaba.otter.canal.client.CanalConnectors;
import com.alibaba.otter.canal.protocol.CanalEntry.*;
import com.alibaba.otter.canal.protocol.Message;
import org.springframework.stereotype.Component;

import java.util.List;
import java.net.InetSocketAddress;

@Component
public class CanalClient {
  private static void printEntries(List<Entry> entries) throws Exception {
    for (Entry entry : entries) {
      if (entry.getEntryType() != EntryType.ROWDATA) {
        continue;
      }

      RowChange rowChange = RowChange.parseFrom(entry.getStoreValue());

      EventType eventType = rowChange.getEventType();
      System.out.println(String.format("================> binlog[%s:%s] , name[%s,%s] , eventType : %s",
                                       entry.getHeader().getLogfileName(), entry.getHeader().getLogfileOffset(),
                                       entry.getHeader().getSchemaName(), entry.getHeader().getTableName(), eventType));

      for (RowData rowData : rowChange.getRowDatasList()) {
        switch (rowChange.getEventType()) {
          case INSERT:
            System.out.println("INSERT ");
            printColumns(rowData.getAfterColumnsList());
            break;
          case UPDATE:
            System.out.println("UPDATE ");
            printColumns(rowData.getAfterColumnsList());
            break;
          case DELETE:
            System.out.println("DELETE ");
            printColumns(rowData.getBeforeColumnsList());
            break;
          default:
            break;
        }
      }
    }
  }

  private static void printColumns(List<Column> columns) {
    for (Column column : columns) {
      System.out.println(column.getName() + " : " + column.getValue() + " update=" + column.getUpdated());
    }
  }

  public static void main(String[] args) throws Exception {
    // hostname, port, destination, username, password
    CanalConnector connector = CanalConnectors.newSingleConnector(new InetSocketAddress("127.0.0.1", 11111), "example", "", "");
    try {
      connector.connect();
      // 监听的表,格式为数据库.表名,数据库.表名
      connector.subscribe(".*\\..*");
      connector.rollback();

      while (true) {
        Message message = connector.getWithoutAck(100); // 获取指定数量的数据
        long batchId = message.getId();
        if (batchId == -1 || message.getEntries().isEmpty()) {
          Thread.sleep(1000);
          continue;
        }
        // System.out.println(message.getEntries());
        printEntries(message.getEntries());
        connector.ack(batchId); // 提交确认,消费成功,通知server删除数据
        // connector.rollback(batchId);// 处理失败, 回滚数据,后续重新获取数据
      }
    } catch (Exception e) {
      System.out.println("Something Error.");
    } finally {
      connector.disconnect();
    }
  }
}

测试

启动 CanalClient.java

修改本地数据库内容之后,控制台成功监听并报告更新信息。

Spring Boot + Canal 实现数据库实时监控

FAQ

Canal 监听不到 MySQL 变化

参考文章:blog.csdn.net/weixin_3222…

CanalEntry 获取不到 RowData

是 MySQL 5.X 版本的原因,建议升级 MySQL 为 8.X 版本。