Oracle® Database XStream Guide
11g Release 2 (11.2)

Part Number E16545-05

3 Configuring XStream

This chapter describes configuring the Oracle Database components that are used by XStream. This chapter also includes sample client applications that communicate with an XStream outbound server and inbound server.

Preparing for XStream

This section describes preparing for an XStream configuration.

Granting Privileges for the XStream Administrator

An XStream administrator configures and manages XStream components in an XStream Out or XStream In environment. This section describes configuring an XStream administrator by granting a user the appropriate privileges. You must configure an XStream administrator in each Oracle database included in the XStream configuration.

Prerequisites

Before configuring an XStream administrator, ensure that the following prerequisites are met:

  • Ensure that you can log in to each database in the XStream configuration as an administrative user who can create users, grant privileges, and create tablespaces.

  • Identify a user who will be the XStream administrator. Either create a new user with the appropriate privileges or grant these privileges to an existing user.

    Do not use the SYS or SYSTEM user as an XStream administrator, and ensure that the XStream administrator does not use the SYSTEM tablespace as its default tablespace.

  • If a new tablespace is required for the XStream administrator, then ensure that there is enough disk space on each computer system in the XStream configuration for the tablespace. The recommended size of the tablespace is 25 MB.
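
The tablespace-related prerequisites can be checked from the data dictionary before you start. The following is a minimal sketch, assuming the candidate user already exists and is named xstrmadmin (an illustrative name; substitute your own), and that you are connected as an administrative user who can query the DBA_ views:

```sql
-- Confirm that the candidate user's default tablespace is not SYSTEM.
SELECT username, default_tablespace
  FROM dba_users
 WHERE username = 'XSTRMADMIN';

-- List existing tablespaces and data files to decide whether
-- an existing tablespace can be reused or a new one is needed.
SELECT tablespace_name, file_name, bytes/1024/1024 AS size_mb, autoextensible
  FROM dba_data_files
 ORDER BY tablespace_name;
```

If the first query returns no rows, the user does not exist yet and is created later in this procedure.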

Assumptions

This section makes the following assumptions:

  • The username of the XStream administrator is xstrmadmin.

  • The tablespace used by the XStream administrator is xstream_tbs.

To configure an XStream administrator: 

  1. In SQL*Plus, connect as an administrative user who can create users, grant privileges, and create tablespaces. Remain connected as this administrative user for all subsequent steps.

    See Also:

    Oracle Database Administrator's Guide for information about connecting to a database in SQL*Plus
  2. Either create a tablespace for the XStream administrator or use an existing tablespace.

    This tablespace stores any objects created in the XStream administrator's schema, including any spillover of messages from the buffered queues owned by the schema.

    For example, the following statement creates a new tablespace for the XStream administrator:

    CREATE TABLESPACE xstream_tbs DATAFILE '/usr/oracle/dbs/xstream_tbs.dbf' 
      SIZE 25M REUSE AUTOEXTEND ON MAXSIZE UNLIMITED;
    
  3. Create a new user to act as the XStream administrator or identify an existing user.

    For example, to create a user named xstrmadmin and specify that this user uses the xstream_tbs tablespace, run the following statement:

    CREATE USER xstrmadmin IDENTIFIED BY password 
      DEFAULT TABLESPACE xstream_tbs
      QUOTA UNLIMITED ON xstream_tbs;
    

    Note:

    Enter an appropriate password for the administrative user.

    See Also:

    Oracle Database Security Guide for guidelines about choosing passwords
  4. Grant the XStream administrator the DBA role:

    GRANT DBA TO xstrmadmin;
    

    Note:

    The DBA role is required for a user to create or alter outbound servers, inbound servers, capture processes, synchronous captures, and apply processes. When the user does not need to perform these tasks, the DBA role can be revoked from the user.
  5. Run the GRANT_ADMIN_PRIVILEGE procedure in the DBMS_XSTREAM_AUTH package.

    A user must have explicit EXECUTE privilege on a package to execute a subprogram in the package inside of a user-created subprogram, and a user must have explicit SELECT privilege on a data dictionary view to query the view inside of a user-created subprogram. These privileges cannot be granted through a role. You can run the GRANT_ADMIN_PRIVILEGE procedure to grant such privileges to the XStream administrator, or you can grant them directly.

    Depending on the parameter settings for the GRANT_ADMIN_PRIVILEGE procedure, it either grants the privileges for an XStream administrator directly, or it generates a script that you can edit and then run to grant these privileges.

    Note:

    The DBMS_XSTREAM_AUTH package is available starting with Oracle Database 11g Release 2 (11.2.0.2).

    Use the GRANT_ADMIN_PRIVILEGE procedure to grant privileges directly:

    Run the following procedure:

    BEGIN
       DBMS_XSTREAM_AUTH.GRANT_ADMIN_PRIVILEGE(
          grantee          => 'xstrmadmin',    
          grant_privileges => TRUE);
    END;
    /
    

    Use the GRANT_ADMIN_PRIVILEGE procedure to generate a script:

    Complete the following steps:

    1. Use the SQL statement CREATE DIRECTORY to create a directory object for the directory into which you want to generate the script. A directory object is similar to an alias for the directory. For example, to create a directory object called xstrm_dir for the /usr/admin directory on your computer system, run the following statement:

      CREATE DIRECTORY xstrm_dir AS '/usr/admin';
      
    2. Run the GRANT_ADMIN_PRIVILEGE procedure to generate a script named grant_xstrm_privs.sql and place this script in the /usr/admin directory on your computer system:

      BEGIN
         DBMS_XSTREAM_AUTH.GRANT_ADMIN_PRIVILEGE(
            grantee          => 'xstrmadmin',    
            grant_privileges => FALSE,
            file_name        => 'grant_xstrm_privs.sql',
            directory_name   => 'xstrm_dir');
      END;
      /
      

      Notice that the grant_privileges parameter is set to FALSE so that the procedure does not grant the privileges directly. Also, notice that the directory object created in substep 1 is specified for the directory_name parameter.

    3. Edit the generated script if necessary and save your changes.

    4. Execute the script in SQL*Plus:

      SET ECHO ON
      SPOOL grant_xstrm_privs.out
      @/usr/admin/grant_xstrm_privs.sql
      SPOOL OFF
      
    5. Check the spool file to ensure that all of the grants executed successfully. If there are errors, then edit the script to correct the errors and rerun it.

  6. If necessary, grant the following additional privileges:

    • If you plan to use Oracle Enterprise Manager to manage databases with XStream components, then configure the XStream administrator to be a Database Control administrator. Doing so grants additional privileges required by Oracle Enterprise Manager, such as the privileges required to run Oracle Enterprise Manager jobs. See Oracle Database 2 Day DBA for instructions.

    • Grant the privileges for a remote XStream administrator to perform actions in the local database. Grant these privileges using the GRANT_REMOTE_ADMIN_ACCESS procedure in the DBMS_XSTREAM_AUTH package. Grant these privileges if a remote XStream administrator will use a database link that connects to the local XStream administrator to perform administrative actions. Specifically, grant these privileges if either of the following conditions is true:

      • You plan to configure a downstream capture process at a remote downstream database that captures changes originating at the local source database, and the downstream capture process will use a database link to perform administrative actions at the source database.

      • You plan to use a remote XStream administrator to set the instantiation system change number (SCN) values for replicated database objects at the local database.

    • If no apply user is specified for an inbound server, then grant the XStream administrator the necessary privileges to perform DML and DDL changes on the apply objects owned by other users. If an apply user is specified, then the apply user must have these privileges. These privileges can be granted directly or through a role.

    • If no apply user is specified for an inbound server, then grant the XStream administrator EXECUTE privilege on any PL/SQL subprogram owned by another user that is executed by an inbound server. These subprograms can be used in apply handlers or error handlers. If an apply user is specified, then the apply user must have these privileges. These privileges must be granted directly. They cannot be granted through a role.

    • Grant the XStream administrator EXECUTE privilege on any PL/SQL function owned by another user that is specified in a custom rule-based transformation for a rule used by a capture process, synchronous capture, propagation, outbound server, or inbound server. For a capture process or synchronous capture, if a capture user is specified, then the capture user must have these privileges. For an inbound server, if an apply user is specified, then the apply user must have these privileges. These privileges must be granted directly. They cannot be granted through a role.

    • Grant the XStream administrator privileges to alter database objects where appropriate. For example, if the XStream administrator must create a supplemental log group for a table in another schema, then the XStream administrator must have the necessary privileges to alter the table. These privileges can be granted directly or through a role.

    • If the XStream administrator does not own the queue used by a capture process, synchronous capture, propagation, outbound server, or inbound server, and is not specified as the queue user for the queue when the queue is created, then the XStream administrator must be configured as a secure queue user of the queue if you want the XStream administrator to be able to enqueue messages into or dequeue messages from the queue. The XStream administrator might also need ENQUEUE or DEQUEUE privileges on the queue, or both. See Oracle Streams Concepts and Administration for information about managing queues.

    • Grant the XStream administrator EXECUTE privilege on any object types that the XStream administrator might need to access. These privileges can be granted directly or through a role.

    • If the XStream administrator will use Data Pump to perform export and import operations on database objects in other schemas during an instantiation, then grant the EXP_FULL_DATABASE and IMP_FULL_DATABASE roles to the XStream administrator.

    • If Oracle Database Vault is installed, then the user who performs the following actions must be granted the BECOME USER system privilege:

      • Creates or alters a capture process

      • Creates or alters an outbound server

      • Creates or alters an inbound server

      Granting the BECOME USER system privilege to the user who performs these actions is not required if Oracle Database Vault is not installed. You can revoke the BECOME USER system privilege from the user after completing one of these actions, if necessary.

  7. Repeat all of the previous steps at each Oracle database in the environment that will use XStream.
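
Some of the optional grants described in Step 6 can be sketched as follows. This is an illustration rather than a required sequence: run only the statements that match your configuration. The sketch assumes that the XStream administrator is named xstrmadmin and that you are still connected as the administrative user from Step 1.

```sql
-- Allow a remote XStream administrator to perform actions in this
-- database through a database link that connects to xstrmadmin.
BEGIN
   DBMS_XSTREAM_AUTH.GRANT_REMOTE_ADMIN_ACCESS(
      grantee => 'xstrmadmin');
END;
/

-- Data Pump export and import of objects in other schemas
-- during an instantiation.
GRANT EXP_FULL_DATABASE, IMP_FULL_DATABASE TO xstrmadmin;

-- Needed only when Oracle Database Vault is installed.
GRANT BECOME USER TO xstrmadmin;

-- Review the roles currently granted to the administrator.
SELECT granted_role
  FROM dba_role_privs
 WHERE grantee = 'XSTRMADMIN';

-- When the user no longer needs to create or alter XStream components,
-- the DBA role can be revoked (left commented out on purpose).
-- REVOKE DBA FROM xstrmadmin;
```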

Preparing for XStream Out

This section describes the decisions to make and the tasks to complete to prepare for an XStream Out configuration.

Decide How to Configure XStream

When you configure XStream Out, you must configure XStream components to capture database changes and send these changes to the outbound server in the form of logical change records (LCRs). These components include a capture process and at least one queue. The capture process can be a local capture process or a downstream capture process. For some configurations, you must also configure a propagation.

Local capture means that a capture process runs on the source database. Downstream capture means that a capture process runs on a database other than the source database. The source database is the database where the changes were generated. The primary reason to use downstream capture is to reduce the load on the source database, thereby improving its performance. The primary reason to use local capture is that it is easier to configure and maintain.

The database that captures changes made to the source database is called the capture database. One of the following databases can be the capture database:

  • Source database (local capture)

  • Destination database (downstream capture)

  • A third database (downstream capture)

If the database running the outbound server is not the capture database, then a propagation sends changes from the capture database to the database running the outbound server. If the database running the outbound server is the capture database, then this propagation between databases is not needed because the capture process and outbound server use the same queue.

You can configure the components in the following ways:

  • Local capture and outbound server in the same database: The database objects, capture process, and outbound server are all in the same database. This configuration is the easiest to configure and maintain because all of the components are contained in one database.

  • Local capture and outbound server in different databases: The database objects and capture process are in one database, and the outbound server is in another database. A propagation sends LCRs from the source database to the outbound server database. This configuration is best when you want easy configuration and maintenance and when you want to optimize the performance of the outbound server database.

  • Downstream capture and outbound server in the same database: The database objects are in one database, and the capture process and outbound server are in another database. This configuration is best when you want to optimize the performance of the database with the database objects and want to offload change capture to another database. With this configuration, most of the components run on the database with the outbound server.

  • Downstream capture and outbound server in different databases: The database objects are in one database, the outbound server is in another database, and the capture process is in a third database. This configuration is best when you want to optimize the performance of both the database with the database objects and the database with the outbound server. With this configuration, the capture process runs on a third database, and a propagation sends LCRs from the capture database to the outbound server database.

If you decide to configure a downstream capture process, then you must decide which type of downstream capture process you want to configure. The following types are available:

  • A real-time downstream capture process configuration means that redo transport services use the log writer process (LGWR) at the source database to send redo data to the downstream database, and a remote file server process (RFS) at the downstream database receives the redo data over the network and stores the redo data in the standby redo log.

  • An archived-log downstream capture process configuration means that archived redo log files from the source database are copied to the downstream database, and the capture process captures changes in these archived redo log files. These log files can be transferred automatically using redo transport services, or they can be transferred manually using a method such as FTP.

The advantage of real-time downstream capture over archived-log downstream capture is that real-time downstream capture reduces the amount of time required to capture changes made to the source database. The time is reduced because the real-time downstream capture process does not need to wait for the redo log file to be archived before it can capture changes from it. You can configure multiple real-time downstream capture processes that capture changes from the same source database, but you cannot configure real-time downstream capture for multiple source databases at one downstream database.

The advantage of archived-log downstream capture over real-time downstream capture is that archived-log downstream capture allows downstream capture processes for multiple source databases at a downstream database. You can copy redo log files from multiple source databases to a single downstream database and configure multiple archived-log downstream capture processes to capture changes in these redo log files.

Prerequisites for Configuring XStream Out

Preparing for an XStream Out outbound server is similar to preparing for an Oracle Streams replication environment. The components used in an Oracle Streams replication environment to capture changes and send them to an apply process are the same components used to capture changes and send them to an outbound server. These components include a capture process and one or more queues. If the capture process runs on a different database than the outbound server, then a propagation is also required.

Several of the tasks described in this section are described in more detail in Oracle Streams Replication Administrator's Guide. This section provides an overview of each task and specific information about completing the task for an XStream Out configuration.

Ensure that the following prerequisites are met before configuring XStream Out:

Configure an XStream Administrator on All Databases

To configure and manage an XStream Out configuration, create an XStream administrator on each Oracle database that is involved in the XStream Out configuration.

If Required, Configure Network Connectivity and Database Links

Network connectivity and database links are not required when all of the components run on the same database. These components include the capture process, queue, and outbound server.

You must configure network connectivity and database links if you decided to configure XStream in either of the following ways:

  • The capture process and the outbound server will run on different databases.

  • Downstream capture will be used.

See "Decide How to Configure XStream" for more information about these decisions.

If network connectivity is required, then configure your network and Oracle Net so that the databases can communicate with each other.

The following database links are required:

  • When the capture process runs on a different database from the outbound server, create a database link from the capture database to the outbound server database. A propagation uses this database link to send changes from the capture database to the outbound server database.

  • When you use downstream capture, create a database link from the capture database to the source database. The source database is the database that generates the redo data that the capture process uses to capture changes. The capture process uses this database link to perform administrative tasks at the source database.

The name of each database link must match the global name of the destination database, and each database link should be created in the XStream administrator's schema.

For example, assume that you want to create a database link in a configuration with the following characteristics:

  • The global name of the source database is dbs1.example.com.

  • The global name of the destination database is dbs2.example.com.

  • The XStream administrator is xstrmadmin at each database.

Given these assumptions, the following statement creates a database link from dbs1.example.com to dbs2.example.com:

CONNECT xstrmadmin@dbs1.example.com
Enter password: password

CREATE DATABASE LINK dbs2.example.com CONNECT TO xstrmadmin 
   IDENTIFIED BY password USING 'dbs2.example.com';
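
After creating a database link, it can be useful to confirm that its name matches the remote global name and that the remote login works. The following is a minimal sketch that reuses the example names from this section (dbs1.example.com, dbs2.example.com, and xstrmadmin):

```sql
-- At each database: show the global name that a link to this
-- database must use as its name.
SELECT * FROM global_name;

-- At dbs1.example.com, connected as xstrmadmin: confirm that the link
-- resolves and that the remote credentials are accepted.
SELECT * FROM global_name@dbs2.example.com;

-- List the database links owned by the current user.
SELECT db_link, username, host
  FROM user_db_links;
```

If the second query fails, check the Oracle Net service name in the USING clause and the credentials in the CONNECT TO clause.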

Ensure That Each Source Database Is in ARCHIVELOG Mode

Each source database that generates changes that will be captured by a capture process must be in ARCHIVELOG mode. For downstream capture processes, the downstream database also must be in ARCHIVELOG mode if you plan to configure a real-time downstream capture process. The downstream database does not need to be in ARCHIVELOG mode if you plan to run only archived-log downstream capture processes on it.

If you are configuring XStream in an Oracle Real Application Clusters (Oracle RAC) environment, then the archived redo log files of all threads from all instances must be available to any instance running a capture process. This requirement pertains to both local and downstream capture processes.
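
The log mode of a database can be checked with a single query. The sketch below also shows one common way to switch a single-instance database to ARCHIVELOG mode. It assumes a SQL*Plus session connected with the SYSDBA privilege and a maintenance window, because the database must be restarted. Oracle RAC databases need additional steps that are not shown here.

```sql
-- Check the current mode. The expected value is ARCHIVELOG.
SELECT log_mode FROM v$database;

-- Only if the query returns NOARCHIVELOG:
SHUTDOWN IMMEDIATE
STARTUP MOUNT
ALTER DATABASE ARCHIVELOG;
ALTER DATABASE OPEN;
```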

See Also:

Oracle Database Administrator's Guide for instructions about running a database in ARCHIVELOG mode

Set the Relevant Initialization Parameters

Some initialization parameters are important for the configuration, operation, reliability, and performance of the components in an XStream configuration. Set these parameters appropriately.

Oracle Streams Replication Administrator's Guide contains detailed information about all of the initialization parameters that are important for an Oracle Streams environment. The guidelines for setting these parameters also apply to an XStream configuration. In addition to the requirements described in Oracle Streams Replication Administrator's Guide for all Oracle Streams components, the following requirements apply to XStream outbound servers:

  • Ensure that the PROCESSES initialization parameter is set to a value large enough to accommodate the outbound server background processes and all of the other Oracle Database background processes.

  • Ensure that the SESSIONS initialization parameter is set to a value large enough to accommodate the sessions used by the outbound server background processes and all of the other Oracle Database sessions.
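
To judge whether PROCESSES and SESSIONS leave enough headroom, you can compare the configured limits with actual usage. The following is a minimal sketch. The value 300 is purely illustrative and is not a recommendation from this guide.

```sql
-- Current settings.
SELECT name, value
  FROM v$parameter
 WHERE name IN ('processes', 'sessions');

-- Current and peak usage since instance startup, compared with the limits.
SELECT resource_name, current_utilization, max_utilization, limit_value
  FROM v$resource_limit
 WHERE resource_name IN ('processes', 'sessions');

-- Example only: raise PROCESSES in the server parameter file.
-- PROCESSES is a static parameter, so the new value takes effect
-- after the instance is restarted.
ALTER SYSTEM SET processes = 300 SCOPE = SPFILE;
```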

Configure the Oracle Streams Pool

The Oracle Streams pool is a portion of memory in the System Global Area (SGA) that is used by Oracle Streams. The Oracle Streams pool stores buffered queue messages in memory, and it provides memory for capture processes and outbound servers. The Oracle Streams pool always stores LCRs captured by a capture process, and it stores LCRs and messages that are enqueued into a buffered queue by applications. Ensure that there is enough space in the Oracle Streams pool at each database to store LCRs and run the components properly.

Each outbound server requires 1 MB of memory. The Oracle Streams pool is initialized the first time an outbound server is started.
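
The current size of the Oracle Streams pool can be inspected and, if needed, adjusted. The following is a minimal sketch, assuming the instance uses a server parameter file. The 256M value is illustrative only and should be sized for your workload.

```sql
-- Current size of the Oracle Streams pool, in MB.
SELECT name, bytes/1024/1024 AS size_mb
  FROM v$sgainfo
 WHERE name = 'Streams Pool Size';

-- Example only: set the Streams pool size. When automatic shared memory
-- management is enabled, this value acts as a minimum for the pool.
ALTER SYSTEM SET streams_pool_size = 256M SCOPE = BOTH;
```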

See Also:

Oracle Streams Replication Administrator's Guide for information about Oracle Streams pool requirements

If Required, Configure Log File Transfer to a Downstream Database

If you decided to use a local capture process, then log file transfer is not required. However, if you decided to use downstream capture that uses redo transport services to transfer archived redo log files to the downstream database automatically, then configure log file transfer from the source database to the capture database. See "Decide How to Configure XStream" for information about this decision.
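
As a rough illustration of automatic log file transfer for archived-log downstream capture, the statements below set redo transport parameters at the source database. Everything specific here is an assumption made for the example: the DB_UNIQUE_NAME values dbs1 and dbs2, the service name dbs2.example.com, the destination number 2, and the TEMPLATE path on the downstream system. Treat this as a sketch and follow Oracle Streams Replication Administrator's Guide for the complete procedure, including the settings required at the downstream database.

```sql
-- At the source database: declare the databases that exchange redo data.
ALTER SYSTEM SET LOG_ARCHIVE_CONFIG = 'DG_CONFIG=(dbs1,dbs2)' SCOPE = BOTH;

-- At the source database: send redo data to the downstream database and
-- name the archived redo log files it writes there (hypothetical path).
ALTER SYSTEM SET LOG_ARCHIVE_DEST_2 = 'SERVICE=dbs2.example.com ASYNC NOREGISTER VALID_FOR=(ONLINE_LOGFILES,PRIMARY_ROLE) TEMPLATE=/usr/oracle/log_for_dbs1/dbs1_arch_%t_%s_%r.log DB_UNIQUE_NAME=dbs2' SCOPE = BOTH;

-- At the source database: enable the destination.
ALTER SYSTEM SET LOG_ARCHIVE_DEST_STATE_2 = ENABLE SCOPE = BOTH;
```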

If Required, Add Standby Redo Logs for Real-Time Downstream Capture

If you decided to configure real-time downstream capture, then add standby redo logs to the capture database. See "Decide How to Configure XStream" for information about this decision.
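
Adding standby redo logs can be sketched as follows. The group numbers, file paths, and the 500M size are hypothetical. As general Oracle guidance (confirm it for your release), each standby redo log file should be at least as large as the largest online redo log file at the source database, and the downstream database should have at least one more standby redo log group than the source has online redo log groups.

```sql
-- At the source database: find the number and size of the
-- online redo log groups.
SELECT group#, thread#, bytes/1024/1024 AS size_mb
  FROM v$log;

-- At the downstream (capture) database: add standby redo log groups.
ALTER DATABASE ADD STANDBY LOGFILE GROUP 4
   ('/oracle/dbs/slog4.rdo') SIZE 500M;
ALTER DATABASE ADD STANDBY LOGFILE GROUP 5
   ('/oracle/dbs/slog5.rdo') SIZE 500M;

-- At the downstream database: confirm that the standby redo logs exist.
SELECT group#, thread#, sequence#, archived, status
  FROM v$standby_log;
```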

Preparing for XStream In

Ensure that the following prerequisites are met before configuring XStream In:

Configure an XStream Administrator

To configure and manage an XStream In configuration, create an XStream administrator on the Oracle database that will run the XStream inbound server.

Set the Relevant Initialization Parameters

Some initialization parameters are important for the configuration, operation, reliability, and performance of XStream inbound servers. Set these parameters appropriately.

Oracle Streams Replication Administrator's Guide contains detailed information about all of the initialization parameters that are important for an Oracle Streams environment. The guidelines for setting these parameters also apply to an XStream configuration. In addition to the requirements described in Oracle Streams Replication Administrator's Guide for all Oracle Streams components, the following requirements apply to XStream inbound servers:

  • Ensure that the PROCESSES initialization parameter is set to a value large enough to accommodate the inbound server background processes and all of the other Oracle Database background processes.

  • Ensure that the SESSIONS initialization parameter is set to a value large enough to accommodate the sessions used by the inbound server background processes and all of the other Oracle Database sessions.

Configure the Oracle Streams Pool

The Oracle Streams pool is a portion of memory in the System Global Area (SGA) that provides memory for inbound servers. Ensure that there is enough space in the Oracle Streams pool for the inbound server to run properly. An inbound server requires 1 MB of memory for each unit of parallelism. For example, if parallelism is set to 10 for an inbound server, then at least 10 MB of memory is required for the inbound server. The Oracle Streams pool also must have enough space to store LCRs before they are applied. The Oracle Streams pool is initialized the first time an inbound server is started.
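
For inbound servers that already exist, the parallelism-based minimum can be estimated from the data dictionary. The following is a minimal sketch. It applies only the 1 MB for each unit of parallelism rule described in this section and does not account for the additional space needed to store LCRs before they are applied.

```sql
-- Estimate the minimum Streams pool memory, in MB, that each apply
-- component (including inbound servers) needs for its parallelism setting.
SELECT apply_name,
       TO_NUMBER(value)     AS parallelism,
       TO_NUMBER(value) * 1 AS min_pool_mb
  FROM dba_apply_parameters
 WHERE parameter = 'PARALLELISM';
```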

Configuring XStream Out

An outbound server in an XStream Out configuration streams Oracle database changes to a client application. The client application attaches to the outbound server using the Oracle Call Interface (OCI) or Java interface to receive these changes.

Configuring an outbound server involves creating the components that send captured database changes to the outbound server. It also involves configuring the outbound server itself, which includes specifying the connect user that the client application will use to attach to the outbound server.

Configuring an XStream Outbound Server

You can create an outbound server using the following procedures in the DBMS_XSTREAM_ADM package:

  • The CREATE_OUTBOUND procedure creates an outbound server, a queue, and a capture process in a single database with one procedure call.

  • The ADD_OUTBOUND procedure creates only an outbound server. You must create the capture process and queue separately, and they must exist before you run the ADD_OUTBOUND procedure. You can configure the capture process on the same database as the outbound server or on a different database.

In both cases, you must create the client application that communicates with the outbound server and receives LCRs from the outbound server.

If you require multiple outbound servers, then you can use the CREATE_OUTBOUND procedure to create the capture process that captures database changes for the first outbound server. Next, you can run the ADD_OUTBOUND procedure to add additional outbound servers that receive the same captured changes. The capture process can reside on the same database as the outbound servers or on a different database.

Configuring Multiple XStream Out Components Using CREATE_OUTBOUND

The CREATE_OUTBOUND procedure in the DBMS_XSTREAM_ADM package creates a capture process, queue, and outbound server in a single database. Both the capture process and the outbound server use the queue created by the procedure. When you run the procedure, you provide the name of the new outbound server, and the procedure generates names for the capture process and queue. If you want all of the components to run on the same database, then the CREATE_OUTBOUND procedure is the fastest and easiest way to create an outbound server.

Prerequisites

Before configuring XStream Out, complete the tasks described in "Prerequisites for Configuring XStream Out".

Assumptions

This section makes the following assumptions:

  • The capture process will be a local capture process, and it will run on the same database as the outbound server.

    The instructions in this section can set up only the "local capture and outbound server in the same database" configuration described in "Decide How to Configure XStream".

  • The name of the outbound server is xout.

  • Data manipulation language (DML) and data definition language (DDL) changes made to the oe.orders and oe.order_items tables are sent to the outbound server.

  • DML and DDL changes made to the hr schema are sent to the outbound server.

To create an outbound server using the CREATE_OUTBOUND procedure: 

  1. In SQL*Plus, connect to the database as the XStream administrator.

    See Oracle Database Administrator's Guide for information about connecting to a database in SQL*Plus.

  2. Run the CREATE_OUTBOUND procedure.

    Given the assumptions for this section, run the following CREATE_OUTBOUND procedure:

    DECLARE
      tables  DBMS_UTILITY.UNCL_ARRAY;
      schemas DBMS_UTILITY.UNCL_ARRAY;
    BEGIN
      tables(1)  := 'oe.orders';
      tables(2)  := 'oe.order_items';
      schemas(1) := 'hr';
      DBMS_XSTREAM_ADM.CREATE_OUTBOUND(
        server_name     =>  'xout',
        table_names     =>  tables,
        schema_names    =>  schemas);
    END;
    /
    

    Running this procedure performs the following actions:

    • Configures supplemental logging for the oe.orders and oe.order_items tables and for all of the tables in the hr schema.

    • Creates a queue with a system-generated name that is used by the capture process and the outbound server.

    • Creates and starts a capture process with a system-generated name with rule sets that instruct it to capture DML and DDL changes to the oe.orders table, the oe.order_items table, and the hr schema.

    • Creates and starts an outbound server named xout with rule sets that instruct it to send DML and DDL changes to the oe.orders table, the oe.order_items table, and the hr schema to the client application.

    • Sets the current user as the connect user for the outbound server. In this example, the current user is the XStream administrator. The client application must connect to the database as the connect user to interact with the outbound server.

    Tip:

    To capture and send all database changes to the outbound server, specify NULL (the default) for the table_names and schema_names parameters.
  3. Create and run the client application that will connect to the outbound server and receive the LCRs. See "Sample XStream Client Application" for a sample application.

  4. To add one or more additional outbound servers that receive LCRs from the capture process created in Step 2, follow the instructions in "Adding an Additional Outbound Server to a Capture Process Stream".
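
After the CREATE_OUTBOUND procedure completes, the generated components can be checked in the data dictionary. The following is a minimal sketch, assuming the outbound server name xout from this section and a session connected as the XStream administrator:

```sql
-- Show the outbound server, its connect user, and the system-generated
-- capture process and queue names.
SELECT server_name, connect_user, capture_name, queue_owner, queue_name
  FROM dba_xstream_outbound;

-- Confirm that the capture process is enabled.
SELECT capture_name, status
  FROM dba_capture;

-- Confirm that the outbound server is enabled.
SELECT apply_name, status
  FROM dba_apply
 WHERE apply_name = 'XOUT';
```

The CONNECT_USER value is the user that the client application must connect as to attach to the outbound server.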

Configuring an Outbound Server Using ADD_OUTBOUND

The ADD_OUTBOUND procedure in the DBMS_XSTREAM_ADM package creates an outbound server. This procedure does not create the capture process or the queue. You must configure these components manually.

The instructions in this section can set up any of the configurations described in "Decide How to Configure XStream". However, if you chose the "local capture and outbound server in the same database" configuration, then it is usually easier to use the CREATE_OUTBOUND procedure to configure all of the components simultaneously. See "Configuring Multiple XStream Out Components Using CREATE_OUTBOUND".

Prerequisites

Before configuring XStream Out, complete the tasks described in "Prerequisites for Configuring XStream Out".

Assumptions

This section makes the following assumptions:

  • The name of the outbound server is xout.

  • The queue used by the outbound server is xstrmadmin.xstream_queue.

  • The source database is db1.example.com.

  • DML and DDL changes made to the oe.orders and oe.order_items tables are sent to the outbound server.

  • DML and DDL changes made to the hr schema are sent to the outbound server.

  • The capture process for the outbound server does not exist. (If the capture process exists, then skip Steps 1 to 3, and go to Step 4.)

To create an outbound server using the ADD_OUTBOUND procedure: 

  1. In SQL*Plus, connect to the database that will run the capture process (the capture database) as the XStream administrator.

    See Oracle Database Administrator's Guide for information about connecting to a database in SQL*Plus.

  2. Create the queue that will be used by the capture process.

    See Oracle Streams Replication Administrator's Guide for instructions.

  3. Create the capture process.

    Add rules to the capture process's rule sets to capture changes to the hr schema, the oe.orders table, and the oe.order_items table. Do not start the capture process.

    See Oracle Streams Replication Administrator's Guide for instructions.

  4. If the capture process will run on a different database than the outbound server, then set the xout_client_exists capture process parameter to Y.

    Setting this parameter to Y enables the capture process to send LCRs to an outbound server.

    Skip this step if the capture process will run on the same database as the outbound server. In this case, the xout_client_exists capture process parameter will be set to Y automatically.

    See Oracle Streams Concepts and Administration for information about setting a capture process parameter. See Oracle Database PL/SQL Packages and Types Reference for information about the xout_client_exists capture process parameter.

  5. Connect to the source database.

    The source database is the database that contains the database objects for which the capture process will capture changes. The source database and the capture database might be the same database.

  6. Ensure that required supplemental logging is specified for the database objects at the source database.

    Supplemental logging is required for the database objects for which the capture process will capture changes. If the capture database and the source database are the same database, then supplemental logging might have been specified during capture process creation.

    Ensure that the following supplemental logging is specified at the source database:

    • Any columns at the source database that are used in a primary key in tables for which changes are processed by the outbound server must be unconditionally logged in a log group or by database supplemental logging of primary key columns.

    • Any columns at the source database that are used by a rule or a rule-based transformation must be unconditionally logged.

    For the example in this section, ensure that supplemental logging is configured for the hr schema, the oe.orders table, and the oe.order_items table.

    See Oracle Streams Replication Administrator's Guide for instructions about specifying supplemental logging.

  7. Connect to the database that will run the outbound server as the XStream administrator.

  8. Create the queue that will be used by the outbound server.

    This step is not required if the capture process and the outbound server run on the same database and use the same queue.

    See Oracle Streams Replication Administrator's Guide for instructions.

  9. Run the ADD_OUTBOUND procedure.

    Given the assumptions for this section, run the following ADD_OUTBOUND procedure:

    DECLARE
      tables  DBMS_UTILITY.UNCL_ARRAY;
      schemas DBMS_UTILITY.UNCL_ARRAY;
    BEGIN
      tables(1)  := 'oe.orders';
      tables(2)  := 'oe.order_items';
      schemas(1) := 'hr';
      DBMS_XSTREAM_ADM.ADD_OUTBOUND(
        server_name     =>  'xout',
        queue_name      =>  'xstrmadmin.xstream_queue',
        source_database =>  'db1.example.com',
        table_names     =>  tables,
        schema_names    =>  schemas);
    END;
    /
    

    If the capture process runs on the same database as the outbound server, then specify the capture process's queue for the queue_name parameter.

    Running this procedure performs the following actions:

    • Creates an outbound server named xout. The outbound server has rule sets that instruct it to send DML and DDL changes to the oe.orders table, the oe.order_items table, and the hr schema to the client application. The rules specify that these changes must have originated at the db1.example.com database. The outbound server dequeues LCRs from the queue xstrmadmin.xstream_queue.

    • Sets the current user as the connect user for the outbound server. In this example, the current user is the XStream administrator. The client application must connect to the database as the connect user to interact with the outbound server.

    Tip:

    For the outbound server to receive all of the LCRs sent by the capture process, specify NULL (the default) for the table_names and schema_names parameters.

  10. Connect to the capture database as the XStream administrator.

  11. Create the propagation that sends LCRs from the capture process's queue on the local database to the queue used by the outbound server on the outbound server database.

    Add rules to the propagation's rule sets to send changes to the hr schema, the oe.orders table, and the oe.order_items table from the source queue to the destination queue.

    This step is not required if the capture process and the outbound server run on the same database and use the same queue.

    See Oracle Streams Replication Administrator's Guide for instructions.

  12. Create and run the client application that will connect to the outbound server and receive the LCRs. See "Sample XStream Client Application" for a sample application.

  13. Connect to the database that is running the outbound server as the XStream administrator.

  14. Start the outbound server if it is disabled. For example:

    exec DBMS_APPLY_ADM.START_APPLY('xout');
    
  15. Connect to the capture database as the XStream administrator.

  16. Start the capture process created in Step 3.

    See Oracle Streams Concepts and Administration for instructions.

  17. To add one or more additional outbound servers that receive LCRs from the capture process created in Step 3, follow the instructions in "Adding an Additional Outbound Server to a Capture Process Stream".
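The steps in this procedure that do not show a statement can be sketched as follows. This is a minimal sketch under stated assumptions, not a definitive script. The capture process name xout_capture is hypothetical, because this section does not name the capture process. Database-level primary key logging is only one of the ways to meet the supplemental logging requirement in Step 6. Run each statement while connected to the database named in its comment.

```sql
-- Step 4 (capture database): enable the capture process to send LCRs to an
-- outbound server that runs on a different database.
-- xout_capture is a hypothetical capture process name.
BEGIN
  DBMS_CAPTURE_ADM.SET_PARAMETER(
    capture_name => 'xout_capture',
    parameter    => 'xout_client_exists',
    value        => 'Y');
END;
/

-- Step 6 (source database): unconditionally log primary key columns
-- for the two oe tables.
ALTER TABLE oe.orders      ADD SUPPLEMENTAL LOG DATA (PRIMARY KEY) COLUMNS;
ALTER TABLE oe.order_items ADD SUPPLEMENTAL LOG DATA (PRIMARY KEY) COLUMNS;

-- Step 6 (source database): database-level primary key logging covers the
-- tables in the hr schema. Assumption: the additional redo generated for
-- every other table in the database is acceptable.
ALTER DATABASE ADD SUPPLEMENTAL LOG DATA (PRIMARY KEY) COLUMNS;

-- Step 14 (outbound server database): check whether xout is disabled
-- before starting it. An outbound server is listed as an apply process.
SELECT apply_name, status
  FROM dba_apply
 WHERE apply_name = 'XOUT';

-- Step 16 (capture database): start the capture process.
BEGIN
  DBMS_CAPTURE_ADM.START_CAPTURE(capture_name => 'xout_capture');
END;
/
```

The sketch does not cover columns that are used by a rule or a rule-based transformation. Those columns still require their own unconditional log groups, as described in Step 6.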

Adding an Additional Outbound Server to a Capture Process Stream

XStream Out configurations often require multiple outbound servers that process a stream of LCRs from a single capture process. This section describes adding an additional outbound server to a database that already includes at least one outbound server. The additional outbound server uses the same queue as another outbound server to receive the LCRs from the capture process. When an XStream Out environment exists, use the ADD_OUTBOUND procedure in the DBMS_XSTREAM_ADM package to add another outbound server to a capture process stream.

Prerequisites

Before completing the steps in this section, configure an XStream Out environment that includes at least one outbound server. The following sections describe configuring an XStream Out environment:

Assumptions

This section makes the following assumptions:

  • The name of the outbound server is xout2.

  • The queue used by the outbound server is xstrmadmin.xstream_queue.

  • DML and DDL changes made to the oe.orders and oe.order_items tables are sent to the outbound server.

  • DML and DDL changes made to the hr schema are sent to the outbound server.

  • The source database for the database changes is db1.example.com.

To add another outbound server to a capture process stream using the ADD_OUTBOUND procedure: 

  1. In SQL*Plus, connect to the database that will run the additional outbound server as the XStream administrator.

    See Oracle Database Administrator's Guide for information about connecting to a database in SQL*Plus.

  2. Determine the name of the queue used by an existing outbound server that receives LCRs from the capture process.

    Run the query in "Displaying General Information About an Outbound Server" to determine the owner and name of the queue. This query also shows the name of the capture process and the source database name.

  3. Run the ADD_OUTBOUND procedure.

    Given the assumptions for this section, run the following ADD_OUTBOUND procedure:

    DECLARE
      tables  DBMS_UTILITY.UNCL_ARRAY;
      schemas DBMS_UTILITY.UNCL_ARRAY;
    BEGIN
      tables(1)  := 'oe.orders';
      tables(2)  := 'oe.order_items';
      schemas(1) := 'hr';
      DBMS_XSTREAM_ADM.ADD_OUTBOUND(
        server_name     =>  'xout2',
        queue_name      =>  'xstrmadmin.xstream_queue',
        source_database =>  'db1.example.com',
        table_names     =>  tables,
        schema_names    =>  schemas);
    END;
    /
    

    Running this procedure performs the following actions:

    • Creates an outbound server named xout2. The outbound server has rule sets that instruct it to send DML and DDL changes to the oe.orders table, the oe.order_items table, and the hr schema to the client application. The rules specify that these changes must have originated at the db1.example.com database. The outbound server dequeues LCRs from the queue xstrmadmin.xstream_queue.

    • Sets the current user as the connect user for the outbound server. In this example, the current user is the XStream administrator. The client application must connect to the database as the connect user to interact with the outbound server.

    Tip:

    For the outbound server to receive all of the LCRs sent by the capture process, specify NULL (the default) for the table_names and schema_names parameters.

  4. If a client application does not exist, then create and run the client application that will connect to the outbound server and receive the LCRs. See "Sample XStream Client Application" for a sample application.
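Step 2 of this procedure relies on a query that is documented in another section. A minimal sketch of such a query follows, assuming that the XStream administrator can query the DBA_XSTREAM_OUTBOUND data dictionary view:

```sql
-- List each existing outbound server with its capture process,
-- source database, and queue.
SELECT server_name,
       capture_name,
       source_database,
       queue_owner,
       queue_name
  FROM dba_xstream_outbound;
```

Combine the QUEUE_OWNER and QUEUE_NAME values in the form owner.name, and specify the result for the queue_name parameter in Step 3. The SOURCE_DATABASE value corresponds to the source_database parameter.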

Configuring XStream In

An inbound server in an XStream In configuration receives a stream of changes from a client application. The inbound server can apply these changes to database objects in an Oracle database, or it can process the changes in a customized way. A client application can attach to an inbound server and send row changes and DDL changes encapsulated in LCRs using the OCI or Java interface.

The CREATE_INBOUND procedure in the DBMS_XSTREAM_ADM package creates an inbound server. You must create the client application that communicates with the inbound server and sends LCRs to the inbound server.

Prerequisites

Before configuring XStream In, ensure that the following prerequisites are met:

Assumptions

This section makes the following assumptions:

To create an inbound server: 

  1. In SQL*Plus, connect to the database that will run the inbound server as the XStream administrator.

    See Oracle Database Administrator's Guide for information about connecting to a database in SQL*Plus.

  2. Run the CREATE_INBOUND procedure.

    For example, the following CREATE_INBOUND procedure configures an inbound server named xin:

    BEGIN
      DBMS_XSTREAM_ADM.CREATE_INBOUND(
        server_name => 'xin',
        queue_name  => 'xin_queue');
    END;
    /
    

    Running this procedure performs the following actions:

    • Creates an inbound server named xin.

    • Sets the queue with the name xin_queue as the inbound server's queue, and creates this queue if it does not exist. This queue does not store LCRs sent by the client application. Instead, it stores error transactions if an LCR raises an error. The current user is the queue owner. In this example, the current user is the XStream administrator.

    • Sets the current user as the apply user for the inbound server. In this example, the current user is the XStream administrator. The client application must connect to the database as the apply user to interact with the inbound server.

    Tip:

    By default, an inbound server does not use rules or rule sets. Therefore, it processes all LCRs sent to it by the client application. To add rules and rule sets, use the DBMS_STREAMS_ADM package or the DBMS_RULE_ADM package. See Oracle Streams Concepts and Administration.

  3. If necessary, create apply handlers for the inbound server.

    Apply handlers are optional. Apply handlers process LCRs sent to an inbound server in a customized way.

  4. Create and run the client application that will connect to the inbound server and send LCRs to it.

    See "Sample XStream Client Application" for a sample application.

  5. If the inbound server is disabled, then start the inbound server.

    For example, enter the following:

    exec DBMS_APPLY_ADM.START_APPLY('xin');
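Step 5 starts the inbound server only if it is disabled. One way to check the status is sketched here, assuming that the XStream administrator can query the DBA_APPLY view, where an inbound server is listed as an apply process:

```sql
-- Show the current status of the inbound server created in Step 2.
-- Server names are stored in uppercase unless they were created with
-- a quoted, case-sensitive name.
SELECT apply_name, status
  FROM dba_apply
 WHERE apply_name = 'XIN';
```

A STATUS value of DISABLED or ABORTED indicates that the inbound server is not running.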
    

Sample XStream Client Application

This section contains a sample XStream client application. This application illustrates the basic tasks that are required of an XStream Out and XStream In application.

The application performs the following tasks:

In an XStream Out configuration that does not send LCRs to an inbound server, the client application must obtain the processed low position in another way.

This application waits indefinitely for transactions from the outbound server. To interrupt the application, enter the interrupt command for your operating system. For example, the interrupt command on some operating systems is control-C. If the program is restarted, then the outbound server starts sending LCRs from the processed low position that was set during the previous run.

Figure 3-1 provides an overview of the XStream environment configured in this section.

Figure 3-1 Sample XStream Configuration


Before running the sample application, ensure that the following components exist:

The sample applications in the following sections perform the same tasks. One sample application uses the OCI API, and the other uses the Java API.

Note:

An Oracle Database installation includes several XStream demos. These demos are in the following location:
$ORACLE_HOME/rdbms/demo/xstream

Sample XStream Client Application for the Oracle Call Interface API

To run the sample XStream client application for the OCI API, compile and link the application file, and enter the following on a command line:

xio -ob_svr xout_name -ob_db sn_xout_db -ob_usr xout_cu -ob_pwd xout_cu_pass 
-ib_svr xin_name -ib_db sn_xin_db -ib_usr xin_au -ib_pwd xin_au_pass

Substitute the appropriate values for the following placeholders:

  • xout_name is the name of the outbound server.

  • sn_xout_db is the service name for the outbound server's database.

  • xout_cu is the outbound server's connect user.

  • xout_cu_pass is the password for the outbound server's connect user.

  • xin_name is the name of the inbound server.

  • sn_xin_db is the service name for the inbound server's database.

  • xin_au is the inbound server's apply user.

  • xin_au_pass is the password for the inbound server's apply user.

When the sample client application is running, it prints information about the row LCRs it is processing. The output looks similar to the following:

----------- ROW LCR Header  -----------------
  src_db_name=DB.EXAMPLE.COM
  cmd_type=UPDATE txid=17.0.74
  owner=HR oname=COUNTRIES

----------- ROW LCR Header  -----------------
  src_db_name=DB.EXAMPLE.COM
  cmd_type=COMMIT txid=17.0.74

----------- ROW LCR Header  -----------------
  src_db_name=DB.EXAMPLE.COM
  cmd_type=UPDATE txid=12.25.77
  owner=OE oname=ORDERS

----------- ROW LCR Header  -----------------
  src_db_name=DB.EXAMPLE.COM
  cmd_type=UPDATE txid=12.25.77
  owner=OE oname=ORDERS

This output contains the following information for each row LCR:

  • src_db_name shows the source database for the change encapsulated in the row LCR.

  • cmd_type shows the type of SQL statement that made the change.

  • txid shows the transaction ID of the transaction that includes the row LCR.

  • owner shows the owner of the database object that was changed.

  • oname shows the name of the database object that was changed.

This demo is available in the following location:

$ORACLE_HOME/rdbms/demo/xstream/oci

The file name for the demo is xio.c. See the README.txt file in the demo directory for more information about compiling and running the application.

The code for the sample application that uses the OCI API follows:

#ifndef OCI_ORACLE
#include <oci.h>
#endif
 
#ifndef _STDIO_H
#include <stdio.h>
#endif
 
#ifndef _STDLIB_H
#include <stdlib.h>
#endif
 
#ifndef _STRING_H
#include <string.h>
#endif
 
#ifndef _MALLOC_H
#include <malloc.h>
#endif
 
/*---------------------------------------------------------------------- 
 *           Internal structures
 *----------------------------------------------------------------------*/ 
 
#define M_DBNAME_LEN    (128)
 
typedef struct conn_info                                     /* connect info */
{
  oratext * user;
  ub4       userlen;
  oratext * passw;
  ub4       passwlen;
  oratext * dbname;
  ub4       dbnamelen;
  oratext * svrnm;
  ub4       svrnmlen;
} conn_info_t;
 
typedef struct params
{
  conn_info_t  xout;                                        /* outbound info */
  conn_info_t  xin;                                          /* inbound info */
} params_t;
 
typedef struct oci                                            /* OCI handles */
{
  OCIEnv      *envp;                                   /* Environment handle */
  OCIError    *errp;                                         /* Error handle */
  OCIServer   *srvp;                                        /* Server handle */
  OCISvcCtx   *svcp;                                       /* Service handle */
  OCISession  *authp;
  OCIStmt    *stmtp;        
  boolean     attached;
  boolean     outbound;
} oci_t;
 
static void connect_db(conn_info_t *opt_params_p, oci_t ** ocip, ub2 char_csid,
                       ub2 nchar_csid);
static void disconnect_db(oci_t * ocip);
static void ocierror(oci_t * ocip, char * msg);
static void attach(oci_t * ocip, conn_info_t *conn, boolean outbound);
static void detach(oci_t *ocip);
static void get_lcrs(oci_t *xin_ocip, oci_t *xout_ocip);
static void get_chunks(oci_t *xin_ocip, oci_t *xout_ocip);
static void print_lcr(oci_t *ocip, void *lcrp, ub1 lcrtype, 
                      oratext **src_db_name, ub2  *src_db_namel);
static void print_chunk (ub1 *chunk_ptr, ub4 chunk_len, ub2 dty);
static void get_inputs(conn_info_t *xout_params, conn_info_t *xin_params, 
                       int argc, char ** argv);
static void get_db_charsets(conn_info_t *params_p, ub2 *char_csid, 
                            ub2 *nchar_csid);
static void set_client_charset(oci_t *outbound_ocip);
 
#define OCICALL(ocip, function) do {\
sword status=function;\
if (OCI_SUCCESS==status) break;\
else if (OCI_ERROR==status) \
{ocierror(ocip, (char *)"OCI_ERROR");\
exit(1);}\
else {printf("Error encountered %d\n", status);\
exit(1);}\
} while(0)
 
/*---------------------------------------------------------------------
 *                M A I N   P R O G R A M
 *---------------------------------------------------------------------*/
int main(int argc, char **argv)
{
  /* Outbound and inbound connection info */
  conn_info_t   xout_params;
  conn_info_t   xin_params;
  oci_t        *xout_ocip = (oci_t *)NULL;
  oci_t        *xin_ocip = (oci_t *)NULL;
  ub2           obdb_char_csid = 0;                 /* outbound db char csid */
  ub2           obdb_nchar_csid = 0;               /* outbound db nchar csid */
 
  /* parse command line arguments */
  get_inputs(&xout_params, &xin_params, argc, argv); 
 
  /* Get the outbound database CHAR and NCHAR character set info */
  get_db_charsets(&xout_params, &obdb_char_csid, &obdb_nchar_csid);
 
  /* Connect to the outbound db and set the client env to the outbound charsets
   * to minimize character conversion when transferring LCRs from outbound 
   * directly to inbound server. 
   */
  connect_db(&xout_params, &xout_ocip, obdb_char_csid, obdb_nchar_csid);
 
  /* Attach to outbound server */
  attach(xout_ocip, &xout_params, TRUE);
 
  /* connect to inbound db and set the client charsets the same as the 
   * outbound db charsets.
   */
  connect_db(&xin_params, &xin_ocip, obdb_char_csid, obdb_nchar_csid);
 
  /* Attach to inbound server */
  attach(xin_ocip, &xin_params, FALSE);
 
  /* Get lcrs from outbound server and send to inbound server */
  get_lcrs(xin_ocip, xout_ocip);
 
  /* Detach from XStream servers */
  detach(xout_ocip);
  detach(xin_ocip);
 
  /* Disconnect from both databases */
  disconnect_db(xout_ocip);
  disconnect_db(xin_ocip);
 
  free(xout_ocip);
  free(xin_ocip);
  exit (0);
}
 
/*---------------------------------------------------------------------
 * connect_db - Connect to the database and set the env to the given
 * char and nchar character set ids. 
 *---------------------------------------------------------------------*/
static void connect_db(conn_info_t *params_p, oci_t **ociptr, ub2 char_csid,
                ub2 nchar_csid)
{
  oci_t        *ocip;
 
  printf ("Connect to Oracle as %.*s@%.*s ",
          params_p->userlen, params_p->user, 
          params_p->dbnamelen, params_p->dbname);
       
  if (char_csid && nchar_csid)
    printf ("using char csid=%d and nchar csid=%d", char_csid, nchar_csid);
 
  printf("\n");
 
  ocip = (oci_t *)malloc(sizeof(oci_t));
 
  if (OCIEnvNlsCreate(&ocip->envp, OCI_OBJECT, (dvoid *)0,
                     (dvoid * (*)(dvoid *, size_t)) 0,
                     (dvoid * (*)(dvoid *, dvoid *, size_t))0,
                     (void (*)(dvoid *, dvoid *)) 0,
                     (size_t) 0, (dvoid **) 0, char_csid, nchar_csid))
  {
    ocierror(ocip, (char *)"OCIEnvNlsCreate() failed");
  }
 
  if (OCIHandleAlloc((dvoid *) ocip->envp, (dvoid **) &ocip->errp,
                     (ub4) OCI_HTYPE_ERROR, (size_t) 0, (dvoid **) 0))
  {
    ocierror(ocip, (char *)"OCIHandleAlloc(OCI_HTYPE_ERROR) failed");
  }
 
  /* Logon to database */
  OCICALL(ocip,
          OCILogon(ocip->envp, ocip->errp, &ocip->svcp,
                   params_p->user, params_p->userlen,
                   params_p->passw, params_p->passwlen,
                   params_p->dbname, params_p->dbnamelen));
 
  /* allocate the server handle */
  OCICALL(ocip,
          OCIHandleAlloc((dvoid *) ocip->envp, (dvoid **) &ocip->srvp,
                         OCI_HTYPE_SERVER, (size_t) 0, (dvoid **) 0));
 
  OCICALL(ocip, 
          OCIHandleAlloc((dvoid *) ocip->envp, (dvoid **) &ocip->stmtp,
                     (ub4) OCI_HTYPE_STMT, (size_t) 0, (dvoid **) 0));
 
  if (*ociptr == (oci_t *)NULL)
  {
    *ociptr = ocip;
  }
}
 
/*---------------------------------------------------------------------
 * get_db_charsets - Get the database CHAR and NCHAR character set ids.
 *---------------------------------------------------------------------*/
static const oratext GET_DB_CHARSETS[] =  \
 "select parameter, value from nls_database_parameters where parameter = \
 'NLS_CHARACTERSET' or parameter = 'NLS_NCHAR_CHARACTERSET'";
 
#define PARM_BUFLEN      (30)
 
static void get_db_charsets(conn_info_t *params_p, ub2 *char_csid, 
                            ub2 *nchar_csid)
{
  OCIDefine  *defnp1 = (OCIDefine *) NULL;
  OCIDefine  *defnp2 = (OCIDefine *) NULL;
  oratext     parm[PARM_BUFLEN];
  oratext     value[OCI_NLS_MAXBUFSZ];
  ub2         parm_len = 0;
  ub2         value_len = 0;
  oci_t       ocistruct; 
  oci_t      *ocip = &ocistruct;
   
  *char_csid = 0;
  *nchar_csid = 0;
  memset (ocip, 0, sizeof(ocistruct));
 
  if (OCIEnvCreate(&ocip->envp, OCI_OBJECT, (dvoid *)0,
                     (dvoid * (*)(dvoid *, size_t)) 0,
                     (dvoid * (*)(dvoid *, dvoid *, size_t))0,
                     (void (*)(dvoid *, dvoid *)) 0,
                     (size_t) 0, (dvoid **) 0))
  {
    ocierror(ocip, (char *)"OCIEnvCreate() failed");
  }
 
  if (OCIHandleAlloc((dvoid *) ocip->envp, (dvoid **) &ocip->errp,
                     (ub4) OCI_HTYPE_ERROR, (size_t) 0, (dvoid **) 0))
  {
    ocierror(ocip, (char *)"OCIHandleAlloc(OCI_HTYPE_ERROR) failed");
  }
 
  OCICALL(ocip, 
          OCILogon(ocip->envp, ocip->errp, &ocip->svcp,
                   params_p->user, params_p->userlen,
                   params_p->passw, params_p->passwlen,
                   params_p->dbname, params_p->dbnamelen));
 
  OCICALL(ocip, 
          OCIHandleAlloc((dvoid *) ocip->envp, (dvoid **) &ocip->stmtp,
                     (ub4) OCI_HTYPE_STMT, (size_t) 0, (dvoid **) 0));
 
  /* Execute stmt to select the db nls char and nchar character set */ 
  OCICALL(ocip, 
          OCIStmtPrepare(ocip->stmtp, ocip->errp,
                         (CONST text *)GET_DB_CHARSETS,
                         (ub4)strlen((char *)GET_DB_CHARSETS),
                         (ub4)OCI_NTV_SYNTAX, (ub4)OCI_DEFAULT));
 
  OCICALL(ocip,
          OCIDefineByPos(ocip->stmtp, &defnp1,
                         ocip->errp, (ub4) 1, parm,
                         PARM_BUFLEN, SQLT_CHR, (void*) 0,
                         &parm_len, (ub2 *)0, OCI_DEFAULT));
 
  OCICALL(ocip,
          OCIDefineByPos(ocip->stmtp, &defnp2,
                         ocip->errp, (ub4) 2, value,
                         OCI_NLS_MAXBUFSZ, SQLT_CHR, (void*) 0,
                         &value_len, (ub2 *)0, OCI_DEFAULT));
 
  OCICALL(ocip, 
          OCIStmtExecute(ocip->svcp, ocip->stmtp, 
                         ocip->errp, (ub4)0, (ub4)0, 
                         (const OCISnapshot *)0,
                         (OCISnapshot *)0, (ub4)OCI_DEFAULT));
 
  while (OCIStmtFetch(ocip->stmtp, ocip->errp, 1,
                      OCI_FETCH_NEXT, OCI_DEFAULT) == OCI_SUCCESS)
  {
    value[value_len] = '\0';
    if (parm_len == strlen("NLS_CHARACTERSET") &&
        !memcmp(parm, "NLS_CHARACTERSET", parm_len))
    {
      *char_csid = OCINlsCharSetNameToId(ocip->envp, value);
      printf("Outbound database NLS_CHARACTERSET = %.*s (csid = %d) \n",
             value_len, value, *char_csid);
    }
    else if (parm_len == strlen("NLS_NCHAR_CHARACTERSET") &&
             !memcmp(parm, "NLS_NCHAR_CHARACTERSET", parm_len))
    {
      *nchar_csid = OCINlsCharSetNameToId(ocip->envp, value);
      printf("Outbound database NLS_NCHAR_CHARACTERSET = %.*s (csid = %d) \n",
             value_len, value, *nchar_csid);
    }
  }
 
  disconnect_db(ocip);
}
 
/*---------------------------------------------------------------------
 * attach - Attach to XStream server specified in connection info
 *---------------------------------------------------------------------*/
static void attach(oci_t * ocip, conn_info_t *conn, boolean outbound)
{
  sword       err;
 
  printf ("Attach to XStream %s server '%.*s'\n", 
          outbound ? "outbound" : "inbound",
          conn->svrnmlen, conn->svrnm);
 
  if (outbound)
  {
    OCICALL(ocip, 
            OCIXStreamOutAttach(ocip->svcp, ocip->errp, conn->svrnm,
                              (ub2)conn->svrnmlen, (ub1 *)0, 0, OCI_DEFAULT));
  }
  else
  {
    OCICALL(ocip, 
            OCIXStreamInAttach(ocip->svcp, ocip->errp, conn->svrnm,
                               (ub2)conn->svrnmlen, 
                               (oratext *)"From_XOUT", 9,
                               (ub1 *)0, 0, OCI_DEFAULT));
  }
 
  ocip->attached = TRUE;
  ocip->outbound = outbound;
}
 
/*---------------------------------------------------------------------
 * ping_svr - Ping inbound server by sending a commit LCR.
 *---------------------------------------------------------------------*/
static void ping_svr(oci_t *xin_ocip, void *commit_lcr,
                     ub1 *cmtpos, ub2 cmtpos_len, 
                     oratext *source_db, ub2 source_db_len)
{
  OCIDate     src_time;
  oratext     txid[128];
 
  OCICALL(xin_ocip, OCIDateSysDate(xin_ocip->errp, &src_time));
  sprintf((char *)txid, "Ping %2d:%2d:%2d",
          src_time.OCIDateTime.OCITimeHH,
          src_time.OCIDateTime.OCITimeMI,
          src_time.OCIDateTime.OCITimeSS);
 
  /* Initialize LCR with new txid and commit position */
  OCICALL(xin_ocip,
          OCILCRHeaderSet(xin_ocip->svcp, xin_ocip->errp,
                          source_db, source_db_len,
                          (oratext *)OCI_LCR_ROW_CMD_COMMIT,
                          (ub2)strlen(OCI_LCR_ROW_CMD_COMMIT),
                          (oratext *)0, 0,                     /* null owner */
                          (oratext *)0, 0,                    /* null object */
                          (ub1 *)0, 0,                           /* null tag */
                          txid, (ub2)strlen((char *)txid),
                          &src_time, cmtpos, cmtpos_len,
                          0, commit_lcr, OCI_DEFAULT));
 
  /* Send commit lcr to inbound server. */
  if (OCIXStreamInLCRSend(xin_ocip->svcp, xin_ocip->errp, commit_lcr,
                          OCI_LCR_XROW, 0, OCI_DEFAULT) == OCI_ERROR)
  {
    ocierror(xin_ocip, (char *)"OCIXStreamInLCRSend failed in ping_svr()");
  }
}
 
/*---------------------------------------------------------------------
 * get_lcrs - Get LCRs from outbound server and send to inbound server.
 *---------------------------------------------------------------------*/
static void get_lcrs(oci_t *xin_ocip, oci_t *xout_ocip)
{
  sword       status = OCI_SUCCESS;
  void       *lcr;
  ub1         lcrtype;
  oraub8      flag;
  ub1         proclwm[OCI_LCR_MAX_POSITION_LEN];
  ub2         proclwm_len = 0;
  ub1         sv_pingpos[OCI_LCR_MAX_POSITION_LEN];
  ub2         sv_pingpos_len = 0;
  ub1         fetchlwm[OCI_LCR_MAX_POSITION_LEN];
  ub2         fetchlwm_len = 0;
  void       *commit_lcr = (void *)0;
  oratext    *lcr_srcdb = (oratext *)0;
  ub2         lcr_srcdb_len = 0;
  oratext     source_db[M_DBNAME_LEN];
  ub2         source_db_len = 0;
  ub4         lcrcnt = 0;
 
  /* create an lcr to ping the inbound server periodically by sending a
   * commit lcr.
   */
  commit_lcr = (void*)0;
  OCICALL(xin_ocip,
          OCILCRNew(xin_ocip->svcp, xin_ocip->errp, OCI_DURATION_SESSION,
                    OCI_LCR_XROW, &commit_lcr, OCI_DEFAULT));
 
  while (status == OCI_SUCCESS)
  {
    lcrcnt = 0;                         /* reset lcr count before each batch */
 
    while ((status = 
                OCIXStreamOutLCRReceive(xout_ocip->svcp, xout_ocip->errp,
                                        &lcr, &lcrtype, &flag, 
                                        fetchlwm, &fetchlwm_len, OCI_DEFAULT))
                                               == OCI_STILL_EXECUTING)
    {
      lcrcnt++;
 
      /* print header of LCR just received */
      print_lcr(xout_ocip, lcr, lcrtype, &lcr_srcdb, &lcr_srcdb_len);
 
      /* save the source db to construct ping lcr later */
      if (!source_db_len && lcr_srcdb_len)
      {
        memcpy(source_db, lcr_srcdb, lcr_srcdb_len);
        source_db_len = lcr_srcdb_len;
      }
      
      /* send the LCR just received */
      if (OCIXStreamInLCRSend(xin_ocip->svcp, xin_ocip->errp, 
                              lcr, lcrtype, flag, OCI_DEFAULT) == OCI_ERROR)
      {
        ocierror(xin_ocip, (char *)"OCIXStreamInLCRSend failed");
      }
 
      /* If LCR has chunked columns (i.e., has LOB/Long/XMLType columns) */
      if (flag & OCI_XSTREAM_MORE_ROW_DATA)
      {
        /* receive and send chunked columns */
        get_chunks(xin_ocip, xout_ocip); 
      }
    }
 
    if (status == OCI_ERROR)
      ocierror(xout_ocip, (char *)"OCIXStreamOutLCRReceive failed");
 
    /* clear the saved ping position if we just received some new lcrs */
    if (lcrcnt)
    {
      sv_pingpos_len = 0;
    }
 
    /* If no lcrs received during previous WHILE loop and got a new fetch 
     * LWM then send a commit lcr to ping the inbound server with the new
     * fetch LWM position.
     */
    else if (fetchlwm_len > 0 && source_db_len > 0 &&
        (fetchlwm_len != sv_pingpos_len ||
         memcmp(sv_pingpos, fetchlwm, fetchlwm_len))) 
    {
      /* To ensure we don't send multiple lcrs with duplicate position, send
       * a new ping only if we have saved the last ping position.
       */
      if (sv_pingpos_len > 0)
      {     
        ping_svr(xin_ocip, commit_lcr, fetchlwm, fetchlwm_len,
                 source_db, source_db_len); 
      }
 
      /* save the position just sent to inbound server */
      memcpy(sv_pingpos, fetchlwm, fetchlwm_len);
      sv_pingpos_len = fetchlwm_len;
    }
 
    /* flush inbound network to flush all lcrs to inbound server */
    OCICALL(xin_ocip,
            OCIXStreamInFlush(xin_ocip->svcp, xin_ocip->errp, OCI_DEFAULT));
 
    
    /* get processed LWM of inbound server */   
    OCICALL(xin_ocip, 
            OCIXStreamInProcessedLWMGet(xin_ocip->svcp, xin_ocip->errp,
                                        proclwm, &proclwm_len, OCI_DEFAULT));
 
    if (proclwm_len > 0)
    {
      /* Set processed LWM for outbound server */
      OCICALL(xout_ocip, 
              OCIXStreamOutProcessedLWMSet(xout_ocip->svcp, xout_ocip->errp, 
                                           proclwm, proclwm_len, OCI_DEFAULT));
    }
  }
  
  if (status != OCI_SUCCESS)
    ocierror(xout_ocip, (char *)"get_lcrs() encountered an error");
}
 
/*---------------------------------------------------------------------
 * get_chunks - Get each chunk for the current LCR and send it to 
 *              the inbound server.
 *---------------------------------------------------------------------*/
static void get_chunks(oci_t *xin_ocip, oci_t *xout_ocip)
{
  oratext *colname;
  ub2      colname_len;
  ub2      coldty;
  oraub8   col_flags;
  ub2      col_csid;
  ub4      chunk_len;
  ub1     *chunk_ptr;
  oraub8   row_flag;
  sword    err;
  sb4      rtncode;
 
  do
  {
    /* Get a chunk from outbound server */
    OCICALL(xout_ocip,
            OCIXStreamOutChunkReceive(xout_ocip->svcp, xout_ocip->errp, 
                                      &colname, &colname_len, &coldty, 
                                      &col_flags, &col_csid, &chunk_len, 
                                      &chunk_ptr, &row_flag, OCI_DEFAULT));
   
    /* print chunked column info */
    printf(
      "  Chunked column name=%.*s DTY=%d  chunk len=%u csid=%d col_flag=0x%llx\n",
      colname_len, colname, coldty, chunk_len, col_csid,
      (unsigned long long)col_flags);    /* oraub8 is 64-bit; cast for %llx */
 
    /* print chunk data */
    print_chunk(chunk_ptr, chunk_len, coldty);
 
    /* Send the chunk just received to inbound server */
    OCICALL(xin_ocip,
            OCIXStreamInChunkSend(xin_ocip->svcp, xin_ocip->errp, colname,
                                  colname_len, coldty, col_flags,
                                  col_csid, chunk_len, chunk_ptr,
                                  row_flag, OCI_DEFAULT));
 
  } while (row_flag & OCI_XSTREAM_MORE_ROW_DATA);
}
 
/*---------------------------------------------------------------------
 * print_chunk - Print chunked column information. Only print the first
 *               50 bytes for each chunk.
 *---------------------------------------------------------------------*/
static void print_chunk (ub1 *chunk_ptr, ub4 chunk_len, ub2 dty)
{
#define MAX_PRINT_BYTES     (50)          /* print max of 50 bytes per chunk */
 
  ub4  print_bytes;
 
  if (chunk_len == 0)
    return;
 
  print_bytes = chunk_len > MAX_PRINT_BYTES ? MAX_PRINT_BYTES : chunk_len;
 
  printf("  Data = ");
  if (dty == SQLT_CHR)
    printf("%.*s", (int)print_bytes, chunk_ptr);
  else
  {
    ub2  idx;
 
    for (idx = 0; idx < print_bytes; idx++)
      printf("%02x", chunk_ptr[idx]);
  }
  printf("\n");
}
 
/*---------------------------------------------------------------------
 * print_lcr - Print header information of given lcr.
 *---------------------------------------------------------------------*/
static void print_lcr(oci_t *ocip, void *lcrp, ub1 lcrtype, 
                      oratext **src_db_name, ub2  *src_db_namel)
{
  oratext     *cmd_type;
  ub2          cmd_type_len;
  oratext     *owner;
  ub2          ownerl;
  oratext     *oname;
  ub2          onamel;
  oratext     *txid;
  ub2          txidl;
  sword        ret;
 
  printf("\n ----------- %s LCR Header  -----------------\n",
         lcrtype == OCI_LCR_XDDL ? "DDL" : "ROW");
 
  /* Get LCR Header information */
  ret = OCILCRHeaderGet(ocip->svcp, ocip->errp, 
                        src_db_name, src_db_namel,              /* source db */
                        &cmd_type, &cmd_type_len,            /* command type */
                        &owner, &ownerl,                       /* owner name */
                        &oname, &onamel,                      /* object name */
                        (ub1 **)0, (ub2 *)0,                      /* lcr tag */
                        &txid, &txidl, (OCIDate *)0,   /* txn id  & src time */
                        (ub2 *)0, (ub2 *)0,              /* OLD/NEW col cnts */
                        (ub1 **)0, (ub2 *)0,                 /* LCR position */
                        (oraub8*)0, lcrp, OCI_DEFAULT);
 
  if (ret != OCI_SUCCESS)
    ocierror(ocip, (char *)"OCILCRHeaderGet failed");
  else
  {
    printf("  src_db_name=%.*s\n  cmd_type=%.*s txid=%.*s\n",
           *src_db_namel, *src_db_name, cmd_type_len, cmd_type, txidl, txid );
 
    if (ownerl > 0)
      printf("  owner=%.*s oname=%.*s \n", ownerl, owner, onamel, oname);
  } 
}
 
/*---------------------------------------------------------------------
 * detach - Detach from XStream server
 *---------------------------------------------------------------------*/
static void detach(oci_t * ocip)
{
  sword  err = OCI_SUCCESS;
 
  printf ("Detach from XStream %s server\n",
          ocip->outbound ? "outbound" : "inbound" );
 
  if (ocip->outbound)
  {
    OCICALL(ocip, OCIXStreamOutDetach(ocip->svcp, ocip->errp, OCI_DEFAULT));
  }
  else
  {
    OCICALL(ocip, OCIXStreamInDetach(ocip->svcp, ocip->errp, 
                                     (ub1 *)0, (ub2 *)0,    /* processed LWM */
                                     OCI_DEFAULT));
  }
}
 
/*---------------------------------------------------------------------
 * disconnect_db  - Logoff from the database
 *---------------------------------------------------------------------*/
static void disconnect_db(oci_t * ocip)
{
  if (OCILogoff(ocip->svcp, ocip->errp))
  {
    ocierror(ocip, (char *)"OCILogoff() failed");
  }
 
  if (ocip->errp)
    OCIHandleFree((dvoid *) ocip->errp, (ub4) OCI_HTYPE_ERROR);
 
  if (ocip->envp)
    OCIHandleFree((dvoid *) ocip->envp, (ub4) OCI_HTYPE_ENV);
}
 
/*---------------------------------------------------------------------
 * ocierror - Print error status and exit program
 *---------------------------------------------------------------------*/
static void ocierror(oci_t * ocip, char * msg)
{
  sb4 errcode=0;
  text bufp[4096];
 
  if (ocip->errp)
  {
    OCIErrorGet((dvoid *) ocip->errp, (ub4) 1, (text *) NULL, &errcode,
                bufp, (ub4) 4096, (ub4) OCI_HTYPE_ERROR);
    printf("%s\n%s", msg, bufp);
  }
  else
    puts(msg);
 
  printf ("\n");
  exit(1);
}
 
/*--------------------------------------------------------------------
 * print_usage - Print command usage
 *---------------------------------------------------------------------*/
static void print_usage(int exitcode)
{
  puts("\nUsage: xio -ob_svr <outbound_svr> -ob_db <outbound_db>\n" 
         "           -ob_usr <conn_user> -ob_pwd <conn_user_pwd>\n" 
         "           -ib_svr <inbound_svr> -ib_db <inbound_db>\n"
         "           -ib_usr <apply_user> -ib_pwd <apply_user_pwd>\n");
  puts("  ob_svr  : outbound server name\n"
       "  ob_db   : database name of outbound server\n"
       "  ob_usr  : connect user to outbound server\n"
       "  ob_pwd  : password of outbound's connect user\n"
       "  ib_svr  : inbound server name\n"
       "  ib_db   : database name of inbound server\n"
       "  ib_usr  : apply user for inbound server\n"
       "  ib_pwd  : password of inbound's apply user\n");
 
  exit(exitcode);
}
 
/*--------------------------------------------------------------------
 * get_inputs - Get user inputs from command line
 *---------------------------------------------------------------------*/
static void get_inputs(conn_info_t *xout_params, conn_info_t *xin_params, 
                       int argc, char ** argv)
{
  char * option;
  char * value;
 
  memset (xout_params, 0, sizeof(*xout_params));
  memset (xin_params, 0, sizeof(*xin_params));
  while(--argc)
  {
    /* get the option name */
    argv++;
    option = *argv;
 
    /* check that the option begins with a "-" */
    if (!strncmp(option, (char *)"-", 1))
    {
      option ++;
    }
    else
    {
      printf("Error: bad argument '%s'\n", option);
      print_usage(1);
    }
 
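    /* get the value of the option; every option requires a value */
    if (argc < 2)
    {
      printf("Error: missing value for option '-%s'\n", option);
      print_usage(1);
    }
    --argc;
    argv++;

    value = *argv;    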
 
    if (!strncmp(option, (char *)"ob_db", 5))
    {
      xout_params->dbname = (oratext *)value;
      xout_params->dbnamelen = strlen(value);
    }
    else if (!strncmp(option, (char *)"ob_usr", 6))
    {
      xout_params->user = (oratext *)value;
      xout_params->userlen = strlen(value);
    }
    else if (!strncmp(option, (char *)"ob_pwd", 6))
    {
      xout_params->passw = (oratext *)value;
      xout_params->passwlen = strlen(value);
    }
    else if (!strncmp(option, (char *)"ob_svr", 6))
    {
      xout_params->svrnm = (oratext *)value;
      xout_params->svrnmlen = strlen(value);
    }
    else if (!strncmp(option, (char *)"ib_db", 5))
    {
      xin_params->dbname = (oratext *)value;
      xin_params->dbnamelen = strlen(value);
    }
    else if (!strncmp(option, (char *)"ib_usr", 6))
    {
      xin_params->user = (oratext *)value;
      xin_params->userlen = strlen(value);
    }
    else if (!strncmp(option, (char *)"ib_pwd", 6))
    {
      xin_params->passw = (oratext *)value;
      xin_params->passwlen = strlen(value);
    }
    else if (!strncmp(option, (char *)"ib_svr", 6))
    {
      xin_params->svrnm = (oratext *)value;
      xin_params->svrnmlen = strlen(value);
    }
    else
    {
      printf("Error: unknown option '%s'.\n", option);
      print_usage(1);
    }
  }
 
  /* print usage and exit if any argument is not specified */
  if (!xout_params->svrnmlen || !xout_params->passwlen || 
      !xout_params->userlen || !xout_params->dbnamelen ||
      !xin_params->svrnmlen || !xin_params->passwlen || 
      !xin_params->userlen || !xin_params->dbnamelen)
  {
    printf("Error: missing command arguments. \n");
    print_usage(1);
  }
}

Sample XStream Client Application for the Java API

To run the sample XStream client application for the Java API, compile the application file, and then enter the following on a command line:

java xio xsin_oraclesid xsin_host xsin_port xsin_username 
xsin_passwd xsin_servername xsout_oraclesid xsout_host xsout_port 
xsout_username xsout_passwd xsout_servername

Substitute the appropriate values for the following placeholders:

  • xsin_oraclesid is the Oracle SID of the inbound server's database.

  • xsin_host is the host name of the computer system running the inbound server.

  • xsin_port is the port number of the listener for the inbound server's database.

  • xsin_username is the inbound server's apply user.

  • xsin_passwd is the password for the inbound server's apply user.

  • xsin_servername is the name of the inbound server.

  • xsout_oraclesid is the Oracle SID of the outbound server's database.

  • xsout_host is the host name of the computer system running the outbound server.

  • xsout_port is the port number of the listener for the outbound server's database.

  • xsout_username is the outbound server's connect user.

  • xsout_passwd is the password for the outbound server's connect user.

  • xsout_servername is the name of the outbound server.
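
The order of these placeholders matters because the sample reads its arguments by position. The following is a minimal sketch, not part of the shipped demo, of how the twelve arguments split into an inbound group and an outbound group, and how each group yields a connection URL in the layout that the sample builds. The class name XioArgs and its nested Target class are hypothetical and are introduced only for illustration.

```java
import java.util.Arrays;

// Hypothetical helper, not part of xio.java: groups the twelve positional
// arguments into inbound and outbound connection settings.
final class XioArgs {
  // One group of six arguments: SID, host, port, user, password, server name.
  static final class Target {
    final String sid, host, port, user, password, serverName;

    Target(String[] a) {
      sid = a[0]; host = a[1]; port = a[2];
      user = a[3]; password = a[4]; serverName = a[5];
    }

    // Same URL layout that the sample builds: jdbc:oracle:oci:@host:port:sid
    String url() {
      return "jdbc:oracle:oci:@" + host + ":" + port + ":" + sid;
    }
  }

  final Target inbound;   // args[0..5]
  final Target outbound;  // args[6..11]

  XioArgs(String[] args) {
    if (args.length != 12) {
      throw new IllegalArgumentException(
          "expected 12 arguments, got " + args.length);
    }
    inbound = new Target(Arrays.copyOfRange(args, 0, 6));
    outbound = new Target(Arrays.copyOfRange(args, 6, 12));
  }

  public static void main(String[] argv) {
    XioArgs parsed = new XioArgs(argv);
    System.out.println("xsin connection url: " + parsed.inbound.url());
    System.out.println("xsout connection url: " + parsed.outbound.url());
  }
}
```

Validating the argument count once, in one place, avoids repeating the check in separate inbound and outbound parsing routines.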

When the sample client application is running, it prints information about attaching to the inbound server and outbound server, along with the last position for each server. The output looks similar to the following:

xsin_host = server2.example.com
xsin_port = 1482
xsin_ora_sid = db2
xsin connection url: jdbc:oracle:oci:@server2.example.com:1482:db2
xsout_host = server1.example.com
xsout_port = 1481
xsout_ora_sid = db1
xsout connection url: jdbc:oracle:oci:@server1.example.com:1481:db1
Attached to inbound server:xin
Inbound Server Last Position is: 0000000920250000000100000001000000092025000000010000000101
Attached to outbound server:xout
Last Position is: 0000000920250000000100000001000000092025000000010000000101
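
The position values in the output above are byte arrays printed as two hexadecimal digits per byte. As a hedged illustration, the following sketch formats a position the same way and parses such a string back into bytes, which can help when comparing positions copied from log output. The class PositionHex and its methods are hypothetical and are not part of the demo or of the XStream API.

```java
// Hypothetical helper, not part of xio.java.
final class PositionHex {
  private PositionHex() {}

  // Format each byte as two lowercase hex digits, as in the sample output.
  static String toHex(byte[] position) {
    StringBuilder sb = new StringBuilder(position.length * 2);
    for (byte b : position) {
      sb.append(String.format("%02x", b & 0xFF));
    }
    return sb.toString();
  }

  // Parse an even-length hex string back into the bytes it represents.
  static byte[] fromHex(String hex) {
    if (hex.length() % 2 != 0) {
      throw new IllegalArgumentException("odd-length hex string");
    }
    byte[] out = new byte[hex.length() / 2];
    for (int i = 0; i < out.length; i++) {
      out[i] = (byte) Integer.parseInt(hex.substring(2 * i, 2 * i + 2), 16);
    }
    return out;
  }

  public static void main(String[] args) {
    // Round-trip a prefix of the position shown in the sample output.
    byte[] pos = fromHex("0000000920250000000100000001");
    System.out.println(toHex(pos));
  }
}
```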

This demo is available in the following location:

$ORACLE_HOME/rdbms/demo/xstream/java

The file name for the demo is xio.java. See the README.txt file in the demo directory for more information about compiling and running the application.

The code for the sample application that uses the Java API follows:

import oracle.streams.*;
import oracle.jdbc.internal.OracleConnection;
import oracle.jdbc.*;
import oracle.sql.*;
import java.sql.*;
import java.util.*;
 
public class xio
{
  public static String xsinusername = null;
  public static String xsinpasswd = null;
  public static String xsinName = null;
  public static String xsoutusername = null;
  public static String xsoutpasswd = null;
  public static String xsoutName = null;
  public static String in_url = null;
  public static String out_url = null;
  public static Connection in_conn = null;
  public static Connection out_conn = null;
  public static XStreamIn xsIn = null;
  public static XStreamOut xsOut = null;
  public static byte[] lastPosition = null;
  public static byte[] processedLowPosition = null;
    
  public static void main(String args[])
  {
    // get connection url to inbound and outbound server
    in_url = parseXSInArguments(args);    
    out_url = parseXSOutArguments(args);    
 
    // create connection to inbound and outbound server
    in_conn = createConnection(in_url, xsinusername, xsinpasswd);
    out_conn = createConnection(out_url, xsoutusername, xsoutpasswd);
 
    // attach to inbound and outbound server
    xsIn = attachInbound(in_conn);
    xsOut = attachOutbound(out_conn);
    
    // main loop to get lcrs 
    get_lcrs(xsIn, xsOut);
    
    // detach from inbound and outbound server
    detachInbound(xsIn);
    detachOutbound(xsOut);
  }
    
  // parse the arguments to get the connection url to inbound db
  public static String parseXSInArguments(String args[])
  {
    String trace, pref;
    String orasid, host, port;
    
    if (args.length != 12)
    {
      printUsage();
      System.exit(0);
    }
 
    orasid = args[0];
    host = args[1];
    port = args[2];
    xsinusername = args[3];
    xsinpasswd = args[4];
    xsinName = args[5];
    
    System.out.println("xsin_host = "+host);
    System.out.println("xsin_port = "+port);
    System.out.println("xsin_ora_sid = "+orasid);
 
    String in_url = "jdbc:oracle:oci:@"+host+":"+port+":"+orasid;
    System.out.println("xsin connection url: "+ in_url);
 
    return in_url;
  }
 
  // parse the arguments to get the connection url to outbound db
  public static String parseXSOutArguments(String args[])
  {
    String trace, pref;
    String orasid, host, port;
    
    if (args.length != 12)
    {
      printUsage();
      System.exit(0);
    }
 
    orasid = args[6];
    host = args[7];
    port = args[8];
    xsoutusername = args[9];
    xsoutpasswd = args[10];
    xsoutName = args[11];
    
    
    System.out.println("xsout_host = "+host);
    System.out.println("xsout_port = "+port);
    System.out.println("xsout_ora_sid = "+orasid);
 
    String out_url = "jdbc:oracle:oci:@"+host+":"+port+":"+orasid;
    System.out.println("xsout connection url: "+ out_url);
 
    return out_url;
  }
 
  // print out sample program usage message
  public static void printUsage()
  {
    System.out.println("");      
    System.out.println("Usage: java xio "+"<xsin_oraclesid> " + "<xsin_host> "
                                         + "<xsin_port> ");
    System.out.println("                "+"<xsin_username> " + "<xsin_passwd> "
                                         + "<xsin_servername> ");
    System.out.println("                "+"<xsout_oraclesid> " + "<xsout_host> "
                                         + "<xsout_port> ");
    System.out.println("                "+"<xsout_username> " + "<xsout_passwd> "
                                         + "<xsout_servername> ");
  }
 
  // create a connection to an Oracle Database
  public static Connection createConnection(String url, 
                                            String username, 
                                            String passwd)
  {
    try
    {
      DriverManager.registerDriver(new oracle.jdbc.OracleDriver());
      return DriverManager.getConnection(url, username, passwd);
    }
    catch(Exception e)
    {
      System.out.println("failed to establish DB connection to: " +url);
      e.printStackTrace();
      return null;
    }
  }
 
  // attach to the XStream Inbound Server
  public static XStreamIn attachInbound(Connection in_conn)
  {
    XStreamIn xsIn = null;
    try
    {
      xsIn = XStreamIn.attach((OracleConnection)in_conn, xsinName,
                              "XSDEMOINCLIENT" , XStreamIn.DEFAULT_MODE);
 
      // use the last position to decide where to start sending LCRs
      lastPosition = xsIn.getLastPosition();
      System.out.println("Attached to inbound server:"+xsinName);
      System.out.print("Inbound Server Last Position is: ");
      if (null == lastPosition)
      {
        System.out.println("null");
      }
      else
      {
        printHex(lastPosition);
      }
      return xsIn;
    }
    catch(Exception e)
    {
      System.out.println("cannot attach to inbound server: "+xsinName);
      System.out.println(e.getMessage());
      e.printStackTrace();
      return null;
    }        
  }
 
  // attach to the XStream Outbound Server    
  public static XStreamOut attachOutbound(Connection out_conn)
  {
    XStreamOut xsOut = null;
 
    try
    {
      // when attaching to an outbound server, the client must tell the
      // outbound server the last position
      xsOut = XStreamOut.attach((OracleConnection)out_conn, xsoutName,
                                lastPosition, XStreamOut.DEFAULT_MODE);
      System.out.println("Attached to outbound server:"+xsoutName);
      System.out.print("Last Position is: ");  
      if (lastPosition != null)
      {
        printHex(lastPosition);
      }
      else
      {
        System.out.println("NULL");
      }
      return xsOut;
    }
    catch(Exception e)
    {
      System.out.println("cannot attach to outbound server: "+xsoutName);
      System.out.println(e.getMessage());
      e.printStackTrace();
      return null;
    } 
  }
 
  // detach from the XStream Inbound Server
  public static void detachInbound(XStreamIn xsIn)
  {
    byte[] processedLowPosition = null;
    try
    {
      processedLowPosition = xsIn.detach(XStreamIn.DEFAULT_MODE);
      System.out.print("Inbound server processed low Position is: ");
      if (processedLowPosition != null)
      {
        printHex(processedLowPosition);
      }
      else
      {
        System.out.println("NULL");
      }
    }
    catch(Exception e)
    {
      System.out.println("cannot detach from the inbound server: "+xsinName);
      System.out.println(e.getMessage());
      e.printStackTrace();
    }
  }
 
  // detach from the XStream Outbound Server    
  public static void detachOutbound(XStreamOut xsOut)
  {
    try
    {
      xsOut.detach(XStreamOut.DEFAULT_MODE);
    }
    catch(Exception e)
    {
      System.out.println("cannot detach from the outbound server: "+xsoutName);
      System.out.println(e.getMessage());
      e.printStackTrace();
    }       
  }
 
  public static void get_lcrs(XStreamIn xsIn, XStreamOut xsOut)
  {
    byte[] ping_pos = null;
    byte[] fetchlwm = null;
    String src_db = null;
    
    if (null == xsIn) 
    {
      System.out.println("xstreamIn is null");
      System.exit(0);
    }
 
    if (null == xsOut)
    {
      System.out.println("xstreamOut is null");
      System.exit(0);
    }
 
    try
    {
      while(true) 
      {
        // receive an LCR from outbound server
        LCR alcr = xsOut.receiveLCR(XStreamOut.DEFAULT_MODE);
        fetchlwm = xsOut.getFetchLowWatermark();
    
        // save source db for ping lcr
        if (null != alcr)
          src_db = alcr.getSourceDatabaseName();
        
        if (xsOut.getBatchStatus() == XStreamOut.EXECUTING) // batch is active
        {
          assert alcr != null;
          // send the LCR to the inbound server
          xsIn.sendLCR(alcr, XStreamIn.DEFAULT_MODE);
 
          // also get chunk data for this LCR if any
          if (alcr instanceof RowLCR)
          {
            // receive chunk from outbound then send to inbound
            if (((RowLCR)alcr).hasChunkData())
            {
              ChunkColumnValue chunk = null; 
              do
              {
                chunk = xsOut.receiveChunk(XStreamOut.DEFAULT_MODE);
                xsIn.sendChunk(chunk, XStreamIn.DEFAULT_MODE);
              } while (!chunk.isEndOfRow());
            }
          }
          processedLowPosition = alcr.getPosition();
          ping_pos = processedLowPosition;          
        }
        else  // batch has ended
        {
          assert alcr == null;
 
          // if a source database is known and the fetch LWM differs from
          // the last position sent, send a commit lcr to ping the inbound
          // server with the new fetch LWM position
          if (null != src_db && null != fetchlwm && 
              !samePos(fetchlwm,ping_pos))
          {
            xsIn.sendLCR(createPing(src_db, fetchlwm), 
                         XStreamIn.DEFAULT_MODE);
            ping_pos = fetchlwm;
          }
 
          // flush the network
          xsIn.flush(XStreamIn.DEFAULT_MODE);
          // get the processed_low_position from inbound server
          processedLowPosition = 
              xsIn.getProcessedLowWatermark();
          // update the processed_low_position at the outbound server
          if (null != processedLowPosition)
            xsOut.setProcessedLowWatermark(processedLowPosition, 
                                           XStreamOut.DEFAULT_MODE);
        }
      }
    }
    catch(Exception e)
    {
      System.out.println("exception when processing LCRs");
      System.out.println(e.getMessage());
      e.printStackTrace();
    }
  }
 
  public static void printHex(byte[] b) 
  {
    for (int i = 0; i < b.length; ++i) 
    {
      System.out.print(
        Integer.toHexString((b[i]&0xFF) | 0x100).substring(1,3));
    }
    System.out.println("");
  }    
 
  // ping lcr is used to bump up the inbound server watermark
  private static RowLCR createPing(String src_db, byte[] pos)
  {
    java.util.Date today = new java.util.Date();
    java.sql.Timestamp now = new java.sql.Timestamp(today.getTime());
    oracle.sql.DATE src_time = new oracle.sql.DATE(now);
 
 
    RowLCR alcr = new DefaultRowLCR();
    ((RowLCR)alcr).setSourceDatabaseName(src_db);
    ((RowLCR)alcr).setSourceTime(src_time);
    ((RowLCR)alcr).setPosition(pos);
    ((RowLCR)alcr).setCommandType(RowLCR.COMMIT);
    ((RowLCR)alcr).setTransactionId("Ping: " + src_time.toString());
 
    return alcr;
  }
 
  private static boolean samePos(byte[] pos1, byte[] pos2)
  {
    // null-safe comparison of both length and content
    return java.util.Arrays.equals(pos1, pos2);
  }
}
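
The ping decision in get_lcrs can be separated from the XStream API so that the rule is easier to test on its own. The following sketch is an illustration under stated assumptions, not part of the demo: the class PingTracker and its method names are hypothetical, and it models only the rule that a ping is worth sending when a source database name is known and the fetch low watermark differs from the last position already sent to the inbound server.

```java
import java.util.Arrays;

// Hypothetical helper, not part of xio.java: tracks when a ping LCR is needed.
final class PingTracker {
  private String sourceDb;          // source database of the last LCR seen
  private byte[] lastSentPosition;  // last position sent to the inbound server

  // Record an LCR that was forwarded to the inbound server.
  void onLcrSent(String srcDb, byte[] position) {
    sourceDb = srcDb;
    lastSentPosition = position == null ? null : position.clone();
  }

  // A ping is useful only if a source database is known and the fetch low
  // watermark differs from the last position already sent.
  boolean shouldPing(byte[] fetchLwm) {
    return sourceDb != null
        && fetchLwm != null
        && !Arrays.equals(fetchLwm, lastSentPosition);
  }

  // Record that a ping carrying fetchLwm was sent.
  void onPingSent(byte[] fetchLwm) {
    lastSentPosition = fetchLwm.clone();
  }

  String sourceDb() {
    return sourceDb;
  }

  public static void main(String[] args) {
    PingTracker t = new PingTracker();
    System.out.println(t.shouldPing(new byte[] {1}));  // no LCR seen yet
    t.onLcrSent("db1", new byte[] {1});
    System.out.println(t.shouldPing(new byte[] {1}));  // same position
    System.out.println(t.shouldPing(new byte[] {2}));  // new fetch LWM
  }
}
```

In a client built this way, the loop would call shouldPing at the end of each batch, send the ping LCR only when it returns true, and then call onPingSent with the same position.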