Oracle XStream初探
一、背景
XStream是Oracle官方提供的,是以数据同步为目的,由Oracle数据库组件和应用程序编程接口(API)组成,这些数据库接口可以让我们的客户端具备实时感知数据(DDL/DML)变化的能力。可以用于将数据实时迁移到下游数据库,不管是Oracle还是非Oracle。
二、和LogMiner对比
Logminer是Oracle最早提供的一个日志解析工具,本意是让系统管理员或者DBA在不侵入数据库的情况下,对数据库的历史事情做一些探查分析,并没有进一步将它用于更广泛的用途的计划。目前大多数的第三方开源框架比如Debezium都默认集成了Logminer,主要是因为没有更好的办法去解析闭源商业数据库。 Logminer它主要有两个问题:
性能
Logminer运行在Oracle内部且在日志落地之后,不可避免会消耗数据库自身的算力去完成Logminer的解析工作,所以Oracle对Logminer做了比较严格的资源限制,尽量减少对于数据库主要工作的影响。比如只分配给Logminer一个cpu核心,将Logminer日志解析速度限制在1w/s以下,所以在一些严格的场合,这就不满足性能需求了,可能会使日志解析延迟达到分钟,小时甚至天的级别。
政策不确定
Oracle的一些特性受到版本影响很大,比如Logminer有一个模式叫“Continuous Mining”,可以让用户不关心日志解析的细节,可以直接接受输出的事件流,对于用户来说很方便,但是这个特性在12版本标记为弃用,在19版本去除了,所以很多东西不确定。
三、如何使用
XStream在使用前需要完成一系列配置工作,现在以非CDB数据库举例。
配置管理员和用户
-- Creating an XStream Administrator user
sqlplus / as sysdba
-- sqlplus sys/password@host:port/SID as sysdba
-- 为XStream创建管理员用户和表空间
CREATE TABLESPACE xstream_adm_tbs DATAFILE '/opt/oracle/oradata/orcl/xstream_adm_tbs.dbf' SIZE 25M REUSE AUTOEXTEND ON MAXSIZE UNLIMITED;
CREATE USER xstrmadmin IDENTIFIED BY dbz DEFAULT TABLESPACE xstream_adm_tbs QUOTA UNLIMITED ON xstream_adm_tbs;
GRANT CREATE SESSION TO xstrmadmin;
BEGIN
DBMS_XSTREAM_AUTH.GRANT_ADMIN_PRIVILEGE(
grantee => 'xstrmadmin',
privilege_type => 'CAPTURE',
grant_select_privileges => TRUE,
container => 'ALL'
);
END;
-- Creating the connector’s XStream user
-- 为XStream创建连接用户和表空间
CREATE TABLESPACE xstream_tbs DATAFILE '/opt/oracle/oradata/orcl/xstream_tbs.dbf' SIZE 25M REUSE AUTOEXTEND ON MAXSIZE UNLIMITED;
CREATE USER xstrm IDENTIFIED BY dbz DEFAULT TABLESPACE xstream_tbs QUOTA UNLIMITED ON xstream_tbs;
-- 为连接用户授权
GRANT CREATE TABLE TO xstrm;
GRANT CREATE SESSION TO xstrm;
GRANT SELECT ON V_$DATABASE to xstrm;
GRANT FLASHBACK ANY TABLE TO xstrm;
GRANT SELECT ANY TABLE to xstrm;
GRANT LOCK ANY TABLE TO xstrm;
grant select_catalog_role to xstrm;
GRANT EXECUTE_CATALOG_ROLE TO xstrm;
-- 开启补全日志
-- alter database add supplemental log data (all) columns;
alter database add supplemental log data (primary key, unique index) columns;
exit;
配置出站服务
-- Create an XStream Outbound Server
-- XStream管理员登录,创建一个出站服务,参数值[server_name]值可以自定义
sqlplus xstrmadmin/password@host:port/SID
DECLARE
tables DBMS_UTILITY.UNCL_ARRAY;
schemas DBMS_UTILITY.UNCL_ARRAY;
BEGIN
tables(1) := NULL;
schemas(1) := 'XSTRM';
DBMS_XSTREAM_ADM.CREATE_OUTBOUND(
server_name => 'dbzxout',
table_names => tables,
schema_names => schemas
);
END;
/
exit;
绑定服务
-- Configure the XStream user account to connect to the XStream Outbound Server
-- DBA用户登录,将上一步创建的出站服务[server_name]和XStream连接用户绑定
sqlplus / as sysdba
-- sqlplus sys/password@host:port/SID as sysdba
BEGIN
DBMS_XSTREAM_ADM.ALTER_OUTBOUND(
server_name => 'dbzxout',
connect_user => 'xstrm'
);
END;
/
exit;
-- 开启自动事务
set autocommit on
确定配置完毕
上面的工作做好之后,可以通过查询这个动态视图来检查XStream是不是处于就绪状态:
-- 查询捕获进程信息
SELECT CAPTURE_NAME, STATE,
TO_CHAR(CAPTURE_MESSAGE_CREATE_TIME, 'HH24:MI:SS MM/DD/YY') CREATE_MESSAGE,
TO_CHAR(ENQUEUE_MESSAGE_CREATE_TIME, 'HH24:MI:SS MM/DD/YY') ENQUEUE_MESSAGE
FROM V$XSTREAM_CAPTURE;
-- STATE字段显示为`WAITING FOR TRANSACTION`,说明我们的配置生效了
四、编程实例
官方文档中提供的示例比较简单,是直接使用API将上游Oracle输出同步到下游Oracle输入。下面我贴一下核心代码。
// 自定义的LCR格式转换工厂抽象类
private AbstractFormatTrans formatTrans;
/**
* 创建Connection连接
*
* @param url
* @param username
* @param passwd
* @return
*/
private Connection createConnection(String url, String username, String passwd) {
try {
DriverManager.registerDriver(new oracle.jdbc.OracleDriver());
return DriverManager.getConnection(url, username, passwd);
} catch (Exception e) {
LOGGER.error("fail to establish Oracle connection to: " + url, e);
return null;
}
}
/**
* 挂载连接到XStream Outbound Server
*
* @param outConn
* @return
*/
private XStreamOut attachOutbound(Connection outConn) {
XStreamOut xsOut = null;
try {
// when attach to an outbound server, client needs to tell outbound
// server the last position.
xsOut = XStreamOut.attach((OracleConnection) outConn, InitializeSystemParam.ORACLE_OUT_APPLY_NAME, lastPosition, XStreamOut.DEFAULT_MODE);
LOGGER.info("Attached to outbound server [{}]", InitializeSystemParam.ORACLE_OUT_APPLY_NAME);
if (lastPosition != null) {
LOGGER.info("Last Position is [{}]", getHexStr(lastPosition));
} else {
LOGGER.info("Oh~~~~, lastPosition is null");
}
return xsOut;
} catch (Exception e) {
LOGGER.error("cannot attach to outbound server: " + InitializeSystemParam.ORACLE_OUT_APPLY_NAME, e);
return null;
}
}
/**
* 从XStreamOut接收数据
*
* @param xsOutParam
* @return
* @throws StreamsException
*/
private LCR receiveData(XStreamOut xsOutParam) throws StreamsException {
LCR lcr = null;
try {
lcr = xsOutParam.receiveLCR(XStreamOut.ATTACH_EXTENDED_ID_MODE);
} catch (StreamsException e) {
LOGGER.error(e.getMessage());
detachOutbound(xsOutParam);
while (xsOut == null) {
xsOut = attachOutbound(outConn);
}
return receiveData(xsOut);
}
return lcr;
}
/**
* 循环获取LCR
*
* @param xsOut
*/
private void getLcrs(XStreamOut xsOut) {
if (null == xsOut) {
LOGGER.error("xstreamOut is null");
return;
}
try {
while (true) {
LCR lcr = receiveData(xsOut);
if (xsOut.getBatchStatus() == XStreamOut.EXECUTING) {
// batch is active
assert lcr != null;
if (lcr instanceof RowLCR) {
// receive chunk from outbound
if (((RowLCR) lcr).hasChunkData()) {
// 大字段需要额外处理
ChunkColumnValue chunk = null;
Map<String, Object> map = new HashMap<>(8);
do {
chunk = xsOut.receiveChunk(XStreamOut.DEFAULT_MODE);
DefaultChunkColumnValue defaultChunkColumnValue = (DefaultChunkColumnValue) chunk;
String columnName = defaultChunkColumnValue.getColumnName();
int chunkType = defaultChunkColumnValue.getChunkType();
switch (chunkType) {
case 1:
// CLOB
CHAR charVal = (CHAR) defaultChunkColumnValue.getColumnData();
if (charVal.getBytes().length == 0) {
map.put(columnName, null);
} else {
map.put(columnName, charVal);
}
break;
case 2:
// BLOB
RAW rawVal = (RAW) defaultChunkColumnValue.getColumnData();
if (rawVal.getBytes().length == 0) {
map.put(columnName, null);
} else {
map.put(columnName, rawVal);
}
break;
default:
throw new StreamsException(String.format("XStream DefaultChunkColumnValue: unsupported chunk type %d", chunkType));
}
} while (!chunk.isEndOfRow());
// clob/blob 格式转换
formatTrans.trans(lcr, map);
} else {
// DML
formatTrans.filterTrans(lcr);
}
} else if (lcr instanceof DDLLCR) {
// DDL
formatTrans.filterTrans(lcr);
}
processedLowPosition = lcr.getPosition();
} else {
// batch is end
if (null != processedLowPosition) {
xsOut.setProcessedLowWatermark(processedLowPosition, XStreamOut.DEFAULT_MODE);
}
}
}
} catch (Exception e) {
LOGGER.error("exception when processing LCRs", e);
}
}
五、问题排查
XStream数据同步的状态可能会受到外部环境影响而变化,总的来说,当察觉到数据长时间不同步的时候,可以先直接查询表和视图:
-- xstream捕获进程表,status字段须为ENABLED
select capture_name,status from dba_capture;
-- xstream应用进程表,status字段须ENABLED
select apply_name,status from dba_apply;
-- 上面dba_capture和dba_apply的状态必须都是ENABLED,再查看动态视图的状态,STATE的值为[WAITING FOR TRANSACTION],则功能正常
SELECT CAPTURE_NAME, STATE,
TO_CHAR(CAPTURE_MESSAGE_CREATE_TIME, 'HH24:MI:SS MM/DD/YY') CREATE_MESSAGE,
TO_CHAR(ENQUEUE_MESSAGE_CREATE_TIME, 'HH24:MI:SS MM/DD/YY') ENQUEUE_MESSAGE
FROM V$XSTREAM_CAPTURE;
除了查表和视图以外,还可以直接查看Oracle的日志文件精准确定数据同步异常的原因,Trace文件和Alert日志文件。
六、总结
Oracle XStream由Oracle自身的组件和一组编程接口API组成,主要用于数据同步和迁移,是CDC的一个实现方案。同时它也是Oracle官方的数据同步产品OGG的内部实现方案之一,也是一个收费接口,在正式场合使用需要Oracle授权,性能会比LogMiner好很多。
参考文献:
转载自:https://juejin.cn/post/7163011935863046175