Skip Headers
Oracle® Streams Concepts and Administration
10g Release 2 (10.2)

Part Number B14229-04
Go to Documentation Home
Home
Go to Book List
Book List
Go to Table of Contents
Contents
Go to Index
Index
Go to Master Index
Master Index
Go to Feedback page
Contact Us

Go to previous page
Previous
Go to next page
Next
View PDF

27 Single-Database Capture and Apply Example

This chapter illustrates an example of a single database that captures changes to a table, reenqueues the captured changes into a queue, and then uses a DML handler during apply to insert a subset of the changes into a different table.

This chapter contains these topics:

Overview of the Single-Database Capture and Apply Example

The example in this chapter illustrates using Streams to capture and apply data manipulation language (DML) changes at a single database named cpap.net. Specifically, this example captures DML changes to the employees table in the hr schema, placing row logical change records (LCRs) into a queue named streams_queue. Next, an apply process dequeues these row LCRs from the same queue, reenqueues them into this queue, and sends them to a DML handler.

When the row LCRs are captured, they reside in the buffered queue and cannot be dequeued explicitly. After the row LCRs are reenqueued during apply, they are available for explicit dequeue by an application. This example does not create the application that dequeues these row LCRs.

This example illustrates a DML handler that inserts records of deleted employees into an emp_del table in the hr schema. This example assumes that the emp_del table is used to retain the records of all deleted employees. The DML handler is used to determine whether each row LCR contains a DELETE statement. When the DML handler finds a row LCR containing a DELETE statement, it converts the DELETE into an INSERT on the emp_del table and then inserts the row.

Figure 27-1 provides an overview of the environment.

Figure 27-1 Single Database Capture and Apply Example

Description of Figure 27-1 follows
Description of "Figure 27-1 Single Database Capture and Apply Example"

See Also:

Prerequisites

The following prerequisites must be completed before you begin the example in this chapter.

Set Up the Environment

Complete the following steps to create the hr.emp_del table, set up the Streams administrator, and create the queue.

  1. Show Output and Spool Results

  2. Create the hr.emp_del Table

  3. Set Up Users at cpap.net

  4. Create the ANYDATA Queue at cpap.net

  5. Check the Spool Results

Note:

If you are viewing this document online, then you can copy the text from the "BEGINNING OF SCRIPT" line after this note to the next "END OF SCRIPT" line into a text editor and then edit the text to create a script for your environment. Run the script with SQL*Plus on a computer that can connect to the database.
/************************* BEGINNING OF SCRIPT ******************************

Step 1   Show Output and Spool Results

Run SET ECHO ON and specify the spool file for the script. Check the spool file for errors after you run this script.

*/

SET ECHO ON
SPOOL streams_setup_capapp.out

/*

Step 2   Create the hr.emp_del Table

Connect to cpap.net as the hr user.

*/
 
CONNECT hr/hr@cpap.net

/*

Create the hr.emp_del table. The columns in the emp_del table is the same as the columns in the employees table, except for one added timestamp column that will record the date when a row is inserted into the emp_del table.

*/

CREATE TABLE emp_del( 
  employee_id    NUMBER(6), 
  first_name     VARCHAR2(20), 
  last_name      VARCHAR2(25), 
  email          VARCHAR2(25), 
  phone_number   VARCHAR2(20), 
  hire_date      DATE, 
  job_id         VARCHAR2(10), 
  salary         NUMBER(8,2), 
  commission_pct NUMBER(2,2), 
  manager_id     NUMBER(6), 
  department_id  NUMBER(4),
  timestamp      DATE);

CREATE UNIQUE INDEX emp_del_id_pk ON emp_del (employee_id);

ALTER TABLE emp_del ADD (CONSTRAINT emp_del_id_pk PRIMARY KEY (employee_id));

/*

Step 3   Set Up Users at cpap.net

Connect to cpap.net as SYSTEM user.

*/
 
