likes
comments
collection
share

Oracle XStream初探

作者站长头像
站长
· 阅读数 15

一、背景

XStream是Oracle官方提供的,是以数据同步为目的,由Oracle数据库组件和应用程序编程接口(API)组成,这些数据库接口可以让我们的客户端具备实时感知数据(DDL/DML)变化的能力。可以用于将数据实时迁移到下游数据库,不管是Oracle还是非Oracle。

二、和LogMiner对比

Logminer是Oracle最早提供的一个日志解析工具,本意是让系统管理员或者DBA在不侵入数据库的情况下,对数据库的历史事情做一些探查分析,并没有进一步将它用于更广泛的用途的计划。目前大多数的第三方开源框架比如Debezium都默认集成了Logminer,主要是因为没有更好的办法去解析闭源商业数据库。 Logminer它主要有两个问题:

性能

Logminer运行在Oracle内部且在日志落地之后,不可避免会消耗数据库自身的算力去完成Logminer的解析工作,所以Oracle对Logminer做了比较严格的资源限制,尽量减少对于数据库主要工作的影响。比如只分配给Logminer一个cpu核心,将Logminer日志解析速度限制在1w/s以下,所以在一些严格的场合,这就不满足性能需求了,可能会使日志解析延迟达到分钟,小时甚至天的级别。

政策不确定

Oracle的一些特性受到版本影响很大,比如Logminer有一个模式叫“Continuous Mining”,可以让用户不关心日志解析的细节,可以直接接受输出的事件流,对于用户来说很方便,但是这个特性在12版本标记为弃用,在19版本去除了,所以很多东西不确定。

三、如何使用

XStream在使用前需要完成一系列配置工作,现在以非CDB数据库举例。

配置管理员和用户

-- Creating an XStream Administrator user

sqlplus / as sysdba
-- sqlplus sys/password@host:port/SID as sysdba

-- 为XStream创建管理员用户和表空间
CREATE TABLESPACE xstream_adm_tbs DATAFILE '/opt/oracle/oradata/orcl/xstream_adm_tbs.dbf' SIZE 25M REUSE AUTOEXTEND ON MAXSIZE UNLIMITED;
CREATE USER xstrmadmin IDENTIFIED BY dbz DEFAULT TABLESPACE xstream_adm_tbs QUOTA UNLIMITED ON xstream_adm_tbs;
GRANT CREATE SESSION TO xstrmadmin;
BEGIN
   DBMS_XSTREAM_AUTH.GRANT_ADMIN_PRIVILEGE(
    grantee                 => 'xstrmadmin',
    privilege_type          => 'CAPTURE',
    grant_select_privileges => TRUE,
    container             => 'ALL'
   );
END;


-- Creating the connector’s XStream user
-- 为XStream创建连接用户和表空间
CREATE TABLESPACE xstream_tbs DATAFILE '/opt/oracle/oradata/orcl/xstream_tbs.dbf' SIZE 25M REUSE AUTOEXTEND ON MAXSIZE UNLIMITED;
CREATE USER xstrm IDENTIFIED BY dbz DEFAULT TABLESPACE xstream_tbs QUOTA UNLIMITED ON xstream_tbs;
-- 为连接用户授权
GRANT CREATE TABLE TO xstrm;
GRANT CREATE SESSION TO xstrm;
GRANT SELECT ON V_$DATABASE to xstrm;
GRANT FLASHBACK ANY TABLE TO xstrm;
GRANT SELECT ANY TABLE to xstrm;
GRANT LOCK ANY TABLE TO xstrm;
grant select_catalog_role to xstrm;
GRANT EXECUTE_CATALOG_ROLE TO xstrm;

-- 开启补全日志
-- alter database add supplemental log data (all) columns;
alter database add supplemental log data (primary key, unique index) columns;

exit;

配置出站服务

-- Create an XStream Outbound Server

-- XStream管理员登录,创建一个出站服务,参数值[server_name]值可以自定义
sqlplus xstrmadmin/password@host:port/SID

