I recently worked with a Streams customer who needed to configure Streams dynamically as new tables were added. Unfortunately their schema design contained bi-directional, uni-directional and unreplicated tables so more obvious option of calling the MAINTAIN_SCHEMA procedure in DBMS_STREAMS_ADM was not available, at least in the short term.
Therefore I decided to investigate the MAINTAIN_TABLES procedure which is also in the DBMS_STREAMS_ADM package. MAINTAIN_TABLE sets up Streams replication for one or more tables. I have used this procedure a few times in the past. I like it because it eliminates the need to code a series of fiddly steps including DataPump replication of the table from source to target database. However, Streams replication already existed for a number of tables and therefore any use of MAINTAIN_TABLES needed to complement this configuration.
There are a number of pre-requisites that should be observed when using MAINTAIN_TABLES. Both source and target databases should have global names and database links using the global name should exist in both directions. I have been told in the past that the global names are not mandatory, just highly recommended. Also there should be a Streams administrative user (often STRMADMIN) in each database with appropriate privileges. Many customers create a dedicated tablespace for objects used by the Streams administrative user. If you are not using Automatic Memory Management then there are some minimum values for the STREAMS_POOL_SIZE parameter which should be configured.
This site had an existing application running 10.2.0.4 and upgrading was not an option. The first thing I learned was that the MAINTAIN_TABLES API is not fully documented in Oracle 10.2. Although the procedure has more than 20 parameters, only five were mentioned at all in the documentation. In this situation I normally look at the most recent version of the manual only nine of the parameters are documented.
The poor documentation means a lot of guesswork is required when working with MAINTAIN_TABLES. The first question is whether MAINTAIN_TABLES can be used to add new tables incrementally to existing configurations or if it is just a single use procedure (like a wizard). Fortunately the answer is that it can be used incrementally without damaging the existing configuration. However extreme care needs to be taken if it is necessary to rollback the recoverable scripts as I noticed some steps were dropping existing captures, queues and propagations. Before using DBMS_RECOVERABLE_SCRIPT to rollback I therefore recommend you check each step carefully.
Some care needs to be taken with the TABLE_NAMES parameter as the format is non-standard. The schema name must be included with the table name. For example:
'SCHEMA1.TABLE1 '
My testing never extended to configuration of streams for a list of tables. I did read a couple of notes that suggested that MAINTAIN_TABLES was being used to configure Streams for tables in more than one schema, then it should be called individually for each schema - presumably a bug.
If you already have a Streams configuration and you wish to use the existing capture, apply, propagation and underlying queues, you will need to do some research before invoking MAINTAIN_TABLES. If you do not explicitly specify any of these objects, MAINTAIN_TABLES will create new ones with system-defined names. MAINTAIN_TABLES is not great at working things out for itself; if you just specify a capture name it does try to figure out the capture queue table name etc, it just creates new ones again with system-defined names. So in Oracle 10.2 at least you need to tell it everything.
You can check the names of existing objects using:
SELECT capture_name FROM dba_capture; SELECT propagation_name FROM dba_propagation; SELECT apply_name FROM dba_apply; SELECT owner, queue_table FROM dba_queue_tables; SELECT owner,name FROM dba_queues;
If you make a mistake, you can drop any incorrectly created objects. However, this can be quite time-consuming as MAINTAIN_TABLES can create around 50 new objects even for a trivial table.
Capture, propagation and apply can be dropped using:
DBMS_CAPTURE_ADM.drop_capture ( capture_name => <capture_name>, drop_unused_rule_sets => TRUE ); DBMS_PROPAGATION_ADM.drop_propagation ( propagation_name => <propagation_name>, drop_unused_rule_sets => TRUE ); DBMS_APPLY_ADM.drop_apply ( apply_name => <apply_name>, drop_unused_rule_sets => TRUE );
In each of the above examples I have set DROP_UNUSED_RULE_SETS to TRUE. MAINTAIN_TABLES creates a handful of rules for each object. Each rule is an individual data dictionary object and using the above parameter is the simplest way to clean up redundant ones.
Normally when a queue table is dropped, any queues supported by that table are also dropped. So the following can be used to drop a queue table and all of its queues:
DBMS_AQADM.DROP_QUEUE_TABLE ( queue_table => <queue_table_name>, force => TRUE );
If you need to drop individual queues use:
DBMS_AQADM.DROP_QUEUE ( queue_name => <queue_name> );
There are two ways to instantiate the table in the target database; using a Data Pump dump file and using Data Pump across the network. For large files you may want to use the dump file; for small or newly created tables the network option is easier.
The instantiation method is controlled by the INSTANTIATION parameter. To use the network set this parameter to DBMS_STREAMS_ADM.INSTANTIATION_TABLE_NETWORK. The default value is DBMS_STREAMS_ADM.INSTANTIATION_TABLE which performs a full Data Pump export on the source and a full Data Pump import on the target. Note that if the table already exists on the target then it will not be modified.
One issue that I hit with instantiation was with tablespace names. After MAINTAIN_TABLES failed to create a table in the target database, I discovered that a tablespace name had been explicitly specified in the CREATE TABLE statement. The only way around this was to create a new tablespace with the correct name in the target database.
Note that there is a PROPAGATION_NAME parameter, but there is no parameter for the propagation owner. In my example a propagation already existed for the Streams administrative user. However if I called MAINTAIN_TABLES for any other user it would create a new propagation together with the underlying objects that was owned by that user. From this I conclude that in most cases it probably only makes sense for the Streams administrative user to call MAINTAIN_TABLES.
The PERFORM_ACTIONS parameter specifies whether or not MAINTAIN_TABLES should actually make changes to the database or just generate a script listing the actions it would perform. Beware the default value for this parameter is TRUE (implement changes).
If PERFORM_ACTIONS is set to FALSE then you should also specify a SCRIPT_NAME and a SCRIPT_DIRECTORY_OBJECT. The latter is just an Oracle directory object - I used EXPDP_DIR which already existed.
Experience now suggests that the best way of working with MAINTAIN_TABLES is to set PERFORM_ACTIONS=FALSE in order to generate a script which can be inspected manually. When you are happy with this script, run MAINTAIN_TABLES again with the same parameters and PERFORM_ACTIONS=TRUE. You could alternatively run the script manually. However you would not benefit from the recoverable scripts feature which does allow simple errors to be detected and resolved.
A typical call to MAINTAIN_TABLES to generate a script follows:
BEGIN dbms_streams_adm.maintain_tables ( table_names => 'SCHEMA1.TABLE1', source_directory_object => NULL, destination_directory_object => NULL, source_database => 'EAST', destination_database => 'WEST', perform_actions => FALSE, script_name => 'table1.txt', script_directory_object => 'EXPDP_DIR', capture_name => 'CAPTURE1', capture_queue_table => 'CAPTURE_QUEUE_TABLE', capture_queue_name => 'CAPTURE_QUEUE', capture_queue_user => 'STRMADMIN', propagation_name => 'PROPAGATION1', apply_name => 'APPLY1', apply_queue_table => 'APPLY_QUEUE_TABLE', apply_queue_name => 'APPLY_QUEUE', apply_queue_user => 'STRMADMIN', include_ddl => TRUE, instantiation => DBMS_STREAMS_ADM.INSTANTIATION_TABLE_NETWORK ); END; /
The script generated by MAINTAIN_TABLES is long, but highly instructive.
SET ECHO ON SET VERIFY OFF WHENEVER SQLERROR EXIT SQL.SQLCODE; ------------------------------------------------------------------- -- get TNSNAME and streams admin user details for both the databases -------------------------------------------------------------------- PROMPT PROMPT 'Enter TNS Name of site 1 as parameter 1:' DEFINE db1 = &1 PROMPT PROMPT 'Enter streams admin username for site 1 as parameter 2:' DEFINE strm_adm_db1 = &2 PROMPT PROMPT 'Enter streams admin password for site 1 as parameter 3:' DEFINE strm_adm_pwd_db1 = &3 PROMPT PROMPT 'Enter TNS Name of site 2 as parameter 4:' DEFINE db2 = &4 PROMPT PROMPT 'Enter streams admin username for site 2 as parameter 5:' DEFINE strm_adm_db2 = &5 PROMPT PROMPT 'Enter streams admin password for site 2 as parameter 6:' DEFINE strm_adm_pwd_db2 = &6 -- connect as streams administrator to site 1 PROMPT Connecting as streams administrator to site 1 CONNECT &strm_adm_db1/&strm_adm_pwd_db1@&db1 -- -- Add supplemental log group for table "SCHEMA1"."TABLE1" -- BEGIN EXECUTE IMMEDIATE 'ALTER TABLE "SCHEMA1"."TABLE1" ADD SUPPLEMENTAL LOG DATA (PRIMARY KEY, FOREIGN KEY, UNIQUE INDEX) COLUMNS'; EXCEPTION WHEN OTHERS THEN IF sqlcode = -32588 THEN NULL; -- Logging attribute exists ELSE RAISE; END IF; END; / -- -- Set up queue "STRMADMIN"."CAPTURE_QUEUE" -- BEGIN dbms_streams_adm.set_up_queue( queue_table => '"STRMADMIN"."CAPTURE_QUEUE_TABLE"', storage_clause => NULL, queue_name => '"STRMADMIN"."CAPTURE_QUEUE"', queue_user => '"STRMADMIN"'); END; / -- -- PROPAGATE changes for table "SCHEMA1"."TABLE1" -- DECLARE version_num NUMBER := 0; release_num NUMBER := 0; pos NUMBER; initpos NUMBER; q2q BOOLEAN; stmt VARCHAR2(100); ver VARCHAR2(30); compat VARCHAR2(30); BEGIN BEGIN stmt := 'BEGIN dbms_utility.db_version@WEST(:ver , :compat); END;'; EXECUTE IMMEDIATE stmt USING OUT ver, OUT compat; -- Extract version number initpos := 1; pos := INSTR(compat, '.', initpos, 1); IF pos > 0 THEN version_num := TO_NUMBER(SUBSTR(compat, initpos, pos - initpos)); initpos := pos + 1; -- Extract release number pos := INSTR(compat, '.', initpos, 1); IF pos > 0 THEN release_num := TO_NUMBER(SUBSTR(compat, initpos, pos - initpos)); initpos := pos + 1; ELSE release_num := TO_NUMBER(SUBSTR(compat, initpos)); END IF; ELSE version_num := TO_NUMBER(SUBSTR(compat, initpos)); END IF; -- use q2q propagation if compatibility >= 10.2 IF version_num > 10 OR (version_num = 10 AND release_num >=2) THEN q2q := TRUE; ELSE q2q := FALSE; END IF; EXCEPTION WHEN OTHERS THEN q2q := FALSE; END; dbms_streams_adm.add_table_propagation_rules( table_name => '"SCHEMA1"."TABLE1"', streams_name => '"PROPAGATION1"', source_queue_name => '"STRMADMIN"."CAPTURE_QUEUE"', destination_queue_name => '"STRMADMIN"."APPLY_QUEUE"@WEST', include_dml => TRUE, include_ddl => TRUE, include_tagged_lcr => TRUE, source_database => 'EAST', inclusion_rule => TRUE, and_condition => NULL, queue_to_queue => q2q); END; / -- -- Disable propagation. Enable after destination has been setup -- DECLARE q2q VARCHAR2(10); destn_q VARCHAR2(65); BEGIN SELECT queue_to_queue INTO q2q FROM dba_propagation WHERE source_queue_owner = 'STRMADMIN' AND source_queue_name = 'CAPTURE_QUEUE' AND destination_queue_owner = 'STRMADMIN' AND destination_queue_name = 'APPLY_QUEUE' AND destination_dblink = 'WEST'; IF q2q = 'TRUE' THEN destn_q := '"STRMADMIN"."APPLY_QUEUE"'; ELSE destn_q := NULL; END IF; dbms_aqadm.disable_propagation_schedule( queue_name => '"STRMADMIN"."CAPTURE_QUEUE"', destination => 'WEST', destination_queue => destn_q); EXCEPTION WHEN OTHERS THEN IF sqlcode = -24065 THEN NULL; -- propagation already disabled ELSE RAISE; END IF; END; / -- -- CAPTURE changes for table "SCHEMA1"."TABLE1" -- DECLARE compat VARCHAR2(512); initpos NUMBER; pos NUMBER; version_num NUMBER; release_num NUMBER; compat_func VARCHAR2(65); get_compatible VARCHAR2(4000); BEGIN SELECT value INTO compat FROM v$parameter WHERE name = 'compatible'; -- Extract version number initpos := 1; pos := INSTR(compat, '.', initpos, 1); IF pos > 0 THEN version_num := TO_NUMBER(SUBSTR(compat, initpos, pos - initpos)); initpos := pos + 1; -- Extract release number pos := INSTR(compat, '.', initpos, 1); IF pos > 0 THEN release_num := TO_NUMBER(SUBSTR(compat, initpos, pos - initpos)); initpos := pos + 1; ELSE release_num := TO_NUMBER(SUBSTR(compat, initpos)); END IF; END IF; IF version_num < 10 THEN compat_func := 'dbms_streams.compatible_9_2'; ELSIF version_num = 10 THEN IF release_num < 2 THEN compat_func := 'dbms_streams.compatible_10_1'; ELSE compat_func := 'dbms_streams.compatible_10_2'; END IF; ELSE compat_func := 'dbms_streams.compatible_10_2'; END IF; get_compatible := ':lcr.get_compatible() <= '||compat_func; dbms_streams_adm.add_table_rules( table_name => '"SCHEMA1"."TABLE1"', streams_type => 'CAPTURE', streams_name => '"CAPTURE_STREAM"', queue_name => '"STRMADMIN"."CAPTURE_QUEUE"', include_dml => TRUE, include_ddl => TRUE, include_tagged_lcr => TRUE, source_database => 'EAST', inclusion_rule => TRUE, and_condition => get_compatible); END; / -- -- Start capture process CAPTURE_STREAM -- BEGIN dbms_capture_adm.start_capture( capture_name => '"CAPTURE_STREAM"'); EXCEPTION WHEN OTHERS THEN IF sqlcode = -26666 THEN NULL; -- CAPTURE process already running ELSE RAISE; END IF; END; / -- connect as streams administrator to site 2 PROMPT Connecting as streams administrator to site 2 CONNECT &strm_adm_db2/&strm_adm_pwd_db2@&db2 -- -- Datapump TABLE MODE IMPORT (NETWORK) -- DECLARE h1 NUMBER; -- data pump job handle name_expr_list VARCHAR2(32767); -- for metadata_filter object_name dbms_utility.uncl_array; -- object names cnt NUMBER; object_owner VARCHAR2(30); -- owner job_state VARCHAR2(30); -- job state status ku$_Status; -- data pump status job_not_exist exception; pragma exception_init(job_not_exist, -31626); BEGIN object_name(1) := 'TABLE1'; object_owner := 'SCHEMA1'; FOR idx IN 1..1LOOP SELECT COUNT(1) INTO cnt FROM all_tables@WEST WHERE owner = object_owner AND table_name = object_name(idx); -- table does not exist locally, need instantiation IF cnt = 0 THEN IF name_expr_list IS NULL THEN name_expr_list := '('; ELSE name_expr_list := name_expr_list ||','; END IF; name_expr_list := name_expr_list||''''||object_name(idx)||''''; END IF; ENDLOOP; IF name_expr_list IS NOT NULL THEN name_expr_list := name_expr_list || ')'; ELSE COMMIT; RETURN; END IF; h1 := dbms_datapump.open(operation=>'IMPORT',job_mode=>'TABLE', remote_link=>'EAST', job_name=>NULL, version=>'COMPATIBLE'); dbms_datapump.metadata_filter( handle=>h1, name=>'NAME_EXPR', value=>'IN'||name_expr_list); dbms_datapump.metadata_filter( handle=>h1, name=>'SCHEMA_EXPR', value=>'IN'||'(''SCHEMA1'')'); dbms_datapump.start_job(h1); job_state := 'UNDEFINED'; BEGIN WHILE (job_state != 'COMPLETED') AND (job_state != 'STOPPED') LOOP status := dbms_datapump.get_status( handle => h1, mask => dbms_datapump.ku$_status_job_error + dbms_datapump.ku$_status_job_status + dbms_datapump.ku$_status_wip, timeout => -1); job_state := status.job_status.state; dbms_lock.sleep(10); ENDLOOP; EXCEPTION WHEN job_not_exist THEN dbms_output.put_line('job finished'); END; COMMIT; EXCEPTION WHEN OTHERS THEN ROLLBACK; RAISE; END; / -- -- Set up queue "STRMADMIN"."APPLY_QUEUE" -- BEGIN dbms_streams_adm.set_up_queue( queue_table => '"STRMADMIN"."APPLY_QUEUE_TABLE"', storage_clause => NULL, queue_name => '"STRMADMIN"."APPLY_QUEUE"', queue_user => '"STRMADMIN"'); END; / -- -- APPLY changes for table "SCHEMA1"."TABLE1" -- DECLARE compat VARCHAR2(512); initpos NUMBER; pos NUMBER; version_num NUMBER; release_num NUMBER; compat_func VARCHAR2(65); get_compatible VARCHAR2(4000); BEGIN SELECT value INTO compat FROM v$parameter WHERE name = 'compatible'; -- Extract version number initpos := 1; pos := INSTR(compat, '.', initpos, 1); IF pos > 0 THEN version_num := TO_NUMBER(SUBSTR(compat, initpos, pos - initpos)); initpos := pos + 1; -- Extract release number pos := INSTR(compat, '.', initpos, 1); IF pos > 0 THEN release_num := TO_NUMBER(SUBSTR(compat, initpos, pos - initpos)); initpos := pos + 1; ELSE release_num := TO_NUMBER(SUBSTR(compat, initpos)); END IF; END IF; IF version_num < 10 THEN compat_func := 'dbms_streams.compatible_9_2'; ELSIF version_num = 10 THEN IF release_num < 2 THEN compat_func := 'dbms_streams.compatible_10_1'; ELSE compat_func := 'dbms_streams.compatible_10_2'; END IF; ELSE compat_func := 'dbms_streams.compatible_10_2'; END IF; get_compatible := ':lcr.get_compatible() <= '||compat_func; dbms_streams_adm.add_table_rules( table_name => '"SCHEMA1"."TABLE1"', streams_type => 'APPLY', streams_name => '"APPLY_STREAM"', queue_name => '"STRMADMIN"."APPLY_QUEUE"', include_dml => TRUE, include_ddl => TRUE, include_tagged_lcr => TRUE, source_database => 'EAST', inclusion_rule => TRUE, and_condition => get_compatible); END; / -- -- Get tag value to be used for Apply -- DECLARE found BINARY_INTEGER := 0; tag_num NUMBER; BEGIN -- Use the apply object id as the tag SELECT o.object_id INTO tag_num FROM dba_objects o WHERE o.object_name= 'APPLY_STREAM' AND o.object_type='APPLY'; LOOP BEGIN found := 0; SELECT 1 INTO found FROM dba_apply WHERE apply_name != 'APPLY_STREAM' AND apply_tag = hextoraw(tag_num); EXCEPTION WHEN no_data_found THEN EXIT; END; EXIT WHEN (found = 0); tag_num := tag_num + 1; ENDLOOP; -- alter apply dbms_apply_adm.alter_apply( apply_name => '"APPLY_STREAM"', apply_tag => hextoraw(tag_num)); END; / -- -- Start apply process APPLY_STREAM -- BEGIN dbms_apply_adm.start_apply( apply_name => '"APPLY_STREAM"'); EXCEPTION WHEN OTHERS THEN IF sqlcode = -26666 THEN NULL; -- APPLY process already running ELSE RAISE; END IF; END; / -- connect as streams administrator to site 1 PROMPT Connecting as streams administrator to site 1 CONNECT &strm_adm_db1/&strm_adm_pwd_db1@&db1 -- -- Enable propagation schedule for "STRMADMIN"."CAPTURE_QUEUE" -- to WEST -- DECLARE q2q VARCHAR2(10); destn_q VARCHAR2(65); BEGIN SELECT queue_to_queue INTO q2q FROM dba_propagation WHERE source_queue_owner = 'STRMADMIN' AND source_queue_name = 'CAPTURE_QUEUE' AND destination_queue_owner = 'STRMADMIN' AND destination_queue_name = 'APPLY_QUEUE' AND destination_dblink = 'WEST'; IF q2q = 'TRUE' THEN destn_q := '"STRMADMIN"."APPLY_QUEUE"'; ELSE destn_q := NULL; END IF; dbms_aqadm.enable_propagation_schedule( queue_name => '"STRMADMIN"."CAPTURE_QUEUE"', destination => 'WEST', destination_queue => destn_q); EXCEPTION WHEN OTHERS THEN IF sqlcode = -24064 THEN NULL; -- propagation already enabled ELSE RAISE; END IF; END; /
When you are happy with the actions described in the script, you can use MAINTAIN_TABLES to update the objects using:
BEGIN dbms_streams_adm.maintain_tables ( table_names => 'SCHEMA1.TABLE1', source_directory_object => NULL, destination_directory_object => NULL, source_database => 'EAST', destination_database => 'WEST', perform_actions => TRUE, capture_name => 'CAPTURE1', capture_queue_table => 'CAPTURE_QUEUE_TABLE', capture_queue_name => 'CAPTURE_QUEUE', capture_queue_user => 'STRMADMIN', propagation_name => 'PROPAGATION1', apply_name => 'APPLY1', apply_queue_table => 'APPLY_QUEUE_TABLE', apply_queue_name => 'APPLY_QUEUE', apply_queue_user => 'STRMADMIN', include_ddl => TRUE, instantiation => DBMS_STREAMS_ADM.INSTANTIATION_TABLE_NETWORK ); END; /
Whilst I was initially testing MAINTAIN_TABLES I used anonymous PL/SQL blocks to invoke the procedure. However the customer requirement was to configure Streams from within a package. As a first step I therefore attempted to encapsulate the call to MAINTAIN_TABLES in a PL/SQL procedure. I spent several hours trying to get this simple change to work, but once the call was included in the procedure it failed repeatedly with:
ORA-00942: table or view does not existI never solved this. Event 10046 level 4 trace suggested the API was encountering a parse error when attempting to select from DBA_OBJECTS. I tried copying all parameters to temporary variables, using IN OUT parameters and setting PLSQL_OPTIMIZER_LEVEL to 0, but none were successful.