CONNECT SYSTEM/MANAGER@cpap.net

/*

Create the Streams administrator named strmadmin and grant this user the necessary privileges. These privileges enable the user to manage queues, execute subprograms in packages related to Streams, create rule sets, create rules, and monitor the Streams environment by querying data dictionary views and queue table. You can choose a different name for this user.

In this example, the Streams administrator will be the apply user for the apply process and must be able to apply changes to the hr.emp_del table. Therefore, the Streams administrator is granted ALL privileges on this table.

Note:

  • For security purposes, use a password other than strmadminpw for the Streams administrator.

  • The ACCEPT command must appear on a single line in the script.

*/

GRANT DBA TO strmadmin IDENTIFIED BY strmadminpw;

ACCEPT streams_tbs PROMPT 'Enter Streams administrator tablespace on cpap.net: '

ALTER USER strmadmin DEFAULT TABLESPACE &streams_tbs
                     QUOTA UNLIMITED ON &streams_tbs;

/*

This example executes a subprogram in a Streams packages within a stored procedure. Specifically, the emp_dq procedure created in Step 8 runs the DEQUEUE procedure in the DBMS_STREAMS_MESSAGING package. Therefore, the Streams administrator must be granted EXECUTE privilege explicitly on the package. In this case, EXECUTE privilege cannot be granted through a role. The GRANT_ADMIN_PRIVILEGE procedure grants EXECUTE on all Streams packages, as well as other privileges relevant to Streams.

*/

BEGIN
  DBMS_STREAMS_AUTH.GRANT_ADMIN_PRIVILEGE(
    grantee          => 'strmadmin',    
    grant_privileges => true);
END;
/

/*

Grant the Streams administrator all privileges on the emp_del table, because the Streams administrator will be the apply user and must be able to insert records into this table. Alternatively, you can alter the apply process to specify that hr is the apply user.

*/

GRANT ALL ON hr.emp_del TO STRMADMIN;

/*

Step 4   Create the ANYDATA Queue at cpap.net

Connect to cpap.net as the strmadmin user.

*/

CONNECT strmadmin/strmadminpw@cpap.net

/*

Run the SET_UP_QUEUE procedure to create a queue named streams_queue at cpap.net. This queue is an ANYDATA queue that will stage the captured changes to be dequeued by an apply process and the user-enqueued changes to be dequeued by a dequeue procedure.

Running the SET_UP_QUEUE procedure performs the following actions:

*/

BEGIN
  DBMS_STREAMS_ADM.SET_UP_QUEUE(
    queue_table  => 'strmadmin.streams_queue_table',
    queue_name   => 'strmadmin.streams_queue');
END;
/

/*

Step 5   Check the Spool Results

Check the streams_setup_capapp.out spool file to ensure that all actions finished successfully after this script is completed.

*/

SET ECHO OFF
SPOOL OFF

/*************************** END OF SCRIPT ******************************/

Configure Capture and Apply

Complete the following steps to capture changes to the hr.employees table and apply these changes on single database in a customized way using a DML handler.

  1. Show Output and Spool Results

  2. Configure the Capture Process at cpap.net

  3. Set the Instantiation SCN for the hr.employees Table

  4. Create the DML Handler Procedure

  5. Set the DML Handler for the hr.employees Table

  6. Create a Messaging Client for the Queue

  7. Configure the Apply Process at cpap.net

  8. Create a Procedure to Dequeue the Messages

  9. Start the Apply Process at cpap.net

  10. Start the Capture Process at cpap.net

  11. Check the Spool Results

Note:

If you are viewing this document online, then you can copy the text from the "BEGINNING OF SCRIPT" line after this note to the next "END OF SCRIPT" line into a text editor and then edit the text to create a script for your environment. Run the script with SQL*Plus on a computer that can connect the database.
/************************* BEGINNING OF SCRIPT ******************************

Step 1   Show Output and Spool Results

Run SET ECHO ON and specify the spool file for the script. Check the spool file for errors after you run this script.

*/

