DBMS_STREAMS_ADM.MAINTAIN_TABLES

I recently worked with a Streams customer who needed to configure Streams dynamically as new tables were added. Unfortunately their schema design contained bi-directional, uni-directional and unreplicated tables, so the more obvious option of calling the MAINTAIN_SCHEMAS procedure in DBMS_STREAMS_ADM was not available, at least in the short term.

Therefore I decided to investigate the MAINTAIN_TABLES procedure, which is also in the DBMS_STREAMS_ADM package. MAINTAIN_TABLES sets up Streams replication for one or more tables. I have used this procedure a few times in the past. I like it because it eliminates the need to code a series of fiddly steps, including the Data Pump copy of the table from the source to the target database. However, Streams replication already existed for a number of tables, so any use of MAINTAIN_TABLES needed to complement this configuration.

Prerequisites

There are a number of prerequisites that should be observed when using MAINTAIN_TABLES. Both source and target databases should have global names, and database links using the global name should exist in both directions. I have been told in the past that the global names are not mandatory, just highly recommended. There should also be a Streams administrative user (often STRMADMIN) in each database with appropriate privileges. Many customers create a dedicated tablespace for objects used by the Streams administrative user. If you are not using Automatic Memory Management then there are some minimum values for the STREAMS_POOL_SIZE parameter which should be configured.
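The prerequisites above can be checked with a few dictionary queries before MAINTAIN_TABLES is called. This is a minimal sketch, assuming a DBA-privileged session and a database link named WEST (the link name is taken from the examples later in this article; substitute your own). Run it on both databases, swapping the link name.

```sql
-- Global name of the database you are connected to
SELECT global_name FROM global_name;

-- TRUE means database link names must match the remote global name
SELECT value FROM v$parameter WHERE name = 'global_names';

-- Existing database links and the account each one connects as
SELECT owner, db_link, username, host
FROM   dba_db_links;

-- Confirm that the link to the other database actually works
-- (WEST is an assumed link name)
SELECT global_name FROM global_name@WEST;

-- Current Streams pool setting, plus SGA_TARGET to see whether
-- the SGA is being sized automatically
SELECT name, value
FROM   v$parameter
WHERE  name IN ('streams_pool_size', 'sga_target');
```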

Documentation

This site had an existing application running 10.2.0.4 and upgrading was not an option. The first thing I learned was that the MAINTAIN_TABLES API is not fully documented in Oracle 10.2. Although the procedure has more than 20 parameters, only five are mentioned at all in the documentation. In this situation I normally look at the most recent version of the manual, but even there only nine of the parameters are documented.

The poor documentation means a lot of guesswork is required when working with MAINTAIN_TABLES. The first question is whether MAINTAIN_TABLES can be used to add new tables incrementally to existing configurations or whether it is just a single-use procedure (like a wizard). Fortunately the answer is that it can be used incrementally without damaging the existing configuration. However, extreme care needs to be taken if it is necessary to roll back the recoverable scripts, as I noticed some steps were dropping existing captures, queues and propagations. Before using DBMS_RECOVERABLE_SCRIPT to roll back, I therefore recommend you check each step carefully.
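One way to check each step is to read the undo blocks from the data dictionary before rolling anything back. The sketch below assumes the DBA_RECOVERABLE_SCRIPT views and the DBMS_STREAMS_ADM.RECOVER_OPERATION procedure behave as described in the Streams documentation; verify the column names against your release. &script_id is a SQL*Plus substitution variable to be filled in from the first query.

```sql
-- Operations started by the MAINTAIN_* procedures that have not been purged
SELECT script_id, invoking_procedure, status, done_block_num, total_blocks
FROM   dba_recoverable_script
ORDER  BY creation_time;

-- Errors recorded against a failed operation
SELECT block_num, error_number, error_message
FROM   dba_recoverable_script_errors
WHERE  script_id = HEXTORAW('&script_id')
ORDER  BY block_num;

-- Read every undo block before rolling back; look for steps that
-- drop captures, queues or propagations that existed beforehand
SET LONG 100000
SELECT block_num, status, undo_block
FROM   dba_recoverable_script_blocks
WHERE  script_id = HEXTORAW('&script_id')
ORDER  BY block_num;

-- Only after reviewing the undo blocks
BEGIN
  dbms_streams_adm.recover_operation(
    script_id      => HEXTORAW('&script_id'),
    operation_mode => 'ROLLBACK');  -- alternatives: 'FORWARD', 'PURGE'
END;
/
```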

Table Names

Some care needs to be taken with the TABLE_NAMES parameter as the format is non-standard. The schema name must be included with the table name. For example:

'SCHEMA1.TABLE1'

My testing never extended to configuring Streams for a list of tables. I did read a couple of notes suggesting that if MAINTAIN_TABLES is used to configure Streams for tables in more than one schema, it should be called separately for each schema, presumably because of a bug.
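For reference, the documented form of TABLE_NAMES for more than one table is a comma-separated list of schema-qualified names. The call below is an untested sketch of that form: SCHEMA1.TABLE2 is a hypothetical second table, both tables sit in the same schema in line with the notes mentioned above, and the remaining parameters mirror the single-table call used later in this article. PERFORM_ACTIONS is FALSE, so only a script should be generated.

```sql
BEGIN
  dbms_streams_adm.maintain_tables
  (
    -- comma-separated, each name qualified with its schema
    table_names => 'SCHEMA1.TABLE1,SCHEMA1.TABLE2',
    source_directory_object => NULL,
    destination_directory_object => NULL,
    source_database => 'EAST',
    destination_database => 'WEST',
    perform_actions => FALSE,            -- generate a script only
    script_name => 'tables_1_2.txt',     -- hypothetical file name
    script_directory_object => 'EXPDP_DIR',
    capture_name => 'CAPTURE1',
    capture_queue_table => 'CAPTURE_QUEUE_TABLE',
    capture_queue_name => 'CAPTURE_QUEUE',
    capture_queue_user => 'STRMADMIN',
    propagation_name => 'PROPAGATION1',
    apply_name => 'APPLY1',
    apply_queue_table => 'APPLY_QUEUE_TABLE',
    apply_queue_name => 'APPLY_QUEUE',
    apply_queue_user => 'STRMADMIN',
    include_ddl => TRUE,
    instantiation => DBMS_STREAMS_ADM.INSTANTIATION_TABLE_NETWORK
  );
END;
/
```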

Capture, Propagation and Apply

If you already have a Streams configuration and you wish to use the existing capture, apply, propagation and underlying queues, you will need to do some research before invoking MAINTAIN_TABLES. If you do not explicitly specify any of these objects, MAINTAIN_TABLES will create new ones with system-defined names. MAINTAIN_TABLES is not great at working things out for itself: if you just specify a capture name, it does not try to figure out the capture queue table name and so on; it just creates new ones, again with system-defined names. So in Oracle 10.2 at least you need to tell it everything.

You can check the names of existing objects using:

SELECT capture_name FROM dba_capture;

SELECT propagation_name FROM dba_propagation;

SELECT apply_name FROM dba_apply;

SELECT owner, queue_table FROM dba_queue_tables;

SELECT owner,name FROM dba_queues;
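Since every name has to be supplied, it can help to pull the queue details alongside each process name so that the values for the CAPTURE_QUEUE_*, APPLY_QUEUE_* and PROPAGATION_NAME parameters can be read off directly. This is a sketch using the standard Streams dictionary views; the STRMADMIN owner filter is an assumption based on the administrative user named above.

```sql
-- Source database: capture process and the queue it writes to
SELECT capture_name, queue_owner, queue_name, status
FROM   dba_capture;

-- Source database: propagation with both ends of the route
SELECT propagation_name,
       source_queue_owner, source_queue_name,
       destination_queue_owner, destination_queue_name,
       destination_dblink
FROM   dba_propagation;

-- Target database: apply process and the queue it reads from
SELECT apply_name, queue_owner, queue_name, status
FROM   dba_apply;

-- Either database: map each queue to its queue table
SELECT owner, name, queue_table
FROM   dba_queues
WHERE  owner = 'STRMADMIN'          -- assumed Streams administrator
AND    queue_type = 'NORMAL_QUEUE'  -- skip exception queues
ORDER  BY queue_table, name;
```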

If you make a mistake, you can drop any incorrectly created objects. However, this can be quite time-consuming as MAINTAIN_TABLES can create around 50 new objects even for a trivial table.

Capture, propagation and apply can be dropped using:

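-- The capture process must be stopped before it can be dropped
-- (DBMS_CAPTURE_ADM.stop_capture)
BEGIN
  DBMS_CAPTURE_ADM.drop_capture
  (
    capture_name => '<capture_name>',
    drop_unused_rule_sets => TRUE
  );
END;
/

BEGIN
  DBMS_PROPAGATION_ADM.drop_propagation
  (
    propagation_name => '<propagation_name>',
    drop_unused_rule_sets => TRUE
  );
END;
/

-- The apply process must be stopped before it can be dropped
-- (DBMS_APPLY_ADM.stop_apply)
BEGIN
  DBMS_APPLY_ADM.drop_apply
  (
    apply_name => '<apply_name>',
    drop_unused_rule_sets => TRUE
  );
END;
/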

In each of the above examples I have set DROP_UNUSED_RULE_SETS to TRUE. MAINTAIN_TABLES creates a handful of rules for each object. Each rule is an individual data dictionary object and using the above parameter is the simplest way to clean up redundant ones.
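To see which rules exist for a table before and after a clean-up, the DBA_STREAMS_TABLE_RULES view can be queried. This is a sketch; SCHEMA1.TABLE1 is the example table used elsewhere in this article.

```sql
-- Rules created for one table, grouped by the Streams client that uses them
SELECT streams_type,   -- CAPTURE, PROPAGATION or APPLY
       streams_name,
       rule_owner,
       rule_name,
       rule_type       -- DML or DDL
FROM   dba_streams_table_rules
WHERE  table_owner = 'SCHEMA1'
AND    table_name  = 'TABLE1'
ORDER  BY streams_type, streams_name, rule_type;
```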

Normally when a queue table is dropped, any queues supported by that table are also dropped. So the following can be used to drop a queue table and all of its queues:

BEGIN
  DBMS_AQADM.DROP_QUEUE_TABLE
  (
    queue_table => '<queue_table_name>',
    force => TRUE  -- also drops the queues in this queue table
  );
END;
/

If you need to drop individual queues use:

-- The queue must be stopped first (DBMS_AQADM.STOP_QUEUE)
BEGIN
  DBMS_AQADM.DROP_QUEUE
  (
    queue_name => '<queue_name>'
  );
END;
/

Instantiation

There are two ways to instantiate the table in the target database: using a Data Pump dump file, or using Data Pump across the network. For large tables you may want to use the dump file; for small or newly created tables the network option is easier.

The instantiation method is controlled by the INSTANTIATION parameter. To use the network set this parameter to DBMS_STREAMS_ADM.INSTANTIATION_TABLE_NETWORK. The default value is DBMS_STREAMS_ADM.INSTANTIATION_TABLE which performs a full Data Pump export on the source and a full Data Pump import on the target. Note that if the table already exists on the target then it will not be modified.

One issue that I hit with instantiation was with tablespace names. After MAINTAIN_TABLES failed to create a table in the target database, I discovered that a tablespace name had been explicitly specified in the CREATE TABLE statement. The only way around this was to create a new tablespace with the correct name in the target database.
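A missing tablespace can be detected before instantiation by comparing the tablespaces used by the source table with those that exist on the target. The query below is a sketch, assuming it is run on the target database by a user who can read the DBA views over a database link named EAST pointing at the source.

```sql
-- Tablespaces used by the source table and its indexes
-- that do not exist in this (target) database.
-- Partitioned objects have a NULL tablespace_name at this level,
-- so they are filtered out here and need a separate check
-- against DBA_TAB_PARTITIONS and DBA_IND_PARTITIONS.
SELECT tablespace_name
FROM   dba_tables@EAST
WHERE  owner = 'SCHEMA1'
AND    table_name = 'TABLE1'
AND    tablespace_name IS NOT NULL
UNION
SELECT tablespace_name
FROM   dba_indexes@EAST
WHERE  table_owner = 'SCHEMA1'
AND    table_name = 'TABLE1'
AND    tablespace_name IS NOT NULL
MINUS
SELECT tablespace_name
FROM   dba_tablespaces;
```

Any row returned names a tablespace that would have to be created on the target first.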

Note that there is a PROPAGATION_NAME parameter, but there is no parameter for the propagation owner. In my example a propagation already existed for the Streams administrative user. However, if I called MAINTAIN_TABLES as any other user, it would create a new propagation, together with the underlying objects, owned by that user. From this I conclude that in most cases it probably only makes sense for the Streams administrative user to call MAINTAIN_TABLES.

Scripts

The PERFORM_ACTIONS parameter specifies whether MAINTAIN_TABLES should actually make changes to the database or just generate a script listing the actions it would perform. Beware: the default value for this parameter is TRUE (implement changes).

If PERFORM_ACTIONS is set to FALSE then you should also specify a SCRIPT_NAME and a SCRIPT_DIRECTORY_OBJECT. The latter is just an Oracle directory object - I used EXPDP_DIR which already existed.
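The directory object can be checked, or created if it is missing, as follows. This is a sketch: the file system path is hypothetical, and the grant assumes the Streams administrative user is STRMADMIN.

```sql
-- Does the directory object exist, and where does it point?
SELECT directory_name, directory_path
FROM   dba_directories
WHERE  directory_name = 'EXPDP_DIR';

-- If not, create it (hypothetical path) and let STRMADMIN use it
CREATE DIRECTORY expdp_dir AS '/u01/app/oracle/expdp';
GRANT READ, WRITE ON DIRECTORY expdp_dir TO strmadmin;
```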

Experience now suggests that the best way of working with MAINTAIN_TABLES is to set PERFORM_ACTIONS=FALSE in order to generate a script which can be inspected manually. When you are happy with this script, run MAINTAIN_TABLES again with the same parameters and PERFORM_ACTIONS=TRUE. You could alternatively run the script manually. However you would not benefit from the recoverable scripts feature which does allow simple errors to be detected and resolved.

A typical call to MAINTAIN_TABLES to generate a script follows:

BEGIN
  dbms_streams_adm.maintain_tables 
  (
    table_names => 'SCHEMA1.TABLE1',
    source_directory_object => NULL,
    destination_directory_object => NULL,
    source_database => 'EAST',
    destination_database => 'WEST',
    perform_actions => FALSE,
    script_name => 'table1.txt',
    script_directory_object => 'EXPDP_DIR',
    capture_name => 'CAPTURE1',
    capture_queue_table => 'CAPTURE_QUEUE_TABLE',
    capture_queue_name => 'CAPTURE_QUEUE',
    capture_queue_user => 'STRMADMIN',
    propagation_name => 'PROPAGATION1',
    apply_name => 'APPLY1',
    apply_queue_table => 'APPLY_QUEUE_TABLE',
    apply_queue_name => 'APPLY_QUEUE',
    apply_queue_user => 'STRMADMIN',
    include_ddl => TRUE,
    instantiation => DBMS_STREAMS_ADM.INSTANTIATION_TABLE_NETWORK
  );
END;
/

The script generated by MAINTAIN_TABLES is long, but highly instructive.

SET ECHO ON
SET VERIFY OFF
WHENEVER SQLERROR EXIT SQL.SQLCODE;
-------------------------------------------------------------------
-- get TNSNAME and streams admin user details for both the databases
--------------------------------------------------------------------
PROMPT
PROMPT 'Enter TNS Name of site 1 as parameter 1:'
DEFINE db1                 = &1
PROMPT
PROMPT 'Enter streams admin username for site 1 as parameter 2:'
DEFINE strm_adm_db1        = &2
PROMPT
PROMPT 'Enter streams admin password for site 1 as parameter 3:'
DEFINE strm_adm_pwd_db1    = &3
PROMPT
PROMPT 'Enter TNS Name of site 2 as parameter 4:'
DEFINE db2                 = &4
PROMPT
PROMPT 'Enter streams admin username for site 2 as parameter 5:'
DEFINE strm_adm_db2        = &5
PROMPT
PROMPT 'Enter streams admin password for site 2 as parameter 6:'
DEFINE strm_adm_pwd_db2    = &6
-- connect as streams administrator to site 1
PROMPT Connecting as streams administrator to site 1
CONNECT &strm_adm_db1/&strm_adm_pwd_db1@&db1
--
-- Add supplemental log group for table "SCHEMA1"."TABLE1"
--
BEGIN
  EXECUTE IMMEDIATE 'ALTER TABLE "SCHEMA1"."TABLE1" ADD SUPPLEMENTAL LOG DATA (PRIMARY KEY, FOREIGN KEY, UNIQUE INDEX) COLUMNS';
EXCEPTION WHEN OTHERS THEN
  IF sqlcode = -32588 THEN NULL;  -- Logging attribute exists
  ELSE RAISE;
  END IF;
END;
/
--
-- Set up queue "STRMADMIN"."CAPTURE_QUEUE"
--
BEGIN
  dbms_streams_adm.set_up_queue(
    queue_table => '"STRMADMIN"."CAPTURE_QUEUE_TABLE"',
    storage_clause => NULL,
    queue_name => '"STRMADMIN"."CAPTURE_QUEUE"',
    queue_user => '"STRMADMIN"');
END;
/
--
-- PROPAGATE changes for table "SCHEMA1"."TABLE1"
--
DECLARE
  version_num            NUMBER := 0;
  release_num            NUMBER := 0;
  pos                    NUMBER;
  initpos                NUMBER;
  q2q                    BOOLEAN;
  stmt                   VARCHAR2(100);
  ver                    VARCHAR2(30);
  compat                 VARCHAR2(30); 
BEGIN
  BEGIN
    stmt := 'BEGIN dbms_utility.db_version@WEST(:ver , :compat); END;';
    EXECUTE IMMEDIATE stmt USING OUT ver, OUT compat;
    -- Extract version number
    initpos := 1;
    pos := INSTR(compat, '.', initpos, 1);
    IF pos > 0 THEN
      version_num := TO_NUMBER(SUBSTR(compat, initpos, pos - initpos));
      initpos := pos + 1;
      -- Extract release number
      pos := INSTR(compat, '.', initpos, 1);
      IF pos > 0 THEN
        release_num := TO_NUMBER(SUBSTR(compat, initpos,
                                   pos - initpos));
        initpos := pos + 1;
      ELSE
        release_num := TO_NUMBER(SUBSTR(compat, initpos));
      END IF;
    ELSE
      version_num := TO_NUMBER(SUBSTR(compat, initpos));
    END IF;
    -- use q2q propagation if compatibility >= 10.2
    IF version_num > 10 OR
       (version_num = 10 AND release_num >=2) THEN
      q2q := TRUE;
    ELSE
      q2q := FALSE;
    END IF;
  EXCEPTION WHEN OTHERS THEN
    q2q := FALSE;
  END;
  dbms_streams_adm.add_table_propagation_rules(
    table_name => '"SCHEMA1"."TABLE1"',
    streams_name => '"PROPAGATION1"',
    source_queue_name => '"STRMADMIN"."CAPTURE_QUEUE"',
    destination_queue_name => '"STRMADMIN"."APPLY_QUEUE"@WEST',
    include_dml => TRUE,
    include_ddl => TRUE,
    include_tagged_lcr => TRUE,
    source_database => 'EAST',
    inclusion_rule => TRUE,
    and_condition => NULL,
    queue_to_queue => q2q);
END;
/
--
-- Disable propagation. Enable after destination has been setup
--
DECLARE
  q2q       VARCHAR2(10);
  destn_q   VARCHAR2(65);
BEGIN
  SELECT queue_to_queue INTO q2q
  FROM dba_propagation
  WHERE source_queue_owner = 'STRMADMIN' AND
        source_queue_name = 'CAPTURE_QUEUE' AND
        destination_queue_owner = 'STRMADMIN' AND
        destination_queue_name = 'APPLY_QUEUE' AND
        destination_dblink = 'WEST';
  IF q2q = 'TRUE' THEN
    destn_q := '"STRMADMIN"."APPLY_QUEUE"';
  ELSE
    destn_q := NULL;
  END IF;
  dbms_aqadm.disable_propagation_schedule(
    queue_name => '"STRMADMIN"."CAPTURE_QUEUE"',
    destination => 'WEST',
    destination_queue => destn_q);
EXCEPTION WHEN OTHERS THEN
  IF sqlcode = -24065 THEN NULL;  -- propagation already disabled
  ELSE RAISE;
  END IF;
END;
/
--
-- CAPTURE changes for table "SCHEMA1"."TABLE1"
--
DECLARE
  compat         VARCHAR2(512);
  initpos        NUMBER;
  pos            NUMBER;
  version_num    NUMBER;
  release_num    NUMBER;
  compat_func    VARCHAR2(65);
  get_compatible VARCHAR2(4000);
BEGIN
  SELECT value INTO compat
  FROM v$parameter
  WHERE name = 'compatible';
  -- Extract version number
  initpos := 1;
  pos := INSTR(compat, '.', initpos, 1);
  IF pos > 0 THEN
    version_num := TO_NUMBER(SUBSTR(compat, initpos, pos - initpos));
    initpos := pos + 1;
    -- Extract release number
    pos := INSTR(compat, '.', initpos, 1);
    IF pos > 0 THEN
      release_num := TO_NUMBER(SUBSTR(compat, initpos, pos - initpos));
      initpos := pos + 1;
    ELSE
      release_num := TO_NUMBER(SUBSTR(compat, initpos));
    END IF;
  END IF;
  IF version_num < 10 THEN
    compat_func := 'dbms_streams.compatible_9_2';
  ELSIF version_num = 10 THEN
    IF release_num < 2 THEN
      compat_func := 'dbms_streams.compatible_10_1';
    ELSE
      compat_func := 'dbms_streams.compatible_10_2';
    END IF;
  ELSE
    compat_func := 'dbms_streams.compatible_10_2';
  END IF;
  get_compatible := ':lcr.get_compatible() <= '||compat_func;
  dbms_streams_adm.add_table_rules(
    table_name => '"SCHEMA1"."TABLE1"',
    streams_type => 'CAPTURE',
    streams_name => '"CAPTURE_STREAM"',
    queue_name => '"STRMADMIN"."CAPTURE_QUEUE"',
    include_dml => TRUE,
    include_ddl => TRUE,
    include_tagged_lcr => TRUE,
    source_database => 'EAST',
    inclusion_rule => TRUE,
    and_condition => get_compatible);
END;
/
--
-- Start capture process CAPTURE_STREAM
--
BEGIN
  dbms_capture_adm.start_capture(
    capture_name => '"CAPTURE_STREAM"');
EXCEPTION WHEN OTHERS THEN
  IF sqlcode = -26666 THEN NULL;  -- CAPTURE process already running
  ELSE RAISE;
  END IF;
END;
/
-- connect as streams administrator to site 2
PROMPT Connecting as streams administrator to site 2
CONNECT &strm_adm_db2/&strm_adm_pwd_db2@&db2
--
-- Datapump TABLE MODE IMPORT (NETWORK)
--
DECLARE
  h1                NUMBER;       -- data pump job handle
  name_expr_list    VARCHAR2(32767); -- for metadata_filter
  object_name       dbms_utility.uncl_array; -- object names
  cnt               NUMBER;
  object_owner      VARCHAR2(30); -- owner
  job_state         VARCHAR2(30); -- job state
  status            ku$_Status; -- data pump status
  job_not_exist     exception;
  pragma            exception_init(job_not_exist, -31626);
BEGIN
  object_name(1) := 'TABLE1';
  object_owner := 'SCHEMA1';
  FOR idx IN 1..1 LOOP
    SELECT COUNT(1) INTO cnt FROM all_tables@WEST
      WHERE owner = object_owner AND table_name = object_name(idx);
    -- table does not exist locally, need instantiation
    IF cnt = 0 THEN
      IF name_expr_list IS NULL THEN
        name_expr_list := '(';
      ELSE
        name_expr_list := name_expr_list ||',';
      END IF;
      name_expr_list := name_expr_list||''''||object_name(idx)||'''';
    END IF;
  END LOOP;
  IF name_expr_list IS NOT NULL THEN
    name_expr_list := name_expr_list || ')';
  ELSE
    COMMIT;
    RETURN;
  END IF;
  h1 := dbms_datapump.open(operation=>'IMPORT',job_mode=>'TABLE',
    remote_link=>'EAST',
    job_name=>NULL, version=>'COMPATIBLE');
  dbms_datapump.metadata_filter(
    handle=>h1,
    name=>'NAME_EXPR',
    value=>'IN'||name_expr_list);
  dbms_datapump.metadata_filter(
    handle=>h1,
    name=>'SCHEMA_EXPR',
    value=>'IN'||'(''SCHEMA1'')');
  dbms_datapump.start_job(h1);
  job_state := 'UNDEFINED';
  BEGIN
    WHILE (job_state != 'COMPLETED') AND (job_state != 'STOPPED') LOOP
      status := dbms_datapump.get_status(
        handle => h1,
        mask => dbms_datapump.ku$_status_job_error +
                dbms_datapump.ku$_status_job_status +
                dbms_datapump.ku$_status_wip,
        timeout => -1);
      job_state := status.job_status.state;
      dbms_lock.sleep(10);
    END LOOP;
  EXCEPTION WHEN job_not_exist THEN
    dbms_output.put_line('job finished');
  END;
  COMMIT;
EXCEPTION WHEN OTHERS THEN
  ROLLBACK;
  RAISE;
END;
/
--
-- Set up queue "STRMADMIN"."APPLY_QUEUE"
--
BEGIN
  dbms_streams_adm.set_up_queue(
    queue_table => '"STRMADMIN"."APPLY_QUEUE_TABLE"',
    storage_clause => NULL,
    queue_name => '"STRMADMIN"."APPLY_QUEUE"',
    queue_user => '"STRMADMIN"');
END;
/
--
-- APPLY changes for table "SCHEMA1"."TABLE1"
--
DECLARE
  compat         VARCHAR2(512);
  initpos        NUMBER;
  pos            NUMBER;
  version_num    NUMBER;
  release_num    NUMBER;
  compat_func    VARCHAR2(65);
  get_compatible VARCHAR2(4000);
BEGIN
  SELECT value INTO compat
  FROM v$parameter
  WHERE name = 'compatible';
  -- Extract version number
  initpos := 1;
  pos := INSTR(compat, '.', initpos, 1);
  IF pos > 0 THEN
    version_num := TO_NUMBER(SUBSTR(compat, initpos, pos - initpos));
    initpos := pos + 1;
    -- Extract release number
    pos := INSTR(compat, '.', initpos, 1);
    IF pos > 0 THEN
      release_num := TO_NUMBER(SUBSTR(compat, initpos, pos - initpos));
      initpos := pos + 1;
    ELSE
      release_num := TO_NUMBER(SUBSTR(compat, initpos));
    END IF;
  END IF;
  IF version_num < 10 THEN
    compat_func := 'dbms_streams.compatible_9_2';
  ELSIF version_num = 10 THEN
    IF release_num < 2 THEN
      compat_func := 'dbms_streams.compatible_10_1';
    ELSE
      compat_func := 'dbms_streams.compatible_10_2';
    END IF;
  ELSE
    compat_func := 'dbms_streams.compatible_10_2';
  END IF;
  get_compatible := ':lcr.get_compatible() <= '||compat_func;
  dbms_streams_adm.add_table_rules(
    table_name => '"SCHEMA1"."TABLE1"',
    streams_type => 'APPLY',
    streams_name => '"APPLY_STREAM"',
    queue_name => '"STRMADMIN"."APPLY_QUEUE"',
    include_dml => TRUE,
    include_ddl => TRUE,
    include_tagged_lcr => TRUE,
    source_database => 'EAST',
    inclusion_rule => TRUE,
    and_condition => get_compatible);
END;
/
--
-- Get tag value to be used for Apply
--
DECLARE
  found   BINARY_INTEGER := 0;
  tag_num NUMBER;
BEGIN
  -- Use the apply object id as the tag
  SELECT o.object_id INTO tag_num
  FROM dba_objects o
  WHERE o.object_name= 'APPLY_STREAM' AND
        o.object_type='APPLY';
  LOOP
    BEGIN
      found := 0;
      SELECT 1 INTO found FROM dba_apply
      WHERE apply_name != 'APPLY_STREAM' AND
            apply_tag = hextoraw(tag_num);
    EXCEPTION WHEN no_data_found THEN
      EXIT;
    END;
    EXIT WHEN (found = 0);
    tag_num := tag_num + 1;
  END LOOP;
  -- alter apply
  dbms_apply_adm.alter_apply(
    apply_name => '"APPLY_STREAM"',
    apply_tag => hextoraw(tag_num));
END;
/
--
-- Start apply process APPLY_STREAM
--
BEGIN
  dbms_apply_adm.start_apply(
    apply_name => '"APPLY_STREAM"');
EXCEPTION WHEN OTHERS THEN
  IF sqlcode = -26666 THEN NULL;  -- APPLY process already running
  ELSE RAISE;
  END IF;
END;
/
-- connect as streams administrator to site 1
PROMPT Connecting as streams administrator to site 1
CONNECT &strm_adm_db1/&strm_adm_pwd_db1@&db1
--
-- Enable propagation schedule for "STRMADMIN"."CAPTURE_QUEUE"
-- to WEST
--
DECLARE
  q2q       VARCHAR2(10);
  destn_q   VARCHAR2(65);
BEGIN
  SELECT queue_to_queue INTO q2q
  FROM dba_propagation
  WHERE source_queue_owner = 'STRMADMIN' AND
        source_queue_name = 'CAPTURE_QUEUE' AND
        destination_queue_owner = 'STRMADMIN' AND
        destination_queue_name = 'APPLY_QUEUE' AND
        destination_dblink = 'WEST';
  IF q2q = 'TRUE' THEN
    destn_q := '"STRMADMIN"."APPLY_QUEUE"';
  ELSE
    destn_q := NULL;
  END IF;
  dbms_aqadm.enable_propagation_schedule(
    queue_name => '"STRMADMIN"."CAPTURE_QUEUE"',
    destination => 'WEST',
    destination_queue => destn_q);
EXCEPTION WHEN OTHERS THEN
  IF sqlcode = -24064 THEN NULL; -- propagation already enabled
  ELSE RAISE;
  END IF;
END;
/

When you are happy with the actions described in the script, you can use MAINTAIN_TABLES to update the objects using:

BEGIN
  dbms_streams_adm.maintain_tables 
  (
    table_names => 'SCHEMA1.TABLE1',
    source_directory_object => NULL,
    destination_directory_object => NULL,
    source_database => 'EAST',
    destination_database => 'WEST',
    perform_actions => TRUE,
    capture_name => 'CAPTURE1',
    capture_queue_table => 'CAPTURE_QUEUE_TABLE',
    capture_queue_name => 'CAPTURE_QUEUE',
    capture_queue_user => 'STRMADMIN',
    propagation_name => 'PROPAGATION1',
    apply_name => 'APPLY1',
    apply_queue_table => 'APPLY_QUEUE_TABLE',
    apply_queue_name => 'APPLY_QUEUE',
    apply_queue_user => 'STRMADMIN',
    include_ddl => TRUE,
    instantiation => DBMS_STREAMS_ADM.INSTANTIATION_TABLE_NETWORK
  );
END;
/
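Once the call completes, the state of each component can be confirmed from the dictionary. The queries below are a sketch using the standard Streams views; SCHEMA1 is the example schema from this article.

```sql
-- Source database: capture and propagation should be ENABLED with no error
SELECT capture_name, status, error_message
FROM   dba_capture;

SELECT propagation_name, status, error_message
FROM   dba_propagation;

-- Target database: apply should be ENABLED with no error
SELECT apply_name, status, error_message
FROM   dba_apply;

-- Target database: transactions that failed to apply
SELECT apply_name, local_transaction_id, error_number, error_message
FROM   dba_apply_error;

-- Target database: the table should have an instantiation SCN recorded
SELECT source_object_owner, source_object_name, instantiation_scn
FROM   dba_apply_instantiated_objects
WHERE  source_object_owner = 'SCHEMA1';
```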

Calling MAINTAIN_TABLES from a PL/SQL procedure

Whilst I was initially testing MAINTAIN_TABLES I used anonymous PL/SQL blocks to invoke the procedure. However the customer requirement was to configure Streams from within a package. As a first step I therefore attempted to encapsulate the call to MAINTAIN_TABLES in a PL/SQL procedure. I spent several hours trying to get this simple change to work, but once the call was included in the procedure it failed repeatedly with:

ORA-00942: table or view does not exist

I never solved this. Event 10046 level 4 trace suggested the API was encountering a parse error when attempting to select from DBA_OBJECTS. I tried copying all parameters to temporary variables, using IN OUT parameters and setting PLSQL_OPTIMIZE_LEVEL to 0, but none of these worked.
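A common cause of an ORA-00942 that appears only inside a stored procedure is that privileges held through a role (for example SELECT on DBA_OBJECTS through the DBA role) are disabled in definer's rights procedures. Whether that was the cause here is not established. As a sketch of what could be tried under that assumption, the wrapper can be compiled with invoker's rights, or the privilege can be granted directly to the owner. The procedure name add_streams_table is hypothetical; in a package, the AUTHID clause goes on the package specification.

```sql
-- Option 1: invoker's rights, so roles enabled in the session stay active
CREATE OR REPLACE PROCEDURE add_streams_table (p_table_name IN VARCHAR2)
  AUTHID CURRENT_USER
AS
BEGIN
  dbms_streams_adm.maintain_tables
  (
    table_names => p_table_name,   -- e.g. 'SCHEMA1.TABLE1'
    source_directory_object => NULL,
    destination_directory_object => NULL,
    source_database => 'EAST',
    destination_database => 'WEST',
    perform_actions => TRUE,
    capture_name => 'CAPTURE1',
    capture_queue_table => 'CAPTURE_QUEUE_TABLE',
    capture_queue_name => 'CAPTURE_QUEUE',
    capture_queue_user => 'STRMADMIN',
    propagation_name => 'PROPAGATION1',
    apply_name => 'APPLY1',
    apply_queue_table => 'APPLY_QUEUE_TABLE',
    apply_queue_name => 'APPLY_QUEUE',
    apply_queue_user => 'STRMADMIN',
    include_ddl => TRUE,
    instantiation => DBMS_STREAMS_ADM.INSTANTIATION_TABLE_NETWORK
  );
END add_streams_table;
/

-- Option 2: keep definer's rights, but grant the dictionary access
-- directly to the procedure owner (run as a suitably privileged user)
GRANT SELECT ANY DICTIONARY TO strmadmin;
```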