likes
comments
collection
share

MongoDB 6.0 (九)binlog监听

作者站长头像
站长
· 阅读数 57

MongoDB 6.0 (九)binlog监听

MongoDB 6.0 的 binlog监听

Change Stream

什么是 Change Stream

Change Stream可以直译为"变更流",也就是说会将数据库中的所有变更以流式的方式呈现出来。用户可以很方便地对数据库建立一个监听(订阅)进程,一旦数据库发生变更,使用change stream的客户端都可以收到相应的通知。

Change Stream 是 MongoDB 用于实现变更追踪的解决方案,类似于关系数据库的触发器,但原理不完全相同:

MongoDB 6.0 (九)binlog监听

同步触发是指处理逻辑与原来的数据库操作是在同一个事务中完成。比如根据某个插入事件会在另一表中再插入一条数据,这两个操作是在同一个事务中完成。

异步操作是等的原来的数据库操作完成之后才会调用下一个事件。

Change Stream的实现原理

Change Stream是基于oplog 实现的。它在 oplog 表上开启一个 tailablecursor来追踪所有复制集上的变更操作,最终调用应用中定义的回调函数。

被追踪的变更事件主要包括:

• insert/update/delete:插入、更新、删除;

• drop:集合被删除;

• rename:集合被重命名;

• dropDatabase:数据库被删除;

• invalidate:drop/rename/dropDatabase将导致invalidate被触发,并关闭 change stream。(因为表都没有了,如果做基于表的事件监听呢)

Change Stream与可重复读

Change Stream 只推送已经在大多数节点上提交的变更操作。即“可重复读”的变更。这个验证是通过 {readConcern: “majority”} 实现的。因此:

• 未开启 majority readConcern 的集群无法使用 Change Stream;

• 当集群无法满足 {w: “majority”} 时,不会触发 Change Stream。

变更过滤

如只对某些类型的变更事件感兴趣,可以使用使用聚合管道的过滤步骤过滤事件。

只对insert和delete监听

MongoDB 6.0 (九)binlog监听

Change Stream实战

在复制集中操作

进入容器

docker exec -it mongo01 /bin/bash

连接到主节点

mongo ip:27017

参数1是过滤器,可以选择是否需要过滤监控事件。

参数2是3秒内没有事件则退出(等待时间设置长点,要不还没有插入数据就退出监控了)

db.test.watch([], {maxAwaitTimeMS: 60000})

MongoDB 6.0 (九)binlog监听

打开另一个窗口进入到主节点

插入一条数据

db.test.insert({x:123})

MongoDB 6.0 (九)binlog监听

查看监控窗口

MongoDB 6.0 (九)binlog监听

故障恢复

假设在一系列写入操作的过程中,订阅 Change Stream 的应用在接收到“写3”之后于 t0 时刻崩溃,重启后后续的变更怎么办?

MongoDB 6.0 (九)binlog监听


想要从上次中断的地方继续获取变更流,只需要保留上次变更通知(它是一个文档)中的 _id 即可。

下图所示是一次 Change Stream 回调所返回的数据。每条这样的数据都带有一个 _id,这个 _id 可以用于断点恢复。例如:

var cs = db.collection.watch([], {resumeAfter: <_id>})

changeStream表示从指定的_id之后继续处理而不是从头开始

即可从上一条通知中断处继续获取后续的变更通知。

MongoDB 6.0 (九)binlog监听

pgsql 的 binlog监听

首先 pglog binlog监听需要满足哪些功能点

机器宕机,能支持断点续接

进行磁盘持久化

如果监听的表 长时间没有数据变动,delay_size 会变大

//设置心跳时间,就算没有数据 也会保持心跳
props.setProperty("heartbeat.interval.ms", "20000");
复制代码
for (ChangeEvent<String, String> r : records) {
    try {
        if (log.isDebugEnabled()) {
            log.debug("{}\n{}", r.key(), r.value());
        }
        if (r.value() != null && r.value().startsWith("{"ts_ms")) {
            continue;
        }
        xxx 具体数据处理

    } catch (Exception e) {
        log.error("PGLog-binlog param:[{}]", r, e);
    }
}
复制代码

心跳这个是 当时上生产的时候,突然发现没有数据变更的时候 ,有报警,说delay了。。。这顿害怕

MongoDB 6.0 (九)binlog监听

大概意思

数据库中有许多更新正在被跟踪,但只有极少数更新与连接器正在为其捕获更改的表和模式相关。这种情况可以通过周期性的心跳事件轻松解决。设置heartbeat.interval.ms连接器配置属性。

由于WAL由所有数据库共享,因此使用的WAL数量趋于增长,直到Debezium为其捕获更改的数据库触发事件。为了克服这一点,有必要:使用heartbeat.interval.ms连接器配置属性启用周期性心跳记录生成。定期从Debezium正在捕捉变化的数据库中发出更改事件。

其中

if (r.value() != null && r.value().startsWith("{"ts_ms")) {
    continue;
}
复制代码

这是因为 如果没有数据来的话,会是ts_ms 开头的,代表,没有新数据

每次binlog传的size 太多,导致服务器处理不过来

props.setProperty("max.batch.size", "200");
复制代码

对多个表的监听,应该只有一个流进行监听

props.setProperty("table.include.list", schs.stream().map(BinlogConfig::getSch).map(a -> tables.stream().map(b -> a + "." + b).map(String::valueOf).collect(Collectors.joining(","))).map(String::valueOf).collect(Collectors.joining(",")));
复制代码

希望磁盘持久化offset,保持数据的正确性

props.setProperty("snapshot.mode", "never");
复制代码

小工具

查询数据库 offset推迟多少

select pg_replication_slots.*, pg_current_wal_lsn(), pg_size_pretty(pg_wal_lsn_diff(pg_current_wal_insert_lsn(), restart_lsn)) as delay_size from pg_replication_slots;

pgsql 的其他的可以看

为了 兼容 pgsql 和 mysql 两个的binlog 推荐的实体

/**
 * binlog对应的表名
 */
private String tableName;
/**
 * 操作时间
 */
private Long operateTime;
/**
 * 操作类型
 */
private String operateType;

/**
 * data节点转换成的Map,key对应的是bean里的属性名,value一律为字符串
 */
private List<Map<String, Object>> dataMap;

/**
 * data节点转换成的Map,key对应的是bean里的属性名,value一律为字符串
 */
private List<Map<String, Object>> oldMap;

之后的更新

这个专栏 之后的更新会在

(98条消息) MongoDB 6.0 入门(二)_孔明兴汉的博客-CSDN博客

转载自:https://juejin.cn/post/7216213764776689721
评论
请登录