SET ECHO ON
SPOOL streams_config_capapp.out

/*

Step 2   Configure the Capture Process at cpap.net

Connect to cpap.net as the strmadmin user.

*/
 
CONNECT strmadmin/strmadminpw@cpap.net

/*

Configure the capture process to capture DML changes to the hr.employees table at cpap.net. This step creates the capture process and adds a rule to its positive rule set that instructs the capture process to capture DML changes to this table. This step also prepares the hr.employees table for instantiation and enables supplemental logging for any primary key, unique key, bitmap index, and foreign key columns in the table.

Supplemental logging places additional information in the redo log for changes made to tables. The apply process needs this extra information to perform some operations, such as unique row identification.

*/

BEGIN
  DBMS_STREAMS_ADM.ADD_TABLE_RULES(
    table_name     => 'hr.employees',   
    streams_type   => 'capture',
    streams_name   => 'capture_emp',
    queue_name     => 'strmadmin.streams_queue',
    include_dml    =>  true,
    include_ddl    =>  false,
    inclusion_rule =>  true);
END;
/

/*

Step 3   Set the Instantiation SCN for the hr.employees Table

Because this example captures and applies changes in a single database, no instantiation is necessary. However, the apply process at the cpap.net database still must be instructed to apply changes that were made to the hr.employees table after a specific system change number (SCN).

This example uses the GET_SYSTEM_CHANGE_NUMBER function in the DBMS_FLASHBACK package to obtain the current SCN for the database. This SCN is used to run the SET_TABLE_INSTANTIATION_SCN procedure in the DBMS_APPLY_ADM package.

The SET_TABLE_INSTANTIATION_SCN procedure controls which LCRs for a table are ignored by an apply process and which LCRs for a table are applied by an apply process. If the commit SCN of an LCR for a table from a source database is less than or equal to the instantiation SCN for that table at a destination database, then the apply process at the destination database discards the LCR. Otherwise, the apply process applies the LCR. In this example, the cpap.net database is both the source database and the destination database.

The apply process will apply transactions to the hr.employees table with SCNs that were committed after SCN obtained in this step.

Note:

The hr.employees table also must be prepared for instantiation. This preparation was done automatically when the the capture process was configured with a rule to capture DML changes to the hr.employees table in Step 2.
*/

DECLARE
  iscn  NUMBER;         -- Variable to hold instantiation SCN value
BEGIN
  iscn := DBMS_FLASHBACK.GET_SYSTEM_CHANGE_NUMBER();
  DBMS_APPLY_ADM.SET_TABLE_INSTANTIATION_SCN(
    source_object_name    => 'hr.employees',
    source_database_name  => 'cpap.net',
    instantiation_scn     => iscn);
END;
/

/*

Step 4   Create the DML Handler Procedure

This step creates the emp_dml_handler procedure. This procedure will be the DML handler for DELETE changes to the hr.employees table. It converts any row LCR containing a DELETE command type into an INSERT row LCR and then inserts the converted row LCR into the hr.emp_del table by executing the row LCR.

*/

CREATE OR REPLACE PROCEDURE emp_dml_handler(in_any IN ANYDATA) IS
  lcr          SYS.LCR$_ROW_RECORD;
  rc           PLS_INTEGER;
  command      VARCHAR2(30);
  old_values   SYS.LCR$_ROW_LIST;
