MongoDB 6.0 (九)binlog监听
MongoDB 6.0 的 binlog监听
Change Stream
什么是 Change Stream
Change Stream可以直译为"变更流",也就是说会将数据库中的所有变更以流式的方式呈现出来。用户可以很方便地对数据库建立一个监听(订阅)进程,一旦数据库发生变更,使用change stream的客户端都可以收到相应的通知。
Change Stream 是 MongoDB 用于实现变更追踪的解决方案,类似于关系数据库的触发器,但原理不完全相同:
同步触发是指处理逻辑与原来的数据库操作是在同一个事务中完成。比如根据某个插入事件会在另一表中再插入一条数据,这两个操作是在同一个事务中完成。
异步操作是等的原来的数据库操作完成之后才会调用下一个事件。
Change Stream的实现原理
Change Stream是基于oplog 实现的。它在 oplog 表上开启一个 tailablecursor来追踪所有复制集上的变更操作,最终调用应用中定义的回调函数。
被追踪的变更事件主要包括:
• insert/update/delete:插入、更新、删除;
• drop:集合被删除;
• rename:集合被重命名;
• dropDatabase:数据库被删除;
• invalidate:drop/rename/dropDatabase将导致invalidate被触发,并关闭 change stream。(因为表都没有了,如果做基于表的事件监听呢)
Change Stream与可重复读
Change Stream 只推送已经在大多数节点上提交的变更操作。即“可重复读”的变更。这个验证是通过 {readConcern: “majority”} 实现的。因此:
• 未开启 majority readConcern 的集群无法使用 Change Stream;
• 当集群无法满足 {w: “majority”} 时,不会触发 Change Stream。
变更过滤
如只对某些类型的变更事件感兴趣,可以使用使用聚合管道的过滤步骤过滤事件。
只对insert和delete监听
Change Stream实战
在复制集中操作
进入容器
docker exec -it mongo01 /bin/bash
连接到主节点
mongo ip:27017
参数1是过滤器,可以选择是否需要过滤监控事件。
参数2是3秒内没有事件则退出(等待时间设置长点,要不还没有插入数据就退出监控了)
db.test.watch([], {maxAwaitTimeMS: 60000})
打开另一个窗口进入到主节点
插入一条数据
db.test.insert({x:123})
查看监控窗口
故障恢复
假设在一系列写入操作的过程中,订阅 Change Stream 的应用在接收到“写3”之后于 t0 时刻崩溃,重启后后续的变更怎么办?
想要从上次中断的地方继续获取变更流,只需要保留上次变更通知(它是一个文档)中的 _id 即可。
下图所示是一次 Change Stream 回调所返回的数据。每条这样的数据都带有一个 _id,这个 _id 可以用于断点恢复。例如:
var cs = db.collection.watch([], {resumeAfter: <_id>})
changeStream表示从指定的_id之后继续处理而不是从头开始
即可从上一条通知中断处继续获取后续的变更通知。
pgsql 的 binlog监听
首先 pglog binlog监听需要满足哪些功能点
机器宕机,能支持断点续接
进行磁盘持久化
如果监听的表 长时间没有数据变动,delay_size 会变大
//设置心跳时间,就算没有数据 也会保持心跳
props.setProperty("heartbeat.interval.ms", "20000");
复制代码
for (ChangeEvent<String, String> r : records) {
try {
if (log.isDebugEnabled()) {
log.debug("{}\n{}", r.key(), r.value());
}
if (r.value() != null && r.value().startsWith("{"ts_ms")) {
continue;
}
xxx 具体数据处理
} catch (Exception e) {
log.error("PGLog-binlog param:[{}]", r, e);
}
}
复制代码
心跳这个是 当时上生产的时候,突然发现没有数据变更的时候 ,有报警,说delay了。。。这顿害怕
大概意思
数据库中有许多更新正在被跟踪,但只有极少数更新与连接器正在为其捕获更改的表和模式相关。这种情况可以通过周期性的心跳事件轻松解决。设置heartbeat.interval.ms连接器配置属性。
由于WAL由所有数据库共享,因此使用的WAL数量趋于增长,直到Debezium为其捕获更改的数据库触发事件。为了克服这一点,有必要:使用heartbeat.interval.ms连接器配置属性启用周期性心跳记录生成。定期从Debezium正在捕捉变化的数据库中发出更改事件。
其中
if (r.value() != null && r.value().startsWith("{"ts_ms")) {
continue;
}
复制代码
这是因为 如果没有数据来的话,会是ts_ms 开头的,代表,没有新数据
每次binlog传的size 太多,导致服务器处理不过来
props.setProperty("max.batch.size", "200");
复制代码
对多个表的监听,应该只有一个流进行监听
props.setProperty("table.include.list", schs.stream().map(BinlogConfig::getSch).map(a -> tables.stream().map(b -> a + "." + b).map(String::valueOf).collect(Collectors.joining(","))).map(String::valueOf).collect(Collectors.joining(",")));
复制代码
希望磁盘持久化offset,保持数据的正确性
props.setProperty("snapshot.mode", "never");
复制代码
小工具
查询数据库 offset推迟多少
select pg_replication_slots.*, pg_current_wal_lsn(), pg_size_pretty(pg_wal_lsn_diff(pg_current_wal_insert_lsn(), restart_lsn)) as delay_size from pg_replication_slots;
pgsql 的其他的可以看
为了 兼容 pgsql 和 mysql 两个的binlog 推荐的实体
/**
* binlog对应的表名
*/
private String tableName;
/**
* 操作时间
*/
private Long operateTime;
/**
* 操作类型
*/
private String operateType;
/**
* data节点转换成的Map,key对应的是bean里的属性名,value一律为字符串
*/
private List<Map<String, Object>> dataMap;
/**
* data节点转换成的Map,key对应的是bean里的属性名,value一律为字符串
*/
private List<Map<String, Object>> oldMap;
之后的更新
这个专栏 之后的更新会在
转载自:https://juejin.cn/post/7216213764776689721