位置: 文档库 > 数据库 > 在Oracle 11g Streams单向传输的基础上配置Streams双向传输测试

在Oracle 11g Streams单向传输的基础上配置Streams双向传输测试

TriumphantDragon 上传于 2023-03-07 17:46

《在Oracle 11g Streams单向传输的基础上配置Streams双向传输测试》

一、引言

Oracle Streams是Oracle数据库提供的一种数据复制和共享技术,能够在不同数据库实例之间实现数据的实时或近实时同步。在Oracle 11g环境中,Streams单向传输功能已广泛应用于数据分发、备份和报表分离等场景。然而,随着业务复杂度的提升,双向数据同步的需求日益迫切,例如实现多数据中心间的数据双向同步、负载均衡或高可用性架构。本文基于已有的Oracle 11g Streams单向传输配置,详细阐述如何扩展为双向传输,并通过测试验证其可行性与性能。

二、Oracle Streams双向传输原理

1. 双向传输架构

双向传输的核心是通过两个单向Streams配置的组合实现。每个数据库实例同时作为源端(捕获变更)和目标端(应用变更),形成闭环同步。关键组件包括:

- 捕获进程(Capture Process):从重做日志(Redo Log)或归档日志中捕获DML/DDL变更。

- 传播进程(Propagation Process):将捕获的变更发送到目标数据库的队列。

- 应用进程(Apply Process):从队列中读取变更并应用到目标表。

- 规则集(Rule Set):定义捕获、传播和应用的变更范围。

2. 冲突避免机制

双向传输需解决数据冲突问题,常见策略包括:

- 时间戳或版本号:通过附加字段判断变更的先后顺序。

- 逻辑分区:按业务规则划分数据主权(如按区域或用户ID)。

- 自定义冲突处理程序:通过PL/SQL函数处理冲突。

三、环境准备

1. 测试环境配置

- 数据库版本:Oracle 11g Release 2 (11.2.0.4)

- 操作系统:Linux x86-64

- 网络配置:双向数据库实例间网络延迟

- 表结构:示例表`CUSTOMERS`(ID, NAME, UPDATE_TIME)

2. 初始化单向传输

假设已配置从数据库A到数据库B的单向传输,步骤如下:

(1)在数据库A上创建捕获进程:

BEGIN
  DBMS_CAPTURE_ADM.CREATE_CAPTURE(
    capture_name   => 'CAP_A',
    queue_name     => 'STREAMS_QUEUE_A',
    source_database=> NULL,
    start_capture  => TRUE);
END;
/

(2)在数据库A上创建传播进程:

BEGIN
  DBMS_PROPAGATION_ADM.CREATE_PROPAGATION(
    propagation_name => 'PROP_A_TO_B',
    source_queue_name => 'STREAMS_QUEUE_A',
    destination_queue => 'DB_B.STREAMS_QUEUE_B',
    service_name      => 'DB_B_SERVICE');
END;
/

(3)在数据库B上创建应用进程:

BEGIN
  DBMS_APPLY_ADM.CREATE_APPLY(
    apply_name       => 'APPLY_B',
    queue_name        => 'STREAMS_QUEUE_B',
    apply_captured   => TRUE);
END;
/

四、扩展为双向传输

1. 配置数据库B到数据库A的单向传输

(1)在数据库B上创建捕获进程:

BEGIN
  DBMS_CAPTURE_ADM.CREATE_CAPTURE(
    capture_name   => 'CAP_B',
    queue_name     => 'STREAMS_QUEUE_B',
    source_database=> NULL,
    start_capture  => TRUE);
END;
/

(2)在数据库B上创建传播进程:

BEGIN
  DBMS_PROPAGATION_ADM.CREATE_PROPAGATION(
    propagation_name => 'PROP_B_TO_A',
    source_queue_name => 'STREAMS_QUEUE_B',
    destination_queue => 'DB_A.STREAMS_QUEUE_A',
    service_name      => 'DB_A_SERVICE');
END;
/

(3)在数据库A上创建应用进程:

BEGIN
  DBMS_APPLY_ADM.CREATE_APPLY(
    apply_name       => 'APPLY_A',
    queue_name        => 'STREAMS_QUEUE_A',
    apply_captured   => TRUE);
END;
/

2. 配置规则集限制同步范围

为避免无限循环,需通过规则集过滤自生成的变更。例如,仅同步`CUSTOMERS`表中`ID`为偶数的记录到数据库B,奇数记录到数据库A:

(1)数据库A的捕获规则:

BEGIN
  DBMS_STREAMS_ADM.ADD_TABLE_RULES(
    table_name      => 'SCOTT.CUSTOMERS',
    streams_name    => 'CAP_A',
    streams_type    => 'CAPTURE',
    include_dml     => TRUE,
    include_ddl     => FALSE,
    source_database => NULL,
    rule_name       => 'RULE_CAP_A',
    condition       => 'MOD(ID, 2) = 0');
END;
/

(2)数据库B的捕获规则:

BEGIN
  DBMS_STREAMS_ADM.ADD_TABLE_RULES(
    table_name      => 'SCOTT.CUSTOMERS',
    streams_name    => 'CAP_B',
    streams_type    => 'CAPTURE',
    include_dml     => TRUE,
    include_ddl     => FALSE,
    source_database => NULL,
    rule_name       => 'RULE_CAP_B',
    condition       => 'MOD(ID, 2) = 1');
END;
/

3. 冲突处理配置

通过`UPDATE_TIME`字段解决冲突,优先应用时间较新的变更:

BEGIN
  DBMS_APPLY_ADM.SET_APPLY_HANDLER(
    apply_name    => 'APPLY_A',
    handler_type  => 'DML_HANDLER',
    handler_name  => 'CUSTOM_CONFLICT_HANDLER',
    enable        => TRUE);
END;
/

自定义冲突处理函数示例:

CREATE OR REPLACE FUNCTION custom_conflict_handler(
  p_commit_scn   IN NUMBER,
  p_source_rowid IN ROWID,
  p_target_rowid IN ROWID)
RETURN BOOLEAN IS
  v_source_time TIMESTAMP;
  v_target_time TIMESTAMP;
BEGIN
  SELECT UPDATE_TIME INTO v_source_time
  FROM SCOTT.CUSTOMERS WHERE ROWID = p_source_rowid;
  
  SELECT UPDATE_TIME INTO v_target_time
  FROM SCOTT.CUSTOMERS WHERE ROWID = p_target_rowid;
  
  IF v_source_time > v_target_time THEN
    RETURN TRUE; -- 应用源端变更
  ELSE
    RETURN FALSE; -- 忽略源端变更
  END IF;
END;
/

五、测试验证

1. 测试场景设计

- 场景1:数据库A更新偶数ID记录,验证数据库B同步。

- 场景2:数据库B更新奇数ID记录,验证数据库A同步。

- 场景3:同时更新相同记录,验证冲突处理。

2. 测试步骤

(1)在数据库A执行:

UPDATE SCOTT.CUSTOMERS SET NAME = 'A_UPDATED', UPDATE_TIME = SYSTIMESTAMP WHERE ID = 2;

(2)在数据库B查询:

SELECT NAME, UPDATE_TIME FROM SCOTT.CUSTOMERS WHERE ID = 2;

预期结果:数据库B的`NAME`和`UPDATE_TIME`与数据库A一致。

(3)在数据库B执行:

UPDATE SCOTT.CUSTOMERS SET NAME = 'B_UPDATED', UPDATE_TIME = SYSTIMESTAMP WHERE ID = 1;

(4)在数据库A查询:

SELECT NAME, UPDATE_TIME FROM SCOTT.CUSTOMERS WHERE ID = 1;

预期结果:数据库A的`NAME`和`UPDATE_TIME`与数据库B一致。

(5)同时更新相同记录:

数据库A:

UPDATE SCOTT.CUSTOMERS SET NAME = 'CONFLICT_A', UPDATE_TIME = TO_TIMESTAMP('2023-01-01 10:00:00', 'YYYY-MM-DD HH24:MI:SS') WHERE ID = 3;

数据库B:

UPDATE SCOTT.CUSTOMERS SET NAME = 'CONFLICT_B', UPDATE_TIME = TO_TIMESTAMP('2023-01-01 10:01:00', 'YYYY-MM-DD HH24:MI:SS') WHERE ID = 3;

预期结果:数据库A应用数据库B的变更(因时间更新)。

3. 性能测试

通过`V$STREAMS_CAPTURE`、`V$STREAMS_APPLY`等视图监控延迟:

SELECT capture_name, messages_captured, bytes_captured 
FROM V$STREAMS_CAPTURE;
SELECT apply_name, messages_applied, bytes_applied 
FROM V$STREAMS_APPLY;

六、常见问题与解决

1. 变更循环问题

现象:变更在两个数据库间无限循环。

解决:严格定义规则集条件,确保变更仅单向传播。

2. 冲突未处理

现象:应用进程报错`ORA-26786`(冲突检测失败)。

解决:检查冲突处理程序是否启用,或调整规则集。

3. 传播延迟过高

现象:`V$STREAMS_PROPAGATION`中`LATENCY`值过大。

解决:优化网络带宽,或调整传播进程参数(如`MAX_PARALLELISM`)。

七、总结

本文通过扩展Oracle 11g Streams单向传输配置,实现了双向数据同步。关键步骤包括:

1. 在两个数据库上分别配置捕获、传播和应用进程。

2. 通过规则集限制同步范围,避免循环。

3. 定义冲突处理机制,确保数据一致性。

4. 通过测试验证功能与性能。

双向传输适用于多数据中心、混合负载等场景,但需注意冲突管理和性能调优。未来工作可探索Oracle GoldenGate等更灵活的解决方案。

关键词:Oracle 11g、Streams技术、双向传输、数据同步、冲突处理、规则集、性能测试

简介:本文详细介绍了在Oracle 11g环境中基于已有Streams单向传输配置扩展为双向传输的方法,包括架构原理、环境准备、配置步骤、冲突处理和测试验证,适用于多数据中心数据同步场景。