BEGIN    
  -- Access the LCR
  rc := in_any.GETOBJECT(lcr);
  -- Get the object command type
  command := lcr.GET_COMMAND_TYPE();
  -- Check for DELETE command on the hr.employees table
  IF command = 'DELETE' THEN
    -- Set the command_type in the row LCR to INSERT
    lcr.SET_COMMAND_TYPE('INSERT');
    -- Set the object_name in the row LCR to EMP_DEL
    lcr.SET_OBJECT_NAME('EMP_DEL');
    -- Get the old values in the row LCR
    old_values := lcr.GET_VALUES('old');
    -- Set the old values in the row LCR to the new values in the row LCR
    lcr.SET_VALUES('new', old_values);
    -- Set the old values in the row LCR to NULL
    lcr.SET_VALUES('old', NULL);
    -- Add a SYSDATE value for the timestamp column
    lcr.ADD_COLUMN('new', 'TIMESTAMP', ANYDATA.ConvertDate(SYSDATE));
    -- Apply the row LCR as an INSERT into the hr.emp_del table
    lcr.EXECUTE(true);
  END IF;
END;
/

/*

Step 5   Set the DML Handler for the hr.employees Table

Set the DML handler for the hr.employees table to the procedure created in Step 4. Notice that the DML handler must be set separately for each possible operation on the table: INSERT, UPDATE, and DELETE.

*/

BEGIN
  DBMS_APPLY_ADM.SET_DML_HANDLER(
    object_name         => 'hr.employees',
    object_type         => 'TABLE',
    operation_name      => 'INSERT',
    error_handler       => false,
    user_procedure      => 'strmadmin.emp_dml_handler',
    apply_database_link => NULL,
    apply_name          => NULL);
END;
/

BEGIN
  DBMS_APPLY_ADM.SET_DML_HANDLER(
    object_name         => 'hr.employees',
    object_type         => 'TABLE',
    operation_name      => 'UPDATE',
    error_handler       => false,
    user_procedure      => 'strmadmin.emp_dml_handler',
    apply_database_link => NULL,
    apply_name          => NULL);
END;
/

BEGIN
  DBMS_APPLY_ADM.SET_DML_HANDLER(
    object_name         => 'hr.employees',
    object_type         => 'TABLE',
    operation_name      => 'DELETE',
    error_handler       => false,
    user_procedure      => 'strmadmin.emp_dml_handler',
    apply_database_link => NULL,
    apply_name          => NULL);
END;
/

/*

Step 6   Create a Messaging Client for the Queue

Create a messaging client that can be used by an application to dequeue the reenqueued messages. A messaging client must be specified before the messages can be reenqueued into the queue.

*/

BEGIN
  DBMS_STREAMS_ADM.ADD_TABLE_RULES(
    table_name     => 'hr.employees',   
    streams_type   => 'dequeue',
    streams_name   => 'hr',
    queue_name     => 'strmadmin.streams_queue',
    include_dml    =>  true,
    include_ddl    =>  false,
    inclusion_rule =>  true);
END;
/

/*

Step 7   Configure the Apply Process at cpap.net

Create an apply process to apply DML changes to the hr.employees table. Although the DML handler for the apply process causes deleted employees to be inserted into the emp_del table, this rule specifies the employees table, because the row LCRs in the queue contain changes to the employees table, not the emp_del table. When you run the ADD_TABLE_RULES procedure to create the apply process, the out parameter dml_rule_name contains the name of the DML rule created. This rule name is then passed to the SET_ENQUEUE_DESTINATION procedure.

The SET_ENQUEUE_DESTINATION procedure in the DBMS_APPLY_ADM package specifies that any apply process using the DML rule generated by ADD_TABLE_RULES will enqueue messages that satisfy this rule into streams_queue. In this case, the DML rule is for row LCRs with DML changes to the hr.employees table. A local queue other than the apply process queue can be specified if appropriate.

*/

DECLARE
          emp_rule_name_dml  VARCHAR2(30);
          emp_rule_name_ddl  VARCHAR2(30);
BEGIN
  DBMS_STREAMS_ADM.ADD_TABLE_RULES(
    table_name      => 'hr.employees',
    streams_type    => 'apply', 
    streams_name    => 'apply_emp',
    queue_name      => 'strmadmin.streams_queue',
    include_dml     =>  true,
    include_ddl     =>  false,
    source_database => 'cpap.net',
    dml_rule_name   => emp_rule_name_dml,
    ddl_rule_name   => emp_rule_name_ddl);
  DBMS_APPLY_ADM.SET_ENQUEUE_DESTINATION(
    rule_name               =>  emp_rule_name_dml,
    destination_queue_name  =>  'strmadmin.streams_queue');