DECLARE
tables DBMS_UTILITY.UNCL_ARRAY;
schemas DBMS_UTILITY.UNCL_ARRAY;
BEGIN
tables(1) := NULL;
schemas(1) := 'XSTRM';
DBMS_XSTREAM_ADM.CREATE_OUTBOUND(
     server_name => 'dbzxout',
     table_names => tables,
     schema_names => schemas
   );
END;
/

exit;

绑定服务

-- Configure the XStream user account to connect to the XStream Outbound Server

-- DBA用户登录,将上一步创建的出站服务[server_name]和XStream连接用户绑定
sqlplus / as sysdba
-- sqlplus sys/password@host:port/SID as sysdba

BEGIN
DBMS_XSTREAM_ADM.ALTER_OUTBOUND(
     server_name  => 'dbzxout',
     connect_user => 'xstrm'
   );
END;
/

exit;

-- 开启自动事务
set autocommit on

确定配置完毕

上面的工作做好之后,可以通过查询这个动态视图来检查XStream是不是处于就绪状态:

-- 查询捕获进程信息
SELECT CAPTURE_NAME, STATE,
	TO_CHAR(CAPTURE_MESSAGE_CREATE_TIME, 'HH24:MI:SS MM/DD/YY') CREATE_MESSAGE,
	TO_CHAR(ENQUEUE_MESSAGE_CREATE_TIME, 'HH24:MI:SS MM/DD/YY') ENQUEUE_MESSAGE
FROM V$XSTREAM_CAPTURE;

-- STATE字段显示为`WAITING FOR TRANSACTION`,说明我们的配置生效了

四、编程实例

官方文档中提供的示例比较简单,是直接使用API将上游Oracle输出同步到下游Oracle输入。下面我贴一下核心代码。

    // 自定义的LCR格式转换工厂抽象类
    private AbstractFormatTrans formatTrans;

    /**
     * 创建Connection连接
     *
     * @param url
     * @param username
     * @param passwd
     * @return
     */
    private Connection createConnection(String url, String username, String passwd) {
        try {
            DriverManager.registerDriver(new oracle.jdbc.OracleDriver());
            return DriverManager.getConnection(url, username, passwd);
        } catch (Exception e) {
            LOGGER.error("fail to establish Oracle connection to:  " + url, e);
            return null;
        }
    }
    
    /**
     * 挂载连接到XStream Outbound Server
     *
     * @param outConn
     * @return
     */
    private XStreamOut attachOutbound(Connection outConn) {
        XStreamOut xsOut = null;
        try {
            // when attach to an outbound server, client needs to tell outbound
            // server the last position.
            xsOut = XStreamOut.attach((OracleConnection) outConn, InitializeSystemParam.ORACLE_OUT_APPLY_NAME, lastPosition, XStreamOut.DEFAULT_MODE);
            LOGGER.info("Attached to outbound server [{}]", InitializeSystemParam.ORACLE_OUT_APPLY_NAME);
            if (lastPosition != null) {
                LOGGER.info("Last Position is [{}]", getHexStr(lastPosition));
            } else {
                LOGGER.info("Oh~~~~, lastPosition is null");
            }
            return xsOut;
        } catch (Exception e) {
            LOGGER.error("cannot attach to outbound server: " + InitializeSystemParam.ORACLE_OUT_APPLY_NAME, e);
            return null;
        }
    }

    /**
     * 从XStreamOut接收数据
     *
     * @param xsOutParam
     * @return
     * @throws StreamsException
     */
    private LCR receiveData(XStreamOut xsOutParam) throws StreamsException {
        LCR lcr = null;
        try {
            lcr = xsOutParam.receiveLCR(XStreamOut.ATTACH_EXTENDED_ID_MODE);
        } catch (StreamsException e) {
            LOGGER.error(e.getMessage());
            detachOutbound(xsOutParam);
            while (xsOut == null) {
                xsOut = attachOutbound(outConn);
            }
            return receiveData(xsOut);
        }
        return lcr;
    }

    /**
     * 循环获取LCR
     *
     * @param xsOut
     */
    private void getLcrs(XStreamOut xsOut) {
        if (null == xsOut) {
            LOGGER.error("xstreamOut is null");
            return;
        }

        try {
            while (true) {
                LCR lcr = receiveData(xsOut);
                if (xsOut.getBatchStatus() == XStreamOut.EXECUTING) {
                    // batch is active
                    assert lcr != null;
                    if (lcr instanceof RowLCR) {
                        // receive chunk from outbound
                        if (((RowLCR) lcr).hasChunkData()) {
                            // 大字段需要额外处理
                            ChunkColumnValue chunk = null;
                            Map<String, Object> map = new HashMap<>(8);
                            do {
                                chunk = xsOut.receiveChunk(XStreamOut.DEFAULT_MODE);
                                DefaultChunkColumnValue defaultChunkColumnValue = (DefaultChunkColumnValue) chunk;
                                String columnName = defaultChunkColumnValue.getColumnName();
                                int chunkType = defaultChunkColumnValue.getChunkType();
                                switch (chunkType) {
                                    case 1:
                                        // CLOB
                                        CHAR charVal = (CHAR) defaultChunkColumnValue.getColumnData();
                                        if (charVal.getBytes().length == 0) {
                                            map.put(columnName, null);
                                        } else {
                                            map.put(columnName, charVal);
                                        }
                                        break;
                                    case 2:
                                        // BLOB
                                        RAW rawVal = (RAW) defaultChunkColumnValue.getColumnData();
                                        if (rawVal.getBytes().length == 0) {
                                            map.put(columnName, null);
                                        } else {
                                            map.put(columnName, rawVal);
                                        }
                                        break;
                                    default:
                                        throw new StreamsException(String.format("XStream DefaultChunkColumnValue: unsupported chunk type %d", chunkType));
                                }
                            } while (!chunk.isEndOfRow());
                            // clob/blob 格式转换
                            formatTrans.trans(lcr, map);
                        } else {
                            // DML
                            formatTrans.filterTrans(lcr);
                        }
                    } else if (lcr instanceof DDLLCR) {
                        // DDL
                        formatTrans.filterTrans(lcr);
                    }
                    processedLowPosition = lcr.getPosition();
                } else {
                    // batch is end
                    if (null != processedLowPosition) {
                        xsOut.setProcessedLowWatermark(processedLowPosition, XStreamOut.DEFAULT_MODE);
                    }
                }
            }
        } catch (Exception e) {
            LOGGER.error("exception when processing LCRs", e);
        }
    }