END;
/

/*

Step 8   Create a Procedure to Dequeue the Messages

The emp_dq procedure created in this step can be used to dequeue the messages that are reenqueued by the apply process. In Step 7, the SET_ENQUEUE_DESTINATION procedure was used to instruct the apply process to enqueue row LCRs containing changes to the hr.employees table into streams_queue. When the emp_dq procedure is executed, it dequeues each row LCR in the queue and displays the type of command in the row LCR, either INSERT, UPDATE, or DELETE. Any information in the row LCRs can be accessed and displayed, not just the command type.

See Also:

"Displaying Detailed Information About Apply Errors" for more information about displaying information in LCRs
*/

CREATE OR REPLACE PROCEDURE emp_dq (consumer IN VARCHAR2) AS
  msg            ANYDATA;
  row_lcr        SYS.LCR$_ROW_RECORD;
  num_var        pls_integer;
  more_messages  BOOLEAN := true;
  navigation     VARCHAR2(30);
BEGIN
  navigation := 'FIRST MESSAGE';
  WHILE (more_messages) LOOP
    BEGIN
      DBMS_STREAMS_MESSAGING.DEQUEUE(
        queue_name   => 'strmadmin.streams_queue',
        streams_name => consumer,
        payload      => msg,
        navigation   => navigation,
        wait         => DBMS_STREAMS_MESSAGING.NO_WAIT);
      IF msg.GETTYPENAME() = 'SYS.LCR$_ROW_RECORD' THEN
        num_var := msg.GetObject(row_lcr);   
        DBMS_OUTPUT.PUT_LINE(row_lcr.GET_COMMAND_TYPE || ' row LCR dequeued');
      END IF;
      navigation := 'NEXT MESSAGE';
    COMMIT;
    EXCEPTION WHEN SYS.DBMS_STREAMS_MESSAGING.ENDOFCURTRANS THEN
                navigation := 'NEXT TRANSACTION';
              WHEN DBMS_STREAMS_MESSAGING.NOMOREMSGS THEN
                more_messages := false;
                DBMS_OUTPUT.PUT_LINE('No more messages.');
              WHEN OTHERS THEN
                RAISE; 
    END;
  END LOOP;
END;
/

/*

Step 9   Start the Apply Process at cpap.net

Set the disable_on_error parameter to n so that the apply process will not be disabled if it encounters an error, and start the apply process at cpap.net.

*/

BEGIN
  DBMS_APPLY_ADM.SET_PARAMETER(
    apply_name  => 'apply_emp', 
    parameter   => 'disable_on_error', 
    value       => 'n');
END;
/
 
BEGIN
  DBMS_APPLY_ADM.START_APPLY(
    apply_name  => 'apply_emp');
END;
/

/*

Step 10   Start the Capture Process at cpap.net

Start the capture process at cpap.net.

*/

BEGIN
  DBMS_CAPTURE_ADM.START_CAPTURE(
    capture_name  => 'capture_emp');
END;
/

/*

Step 11   Check the Spool Results

Check the streams_config_capapp.out spool file to ensure that all actions finished successfully after this script is completed.

*/

SET ECHO OFF
SPOOL OFF

/*************************** END OF SCRIPT ******************************/

Make DML Changes, Query for Results, and Dequeue Messages

Complete the following steps to confirm that apply process is configured correctly, make DML changes to the hr.employees table, query for the resulting inserts into the hr.emp_del table and the reenqueued messages in the streams_queue_table, and dequeue the messages that were reenqueued by the DML handler.


Step 1   Confirm the Rule Action Context

Step 7 creates an apply process rule that specifies a destination queue into which LCRs that satisfy the rule are enqueued. In this case, LCRs that satisfy the rule are row LCRs with changes to the hr.employees table.

Complete the following steps to confirm that the rule specifies a destination queue:

  1. Run the following query to determine the name of the rule for DML changes to the hr.employees table used by the apply process apply_emp:

    CONNECT strmadmin/strmadminpw@cpap.net
    
    SELECT RULE_OWNER, RULE_NAME FROM DBA_STREAMS_RULES 
      WHERE STREAMS_NAME = 'APPLY_EMP' AND
            STREAMS_TYPE = 'APPLY' AND
            SCHEMA_NAME  = 'HR' AND
            OBJECT_NAME  = 'EMPLOYEES' AND
            RULE_TYPE    = 'DML'
      ORDER BY RULE_NAME;
    

    Your output looks similar to the following:

    RULE_OWNER                     RULE_NAME
    ------------------------------ ------------------------------
    STRMADMIN                      EMPLOYEES3
    
  2. View the action context for the rule returned by the query in Step 1:

    COLUMN RULE_OWNER HEADING 'Rule Owner' FORMAT A15
    COLUMN DESTINATION_QUEUE_NAME HEADING 'Destination Queue' FORMAT A30
    
    SELECT RULE_OWNER, DESTINATION_QUEUE_NAME
      FROM DBA_APPLY_ENQUEUE
      WHERE RULE_NAME = 'EMPLOYEES3'
      ORDER BY DESTINATION_QUEUE_NAME;
    

    Make sure you substitute the rule name returned in Step 1 in the WHERE clause. Your output looks similar to the following:

    Rule Owner      Destination Queue
    --------------- ------------------------------
    STRMADMIN       "STRMADMIN"."STREAMS_QUEUE"
    

    The output should show that LCRs that satisfy the apply process rule are enqueued into streams_queue.

Step 2   Perform an INSERT, UPDATE, and DELETE on hr.employees

Make the following DML changes to the hr.employees table.

CONNECT hr/hr@cpap.net

INSERT INTO hr.employees VALUES(207, 'JOHN', 'SMITH', 'JSMITH@MYCOMPANY.COM', 
  NULL, '07-JUN-94', 'AC_ACCOUNT', 777, NULL, NULL, 110);
COMMIT;

UPDATE hr.employees SET salary=5999 WHERE employee_id=206;
COMMIT;

DELETE FROM hr.employees WHERE employee_id=207;
COMMIT;

Step 3   Query the hr.emp_del Table and the streams_queue_table

After some time passes to allow for capture and apply of the changes performed in the previous step, run the following queries to see the results:

CONNECT strmadmin/strmadminpw@cpap.net

SELECT employee_id, first_name, last_name, timestamp 
  FROM hr.emp_del ORDER BY employee_id;

SELECT MSG_ID, MSG_STATE, CONSUMER_NAME 
  FROM AQ$STREAMS_QUEUE_TABLE ORDER BY MSG_ID;

When you run the first query, you should see a record for the employee with an employee_id of 207. This employee was deleted in the previous step. When you run the second query, you should see the reenqueued messages resulting from all of the changes in the previous step, and the MSG_STATE should be READY for these messages.

Step 4   Dequeue Messages Reenqueued by the DML Handler

Use the emp_dq procedure to dequeue the messages that were reenqueued by the DML handler.

SET SERVEROUTPUT ON SIZE 100000

EXEC emp_dq('HR');

For each row changed by a DML statement, one line is returned, and each line states the command type of the change (either INSERT, UPDATE, or DELETE). If you repeat the query on the queue table in Step 3 after the messages are dequeued, then the dequeued messages should have been consumed. That is, either the MSG_STATE should be PROCESSED for these messages, or the messages should no longer be in the queue.

SELECT MSG_ID, MSG_STATE, CONSUMER_NAME 
  FROM AQ$STREAMS_QUEUE_TABLE ORDER BY MSG_ID;