五、问题排查

XStream数据同步的状态可能会受到外部环境影响而变化,总的来说,当察觉到数据长时间不同步的时候,可以先直接查询表和视图:

-- xstream捕获进程表,status字段须为ENABLED
select capture_name,status from dba_capture;
-- xstream应用进程表,status字段须ENABLED
select apply_name,status from dba_apply;

-- 上面dba_capture和dba_apply的状态必须都是ENABLED,再查看动态视图的状态,STATE的值为[WAITING FOR TRANSACTION],则功能正常
SELECT CAPTURE_NAME, STATE,
	TO_CHAR(CAPTURE_MESSAGE_CREATE_TIME, 'HH24:MI:SS MM/DD/YY') CREATE_MESSAGE,
	TO_CHAR(ENQUEUE_MESSAGE_CREATE_TIME, 'HH24:MI:SS MM/DD/YY') ENQUEUE_MESSAGE
FROM V$XSTREAM_CAPTURE;

除了查表和视图以外,还可以直接查看Oracle的日志文件精准确定数据同步异常的原因,Trace文件和Alert日志文件。

六、总结

Oracle XStream由Oracle自身的组件和一组编程接口API组成,主要用于数据同步和迁移,是CDC的一个实现方案。同时它也是Oracle官方的数据同步产品OGG的内部实现方案之一,也是一个收费接口,在正式场合使用需要Oracle授权,性能会比LogMiner好很多。

参考文献:

实时数据引擎系列(四): 关于 Oracle 与 Oracle CDC

XStream 同步状态查询

XStream API文档

XStream 19c 官方文档