Oracle® Database XStream Guide 11g Release 2 (11.2) Part Number E16545-05 |
|
|
PDF · Mobi · ePub |
This chapter describes configuring the Oracle Database components that are used by XStream. This chapter also includes sample client applications that communicate with an XStream outbound server and inbound server.
This chapter contains these topics:
See Also:
This section describes preparing for an XStream configuration.
This section contains the following topics:
An XStream administrator configures and manages XStream components in an XStream Out or XStream In environment. This section describes configuring an XStream administrator by granting a user the appropriate privileges. You must configure an XStream administrator in each Oracle database included in the XStream configuration.
Before configuring an XStream administrator, ensure that the following prerequisites are met:
Ensure that you can log in to each database in the XStream configuration as an administrative user who can create users, grant privileges, and create tablespaces.
Identify a user who will be the XStream administrator. Either create a new user with the appropriate privileges or grant these privileges to an existing user.
Do not use the SYS
or SYSTEM
user as an XStream administrator, and ensure that the XStream administrator does not use the SYSTEM
tablespace as its default tablespace.
If a new tablespace is required for the XStream administrator, then ensure that there is enough disk space on each computer system in the XStream configuration for the tablespace. The recommended size of the tablespace is 25 MB.
This section makes the following assumptions:
The username of the XStream administrator is xstrmadmin
.
The tablespace used by the XStream administrator is xstream_tbs
.
To configure an XStream administrator:
In SQL*Plus, connect as an administrative user who can create users, grant privileges, and create tablespaces. Remain connected as this administrative user for all subsequent steps.
See Also:
Oracle Database Administrator's Guide for information about connecting to a database in SQL*PlusEither create a tablespace for the XStream administrator or use an existing tablespace.
This tablespace stores any objects created in the XStream administrator's schema, including any spillover of messages from the buffered queues owned by the schema.
For example, the following statement creates a new tablespace for the XStream administrator:
CREATE TABLESPACE xstream_tbs DATAFILE '/usr/oracle/dbs/xstream_tbs.dbf' SIZE 25M REUSE AUTOEXTEND ON MAXSIZE UNLIMITED;
Create a new user to act as the XStream administrator or identify an existing user.
For example, to create a user named xstrmadmin
and specify that this user uses the xstream_tbs
tablespace, run the following statement:
CREATE USER xstrmadmin IDENTIFIED BY password
DEFAULT TABLESPACE xstream_tbs
QUOTA UNLIMITED ON xstream_tbs;
Note:
Enter an appropriate password for the administrative user.See Also:
Oracle Database Security Guide for guidelines about choosing passwordsGrant the Oracle Streams administrator the DBA role:
GRANT DBA TO xstrmadmin;
Note:
TheDBA
role is required for a user to create or alter outbound servers, inbound servers, capture processes, synchronous captures, and apply processes. When the user does not need to perform these tasks, the DBA
role can be revoked from the user.Run the GRANT_ADMIN_PRIVILEGE
procedure in the DBMS_XSTREAM_AUTH
package.
A user must have explicit EXECUTE
privilege on a package to execute a subprogram in the package inside of a user-created subprogram, and a user must have explicit SELECT
privilege on a data dictionary view to query the view inside of a user-created subprogram. These privileges cannot be granted through a role. You can run the GRANT_ADMIN_PRIVILEGE
procedure to grant such privileges to the XStream administrator, or you can grant them directly.
Depending on the parameter settings for the GRANT_ADMIN_PRIVILEGE
procedure, it either grants the privileges for an XStream administrator directly, or it generates a script that you can edit and then run to grant these privileges.
Note:
Starting with Oracle Database 11g Release 2 (11.2.0.2), theDBMS_XSTREAM_AUTH
package is available.See Also:
"GRANT_ADMIN_PRIVILEGE Procedure"Use the GRANT_ADMIN_PRIVILEGE procedure to grant privileges directly:
Run the following procedure:
BEGIN DBMS_XSTREAM_AUTH.GRANT_ADMIN_PRIVILEGE( grantee => 'xstrmadmin', grant_privileges => TRUE); END; /
Use the GRANT_ADMIN_PRIVILEGE procedure to generate a script:
Complete the following steps:
Use the SQL statement CREATE
DIRECTORY
to create a directory object for the directory into which you want to generate the script. A directory object is similar to an alias for the directory. For example, to create a directory object called xstrm_dir
for the /usr/admin directory on your computer system, run the following procedure:
CREATE DIRECTORY xstrm_dir AS '/usr/admin';
Run the GRANT_ADMIN_PRIVILEGE
procedure to generate a script named grant_xstrm_privs.sql
and place this script in the /usr/admin directory on your computer system:
BEGIN DBMS_XSTREAM_AUTH.GRANT_ADMIN_PRIVILEGE( grantee => 'xstrmadmin', grant_privileges => FALSE, file_name => 'grant_xstrm_privs.sql', directory_name => 'xstrm_dir'); END; /
Notice that the grant_privileges
parameter is set to FALSE
so that the procedure does not grant the privileges directly. Also, notice that the directory object created in Step a is specified for the directory_name
parameter.
Edit the generated script if necessary and save your changes.
Execute the script in SQL*Plus:
SET ECHO ON SPOOL grant_xstrm_privs.out @/usr/admin/grant_xstrm_privs.sql SPOOL OFF
Check the spool file to ensure that all of the grants executed successfully. If there are errors, then edit the script to correct the errors and rerun it.
If necessary, grant the following additional privileges:
If you plan to use Oracle Enterprise Manager to manage databases with XStream components, then configure the XStream administrator to be a Database Control administrator. Doing so grants additional privileges required by Oracle Enterprise Manager, such as the privileges required to run Oracle Enterprise Manager jobs. See Oracle Database 2 Day DBA for instructions.
Grant the privileges for a remote XStream administrator to perform actions in the local database. Grant these privileges using the GRANT_REMOTE_ADMIN_ACCESS
procedure in the DBMS_XSTREAM_AUTH
package. Grant this privilege if a remote XStream administrator will use a database link that connects to the local XStream administrator to perform administrative actions. Specifically, grant these privileges if either of the following conditions are true:
You plan to configure a downstream capture process at a remote downstream database that captures changes originating at the local source database, and the downstream capture process will use a database link to perform administrative actions at the source database.
You plan to use a remote XStream administrator to set the instantiation system change number (SCN) values for replicated database objects at the local database.
If no apply user is specified for an inbound server, then grant the XStream administrator the necessary privileges to perform DML and DDL changes on the apply objects owned by other users. If an apply user is specified, then the apply user must have these privileges. These privileges can be granted directly or through a role.
If no apply user is specified for an inbound server, then grant the XStream administrator EXECUTE
privilege on any PL/SQL subprogram owned by another user that is executed by an inbound server. These subprograms can be used in apply handlers or error handlers. If an apply user is specified, then the apply user must have these privileges. These privileges must be granted directly. They cannot be granted through a role.
Grant the XStream administrator EXECUTE
privilege on any PL/SQL function owned by another user that is specified in a custom rule-based transformation for a rule used by a capture process, synchronous capture, propagation, outbound server, or inbound server. For a capture process or synchronous capture, if a capture user is specified, then the capture user must have these privileges. For an inbound server, if an apply user is specified, then the apply user must have these privileges. These privileges must be granted directly. They cannot be granted through a role.
Grant the XStream administrator privileges to alter database objects where appropriate. For example, if the XStream administrator must create a supplemental log group for a table in another schema, then the XStream administrator must have the necessary privileges to alter the table. These privileges can be granted directly or through a role.
If the XStream administrator does not own the queue used by a capture process, synchronous capture, propagation, outbound server, or inbound server, and is not specified as the queue user for the queue when the queue is created, then the XStream administrator must be configured as a secure queue user of the queue if you want the XStream administrator to be able to enqueue messages into or dequeue messages from the queue. The XStream administrator might also need ENQUEUE
or DEQUEUE
privileges on the queue, or both. See Oracle Streams Concepts and Administration for information about managing queues.
Grant the XStream administrator EXECUTE
privilege on any object types that the XStream administrator might need to access. These privileges can be granted directly or through a role.
If the XStream administrator will use Data Pump to perform export and import operations on database objects in other schemas during an instantiation, then grant the EXP_FULL_DATABASE
and IMP_FULL_DATABASE
roles to the XStream administrator.
If Oracle Database Vault is installed, then the user who performs the following actions must be granted the BECOME
USER
system privilege:
Creates or alters a capture process
Creates or alters an outbound server
Creates or alters an inbound server
Granting the BECOME
USER
system privilege to the user who performs these actions is not required if Oracle Database Vault is not installed. You can revoke the BECOME
USER
system privilege from the user after the completing one of these actions, if necessary.
Repeat all of the previous steps at each Oracle database in the environment that will use XStream.
This section describes the decisions to make and the tasks to complete to prepare for an XStream Out configuration.
When you configure XStream Out, you must configure XStream components to capture database changes and send these changes to the outbound server in the form of logical change records (LCRs). These components include a capture process and at least one queue. The capture process can be a local capture process or a downstream capture process. For some configurations, you must also configure a propagation.
Local capture means that a capture process runs on the source database. Downstream capture means that a capture process runs on a database other than the source database. The source database is the database where the changes were generated. The primary reason to use downstream capture is to reduce the load on the source database, thereby improving its performance. The primary reason to use a local capture is because it is easier to configure and maintain.
The database that captures changes made to the source database is called the capture database. One of the following databases can be the capture database:
Source database (local capture)
Destination database (downstream capture)
A third database (downstream capture)
If the database running the outbound server is not the capture database, then a propagation sends changes from the capture database to the database running the outbound server. If the database running the outbound server is the capture database, then this propagation between databases is not needed because the capture process and outbound server use the same queue.
You can configure the components in the following ways:
Local capture and outbound server in the same database: The database objects, capture process, and outbound server are all in the same database. This configuration is the easiest to configure and maintain because all of the components are contained in one database.
Local capture and outbound server in different databases: The database objects and capture process are in one database, and the outbound server is in another database. A propagation sends LCRs from the source database to the outbound server database. This configuration is best when you want easy configuration and maintenance and when you want to optimize the performance of the outbound server database.
Downstream capture and outbound server in the same database: The database objects are in one database, and the capture process and outbound server are in another database. This configuration is best when you want to optimize the performance of the database with the database objects and want to offload change capture to another database. With this configuration, most of the components run on the database with the outbound server.
Downstream capture and outbound server in different databases: The database objects are in one database, the outbound server is in another database, and the capture process is in a third database. This configuration is best when you want to optimize the performance of both the database with the database objects and the database with the outbound server. With this configuration, the capture process runs on a third database, and a propagation sends LCRs from the capture database to the outbound server database.
If you decide to configure a downstream capture process, then you must decide which type of downstream capture process you want to configure. The following types are available:
A real-time downstream capture process configuration means that redo transport services use the log writer process (LGWR) at the source database to send redo data to the downstream database, and a remote file server process (RFS) at the downstream database receives the redo data over the network and stores the redo data in the standby redo log.
An archived-log downstream capture process configuration means that archived redo log files from the source database are copied to the downstream database, and the capture process captures changes in these archived redo log files. These log files can be transferred automatically using redo transport services, or they can be transferred manually using a method such as FTP.
The advantage of real-time downstream capture over archived-log downstream capture is that real-time downstream capture reduces the amount of time required to capture changes made to the source database. The time is reduced because the real-time downstream capture process does not need to wait for the redo log file to be archived before it can capture changes from it. You can configure multiple real-time downstream capture processes that capture changes from the same source database, but you cannot configure real-time downstream capture for multiple source databases at one downstream database.
The advantage of archived-log downstream capture over real-time downstream capture is that archived-log downstream capture allows downstream capture processes for multiple source databases at a downstream database. You can copy redo log files from multiple source databases to a single downstream database and configure multiple archived-log downstream capture processes to capture changes in these redo log files.
Preparing for an XStream Out outbound server is similar to preparing for an Oracle Streams replication environment. The components used in an Oracle Streams replication environment to capture changes and send them to an apply process are the same components used to capture changes and send them to an outbound server. These components include a capture process and one or more queues. If the capture process runs on a different database than the outbound server, then a propagation is also required.
Several of the tasks described in this section are described in more detail in Oracle Streams Replication Administrator's Guide. This section provides an overview of each task and specific information about completing the task for an XStream Out configuration.
Ensure that the following prerequisites are met before configuring XStream Out:
If Required, Configure Network Connectivity and Database Links
If Required, Configure Log File Transfer to a Downstream Database
If Required, Add Standby Redo Logs for Real-Time Downstream Capture
To configure and manage an XStream Out configuration, create an XStream administrator on each Oracle database that is involved in the XStream Out configuration.
Network connectivity and database links are not required when all of the components run on the same database. These components include the capture process, queue, and outbound server.
You must configure network connectivity and database links if you decided to configure XStream in either of the following ways:
The capture process and the outbound server will run on different databases.
Downstream capture will be used.
See "Decide How to Configure XStream" for more information about these decisions.
If network connectivity is required, then configure your network and Oracle Net so that the databases can communicate with each other.
The following database links are required:
When the capture process runs on a different database from the outbound server, create a database link from the capture database to the outbound server database. A propagation uses this database link to send changes from the capture database to the outbound server database.
When you use downstream capture, create a database link from the capture database to the source database. The source database is the database that generates the redo data that the capture process uses to capture changes. The capture process uses this database link to perform administrative tasks at the source database.
The name of each database link must match the global name of the destination database, and each database link should be created in the XStream administrator's schema.
For example, assume that you want to create a database link in a configuration with the following characteristics:
The global name of the source database is dbs1.example.com
.
The global name of the destination database is dbs2.example.com
.
The XStream administrator is xstrmadmin
at each database.
Given these assumptions, the following statement creates a database link from dbs1.example.com
to dbs2.example.com
:
CONNECT xstrmadmin@dbs1.example.com Enter password: password CREATE DATABASE LINK dbs2.example.com CONNECT TO xstrmadmin IDENTIFIED BY password USING 'dbs2.example.com';
See Also:
Oracle Database 2 Day + Data Replication and Integration Guide for instructions about creating database links using Oracle Enterprise Manager
Oracle Database Administrator's Guide for more information about database links
Each source database that generates changes that will be captured by a capture process must be in ARCHIVELOG
mode. For downstream capture processes, the downstream database also must be in ARCHIVELOG
mode if you plan to configure a real-time downstream capture process. The downstream database does not need to be in ARCHIVELOG
mode if you plan to run only archived-log downstream capture processes on it.
If you are configuring XStream in an Oracle Real Application Clusters (Oracle RAC) environment, then the archived redo log files of all threads from all instances must be available to any instance running a capture process. This requirement pertains to both local and downstream capture processes.
See Also:
Oracle Database Administrator's Guide for instructions about running a database inARCHIVELOG
modeSome initialization parameters are important for the configuration, operation, reliability, and performance of the components in an XStream configuration. Set these parameters appropriately.
Oracle Streams Replication Administrator's Guide contains detailed information about all of the initialization parameters that are important for an Oracle Streams environment. The guidelines for setting these parameters also apply to an XStream configuration. In addition to the requirements described in Oracle Streams Replication Administrator's Guide for all Oracle Streams components, the following requirements apply to XStream outbound servers:
Ensure that the PROCESSES
initialization parameter is set to a value large enough to accommodate the outbound server background processes and all of the other Oracle Database background processes.
Ensure that the SESSIONS
initialization parameter is set to a value large enough to accommodate the sessions used by the outbound server background processes and all of the other Oracle Database sessions.
The Oracle Streams pool is a portion of memory in the System Global Area (SGA) that is used by Oracle Streams. The Oracle Streams pool stores buffered queue messages in memory, and it provides memory for capture processes and outbound servers. The Oracle Streams pool always stores LCRs captured by a capture process, and it stores LCRs and messages that are enqueued into a buffered queue by applications. Ensure that there is enough space in the Oracle Streams pool at each database to store LCRs and run the components properly.
Each outbound server requires 1 MB of memory. The Oracle Streams pool is initialized the first time an outbound server is started.
See Also:
Oracle Streams Replication Administrator's Guide for information about Oracle Streams pool requirementsIf you decided to use a local capture process, then log file transfer is not required. However, if you decided to use downstream capture that uses redo transport services to transfer archived redo log files to the downstream database automatically, then configure log file transfer from the source database to the capture database. See "Decide How to Configure XStream" for information about this decision.
See Also:
Oracle Streams Replication Administrator's Guide for instructionsIf you decided to configure real-time downstream capture, then add standby redo logs to the capture database. See "Decide How to Configure XStream" for information about this decision.
See Also:
Oracle Streams Replication Administrator's Guide for instructionsEnsure that the following prerequisites are met before configuring XStream In:
To configure and manage an XStream In configuration, create an XStream administrator on the Oracle database that will run the XStream inbound server.
Some initialization parameters are important for the configuration, operation, reliability, and performance of XStream inbound servers. Set these parameters appropriately.
Oracle Streams Replication Administrator's Guide contains detailed information about all of the initialization parameters that are important for an Oracle Streams environment. The guidelines for setting these parameters also apply to an XStream configuration. In addition to the requirements described in Oracle Streams Replication Administrator's Guide for all Oracle Streams components, the following requirements apply to XStream inbound servers:
Ensure that the PROCESSES
initialization parameter is set to a value large enough to accommodate the inbound server background processes and all of the other Oracle Database background processes.
Ensure that the SESSIONS
initialization parameter is set to a value large enough to accommodate the sessions used by the inbound server background processes and all of the other Oracle Database sessions.
The Oracle Streams pool is a portion of memory in the System Global Area (SGA) that provides memory for inbound servers. Ensure that there is enough space in the Oracle Streams pool for the inbound server to run properly. An inbound server requires 1 MB for each inbound server parallelism. For example, if parallelism is set to 10 for an inbound server, then at least 10 MB of memory is required for the inbound server. The Oracle Streams pool also must have enough space to store LCRs before they are applied. The Oracle Streams pool is initialized the first time an inbound server is started.
An outbound server in an XStream Out configuration streams Oracle database changes to a client application. The client application attaches to the outbound server using the Oracle Call Interface (OCI) or Java interface to receive these changes.
Configuring an outbound server involves creating the components that send captured database changes to the outbound server. It also involves configuring the outbound server itself, which includes specifying the connect user that the client application will use to attach to the outbound server.
This section contains these topics:
You can create an outbound server using the following procedures in the DBMS_XSTREAM_ADM
package:
The CREATE_OUTBOUND
procedure creates an outbound server, a queue, and a capture process in a single database with one procedure call.
The ADD_OUTBOUND
procedure only creates an outbound server. You must create the capture process and queue separately, and they must exist before you run the ADD_OUTBOUND
procedure. You can configure the capture process on the same database as the outbound server or on a different database.
In both cases, you must create the client application that communicates with the outbound server and receives LCRs from the outbound server.
If you require multiple outbound servers, then you can use the CREATE_OUTBOUND
procedure to create the capture process that captures database changes for the first outbound server. Next, you can run the ADD_OUTBOUND
procedure to add additional outbound servers that receive the same captured changes. The capture process can reside on the same database as the outbound servers or on a different database.
This section contains these topics:
The CREATE_OUTBOUND
procedure in the DBMS_XSTREAM_ADM
package creates a capture process, queue, and outbound server in a single database. Both the capture process and the outbound server use the queue created by the procedure. When you run the procedure, you provide the name of the new outbound server, while the procedure generates a name for the capture process and queue. If you want all of the components to run on the same database, then the CREATE_OUTBOUND
procedure is the fastest and easiest way to create an outbound server.
Before configuring XStream Out, ensure that the following prerequisites are met:
Complete the tasks described in "Prerequisites for Configuring XStream Out".
This section makes the following assumptions:
The capture process will be a local capture process, and it will run on the same database as the outbound server.
The instructions in this section can only set up the local capture and outbound server on the same database configuration described in "Decide How to Configure XStream".
The name of the outbound server is xout
.
Data manipulation language (DML) and data definition language (DDL) changes made to the oe.orders
and oe.order_items
tables are sent to the outbound server.
DML and DDL changes made to the hr
schema are sent to the outbound server.
To create an outbound server using the CREATE_OUTBOUND
procedure:
In SQL*Plus, connect to the database as the XStream administrator.
See Oracle Database Administrator's Guide for information about connecting to a database in SQL*Plus.
Run the CREATE_OUTBOUND
procedure.
Given the assumptions for this section, run the following CREATE_OUTBOUND
procedure:
DECLARE tables DBMS_UTILITY.UNCL_ARRAY; schemas DBMS_UTILITY.UNCL_ARRAY; BEGIN tables(1) := 'oe.orders'; tables(2) := 'oe.order_items'; schemas(1) := 'hr'; DBMS_XSTREAM_ADM.CREATE_OUTBOUND( server_name => 'xout', table_names => tables, schema_names => schemas); END; /
Running this procedure performs the following actions:
Configures supplemental logging for the oe.orders
and oe.order_items
tables and for all of the tables in the hr
schema.
Creates a queue with a system-generated name that is used by the capture process and the outbound server.
Creates and starts a capture process with a system-generated name with rule sets that instruct it to capture DML and DDL changes to the oe.orders
table, the oe.order_items
table, and the hr
schema.
Creates and starts an outbound server named xout
with rule sets that instruct it to send DML and DDL changes to the oe.orders
table, the oe.order_items
table, and the hr
schema to the client application.
Sets the current user as the connect user for the outbound server. In this example, the current user is the XStream administrator. The client application must connect to the database as the connect user to interact with the outbound server.
Tip:
To capture and send all database changes to the outbound server, specifyNULL
(the default) for the table_names
and schema_names
parameters.Create and run the client application that will connect to the outbound server and receive the LCRs. See "Sample XStream Client Application" for a sample application.
To add one or more additional outbound servers that receive LCRs from the capture process created in Step 2, follow the instructions in "Adding an Additional Outbound Server to a Capture Process Stream".
See Also:
"CREATE_OUTBOUND Procedure"The ADD_OUTBOUND
procedure in the DBMS_XSTREAM_ADM
package creates an outbound server. This procedure does not create the capture process or the queue. You must configure these components manually.
The instructions in this section can set up any of the configurations described in "Decide How to Configure XStream". However, if you chose the local capture and outbound server on the same database configuration, then it is usually easier to use the CREATE_OUTBOUND
procedure to configure all of the components simultaneously. See "Configuring Multiple XStream Out Components Using CREATE_OUTBOUND".
Before configuring XStream Out, ensure that the following prerequisites are met:
Complete the tasks described in "Prerequisites for Configuring XStream Out".
This section makes the following assumptions:
The name of the outbound server is xout
.
The queue used by the outbound server is xstrmadmin.xstream_queue
.
The source database is db1.example.com
.
DML and DDL changes made to the oe.orders
and oe.order_items
tables are sent to the outbound server.
DML and DDL changes made to the hr
schema are sent to the outbound server.
The capture process for the outbound server does not exist. (If the capture process exists, then skip Steps 1 to 3, and go to Step 4.)
To create an outbound server using the ADD_OUTBOUND
procedure:
In SQL*Plus, connect to the database that will run the capture process (the capture database) as the XStream administrator.
See Oracle Database Administrator's Guide for information about connecting to a database in SQL*Plus.
Create the queue that will be used by the capture process.
See Oracle Streams Replication Administrator's Guide for instructions.
Add rules to the capture process's rule sets to capture changes to the hr
schema, the oe.orders
table, and the oe.order_items
table. Do not start the capture process.
See Oracle Streams Replication Administrator's Guide for instructions.
If the capture process will run on a different database than the outbound server, then set the xout_client_exists
capture process parameter to Y
.
Setting this parameter to Y
enables the capture process to send LCRs to an outbound server.
Skip this step if the capture process will run on the same database as the outbound server. In this case, the xout_client_exists
capture process parameter will be set to Y
automatically.
See Oracle Streams Concepts and Administration for information about setting a capture process parameter. See Oracle Database PL/SQL Packages and Types Reference for information about the xout_client_exists
capture process parameter.
Connect to the source database.
The source database is the database that contains the database objects for which the capture process will capture changes. The source database and the capture database might be the same database.
Ensure that required supplemental logging is specified for the database objects at the source database.
Supplemental logging is required for the database objects for which the capture process will capture changes. If the capture database and the source database are the same database, then supplemental logging might have been specified during capture process creation.
Ensure that the following supplemental logging is specified at the source database:
Any columns at the source database that are used in a primary key in tables for which changes are processed by the outbound server must be unconditionally logged in a log group or by database supplemental logging of primary key columns.
Any columns at the source database that are used by a rule or a rule-based transformation must be unconditionally logged.
For the example in this section, ensure that supplemental logging is configured for the hr
schema, the oe.orders
table, and the oe.order_items
table.
See Oracle Streams Replication Administrator's Guide for instructions about specifying supplemental logging.
Connect to the database that will run the outbound server as the XStream administrator.
Create the queue that will be used by the outbound server.
This step is not required if the capture process and the outbound server run on the same database and use the same queue.
See Oracle Streams Replication Administrator's Guide for instructions.
Run the ADD_OUTBOUND
procedure.
Given the assumption for this section, run the following ADD_OUTBOUND
procedure:
DECLARE tables DBMS_UTILITY.UNCL_ARRAY; schemas DBMS_UTILITY.UNCL_ARRAY; BEGIN tables(1) := 'oe.orders'; tables(2) := 'oe.order_items'; schemas(1) := 'hr'; DBMS_XSTREAM_ADM.ADD_OUTBOUND( server_name => 'xout', queue_name => 'xstrmadmin.xstream_queue', source_database => 'db1.example.com', table_names => tables, schema_names => schemas); END; /
If the capture process runs on the same database as the outbound server, then specify the capture process's queue for the queue_name
parameter.
Running this procedure performs the following actions:
Creates an outbound server named xout
. The outbound server has rule sets that instruct it to send DML and DDL changes to the oe.orders
table, the oe.order_items
table, and the hr
schema to the client application. The rules specify that these changes must have originated at the db1.example.com
database. The outbound server dequeues LCRs from the queue xstrmadmin.xstream_queue
.
Sets the current user as the connect user for the outbound server. In this example, the current user is the XStream administrator. The client application must connect to the database as the connect user to interact with the outbound server.
Tip:
For the outbound server to receive all of the LCRs sent by the capture process, specifyNULL
(the default) for the table_names
and schema_names
parameters.Connect to the capture database as the XStream administrator.
Create the propagation that sends LCRs from the capture process's queue on the local database to the queue used by the outbound server on the outbound server database.
Add rules to the propagation's rule sets to send changes to the hr
schema, the oe.orders
table, and the oe.order_items
table from the source queue to the destination queue.
This step is not required if the capture process and the outbound server run on the same database and use the same queue.
See Oracle Streams Replication Administrator's Guide for instructions.
Create and run the client application that will connect to the outbound server and receive the LCRs. See "Sample XStream Client Application" for a sample application.
Connect to the database that is running outbound server as the XStream administrator.
Start the outbound server if it is disabled. For example:
exec DBMS_APPLY_ADM.START_APPLY('xout');
Connect to the capture database as the XStream administrator.
Start the capture process created in Step 3.
See Oracle Streams Concepts and Administration for instructions.
To add one or more additional outbound servers that receive LCRs from the capture process created in Step 3, follow the instructions in "Adding an Additional Outbound Server to a Capture Process Stream".
See Also:
"ADD_OUTBOUND Procedure"XStream Out configurations often require multiple outbound servers that process a stream of LCRs from a single capture process. This section describes adding an additional outbound server to a database that already includes at least one outbound server. The additional outbound server uses the same queue as another outbound server to receive the LCRs from the capture process. When an XStream Out environment exists, use the ADD_OUTBOUND
procedure in the DBMS_XSTREAM_ADM
package to add another outbound server to a capture process stream.
Before completing the steps in this section, configure an XStream Out environment that includes at least one outbound server. The following sections describe configuring and XStream Out environment:
This section makes the following assumptions:
The name of the outbound server is xout2
.
The queue used by the outbound server is xstrmadmin.xstream_queue
.
DML and DDL changes made to the oe.orders
and oe.order_items
tables are sent to the outbound server.
DML and DDL changes made to the hr
schema are sent to the outbound server.
The source database for the database changes is db1.example.com
.
To add another outbound server to a capture process stream using the ADD_OUTBOUND
procedure:
In SQL*Plus, connect to the database that will run the additional outbound server as the XStream administrator.
See Oracle Database Administrator's Guide for information about connecting to a database in SQL*Plus.
Determine the name of the queue used by an existing outbound server that receives LCRs from the capture process.
Run the query in "Displaying General Information About an Outbound Server" to determine the owner and name of the queue. This query also shows the name of the capture process and the source database name.
Run the ADD_OUTBOUND
procedure.
Given the assumptions for this section, run the following ADD_OUTBOUND
procedure:
DECLARE tables DBMS_UTILITY.UNCL_ARRAY; schemas DBMS_UTILITY.UNCL_ARRAY; BEGIN tables(1) := 'oe.orders'; tables(2) := 'oe.order_items'; schemas(1) := 'hr'; DBMS_XSTREAM_ADM.ADD_OUTBOUND( server_name => 'xout2', queue_name => 'xstrmadmin.xstream_queue', source_database => 'db1.example.com', table_names => tables, schema_names => schemas); END; /
Running this procedure performs the following actions:
Creates an outbound server named xout2
. The outbound server has rule sets that instruct it to send DML and DDL changes to the oe.orders
table, the oe.order_items
table, and the hr
schema to the client application. The rules specify that these changes must have originated at the db1.example.com
database. The outbound server dequeues LCRs from the queue xstrmadmin.xstream_queue
.
Sets the current user as the connect user for the outbound server. In this example, the current user is the XStream administrator. The client application must connect to the database as the connect user to interact with the outbound server.
Tip:
For the outbound server to receive all of the LCRs sent by the capture process, specifyNULL
(the default) for the table_names
and schema_names
parameters.If a client application does not exist, then create and run the client application that will connect to the outbound server and receive the LCRs. See "Sample XStream Client Application" for a sample application.
See Also:
"ADD_OUTBOUND Procedure"An inbound server in an XStream In configuration receives a stream of changes from a client application. The inbound server can apply these changes to database objects in an Oracle database, or it can process the changes in a customized way. A client application can attach to an inbound server and send row changes and DDL changes encapsulated in LCRs using the OCI or Java interface.
The CREATE_INBOUND
procedure in the DBMS_XSTREAM_ADM
package creates an inbound server. You must create the client application that communicates with the inbound server and sends LCRs to the inbound server.
Before configuring XStream In, ensure that the following prerequisites are met:
Complete the tasks described in "Preparing for XStream In".
This section makes the following assumptions:
The name of the outbound server is xin
.
The inbound server applies all of the changes it receives from the XStream client application.
The queue used by the outbound server is xstrmadmin.xin_queue
.
To create an inbound server:
In SQL*Plus, connect to the database that will run the inbound server as the XStream administrator.
See Oracle Database Administrator's Guide for information about connecting to a database in SQL*Plus.
Run the CREATE_INBOUND
procedure.
For example, the following CREATE_INBOUND
procedure configures an inbound server named xin
:
BEGIN DBMS_XSTREAM_ADM.CREATE_INBOUND( server_name => 'xin', queue_name => 'xin_queue'); END; /
Running this procedure performs the following actions:
Creates an inbound server named xin
.
Sets the queue with the name xin_queue
as the inbound server's queue, and creates this queue if it does not exist. This queue does not store LCRs sent by the client application. Instead, it stores error transactions if an LCR raises an error. The current user is the queue owner. In this example, the current user is the XStream administrator.
Sets the current user as the apply user for the inbound server. In this example, the current user is the XStream administrator. The client application must connect to the database as the apply user to interact with the inbound server.
Tip:
By default, an inbound server does not use rules or rule sets. Therefore, it processes all LCRs sent to it by the client application. To add rules and rule sets, use theDBMS_STREAMS_ADM
package or the DBMS_RULE_ADM
package. See Oracle Streams Concepts and Administration.If necessary, create apply handlers for the inbound server.
Apply handlers are optional. Apply handlers process LCRs sent to an inbound server in a customized way.
Create and run the client application that will connect to the inbound server and send LCRs to it.
See "Sample XStream Client Application" for a sample application.
If the inbound server is disabled, then start the inbound server.
For example, enter the following:
exec DBMS_APPLY_ADM.START_APPLY('xin');
See Also:
"CREATE_INBOUND Procedure"This section contains a sample XStream client application. This application illustrates the basic tasks that are required of an XStream Out and XStream In application.
The application performs the following tasks:
It attaches to an XStream outbound server and inbound server and waits for LCRs from the outbound server. The outbound server and inbound server are in two different databases.
When it receives an LCR from the outbound server, it immediately sends the LCR to the inbound server.
It periodically gets the processed low position from the inbound server and sends this value to the outbound server.
It periodically sends a "ping" LCR from the outbound server to the inbound server to move the inbound server's processed low position forward in times of low activity.
In an XStream Out configuration that does not send LCRs to an inbound server, the client application must obtain the processed low position in another way.
This application waits indefinitely for transactions from the outbound server. To interrupt the application, enter the interrupt command for your operating system. For example, the interrupt command on some operating systems is control-C
. If the program is restarted, then the outbound server starts sending LCRs from the processed low position that was set during the previous run.
Figure 3-1 provides an overview of the XStream environment configured in this section.
Before running the sample application, ensure that the following components exist:
Two Oracle databases with network connectivity between them
An XStream administrator on both databases
An outbound server configuration on one database, including a capture process, queue, and outbound server
An inbound server configuration on another database
The sample applications in the following sections perform the same tasks. One sample application uses the OCI API, and the other uses the Java API.
Note:
An Oracle Database installation includes several XStream demos. These demos are in the following location:$ORACLE_HOME/rdbms/demo/xstream
See Also:
To run the sample XStream client application for the OCI API, compile and link the application file, and enter the following on a command line:
xio -ob_svr xout_name -ob_db sn_xout_db -ob_usr xout_cu -ob_pwd xout_cu_pass -ib_svr xin_name -ib_db sn_xin_db -ib_usr xin_au -ib_pwd xin_au_pass
Substitute the appropriate values for the following placeholders:
xout_name is the name of the outbound server.
sn_xout_db is the service name for the outbound server's database.
xout_cu is the outbound server's connect user.
xout_cu_pass is the password for the outbound server's connect user.
xin_name is the name of the inbound server.
sn_xin_db is the service name for the inbound server's database.
xin_au is the inbound server's apply user.
xin_au_pass is the password for the inbound server's apply user.
When the sample client application is running, it prints information about the row LCRs it is processing. The output looks similar to the following:
----------- ROW LCR Header ----------------- src_db_name=DB.EXAMPLE.COM cmd_type=UPDATE txid=17.0.74 owner=HR oname=COUNTRIES ----------- ROW LCR Header ----------------- src_db_name=DB.EXAMPLE.COM cmd_type=COMMIT txid=17.0.74 ----------- ROW LCR Header ----------------- src_db_name=DB.EXAMPLE.COM cmd_type=UPDATE txid=12.25.77 owner=OE oname=ORDERS ----------- ROW LCR Header ----------------- src_db_name=DB.EXAMPLE.COM cmd_type=UPDATE txid=12.25.77 owner=OE oname=ORDERS
This output contains the following information for each row LCR:
src_db_name
shows the source database for the change encapsulated in the row LCR.
cmd_type
shows the type of SQL statement that made the change.
txid
shows the transaction ID of the transaction that includes the row LCR.
owner
shows the owner of the database object that was changed.
oname
shows the name of the database object that was changed.
This demo is available in the following location:
$ORACLE_HOME/rdbms/demo/xstream/oci
The file name for the demo is xio.c
. See the README.txt
file in the demo directory for more information about compiling and running the application.
The code for the sample application that uses the OCI API follows:
#ifndef OCI_ORACLE #include <oci.h> #endif #ifndef _STDIO_H #include <stdio.h> #endif #ifndef _STDLIB_H #include <stdlib.h> #endif #ifndef _STRING_H #include <string.h> #endif #ifndef _MALLOC_H #include <malloc.h> #endif /*---------------------------------------------------------------------- * Internal structures *----------------------------------------------------------------------*/ #define M_DBNAME_LEN (128) typedef struct conn_info /* connect info */ { oratext * user; ub4 userlen; oratext * passw; ub4 passwlen; oratext * dbname; ub4 dbnamelen; oratext * svrnm; ub4 svrnmlen; } conn_info_t; typedef struct params { conn_info_t xout; /* outbound info */ conn_info_t xin; /* inbound info */ } params_t; typedef struct oci /* OCI handles */ { OCIEnv *envp; /* Environment handle */ OCIError *errp; /* Error handle */ OCIServer *srvp; /* Server handle */ OCISvcCtx *svcp; /* Service handle */ OCISession *authp; OCIStmt *stmtp; boolean attached; boolean outbound; } oci_t; static void connect_db(conn_info_t *opt_params_p, oci_t ** ocip, ub2 char_csid, ub2 nchar_csid); static void disconnect_db(oci_t * ocip); static void ocierror(oci_t * ocip, char * msg); static void attach(oci_t * ocip, conn_info_t *conn, boolean outbound); static void detach(oci_t *ocip); static void get_lcrs(oci_t *xin_ocip, oci_t *xout_ocip); static void get_chunks(oci_t *xin_ocip, oci_t *xout_ocip); static void print_lcr(oci_t *ocip, void *lcrp, ub1 lcrtype, oratext **src_db_name, ub2 *src_db_namel); static void print_chunk (ub1 *chunk_ptr, ub4 chunk_len, ub2 dty); static void get_inputs(conn_info_t *xout_params, conn_info_t *xin_params, int argc, char ** argv); static void get_db_charsets(conn_info_t *params_p, ub2 *char_csid, ub2 *nchar_csid); static void set_client_charset(oci_t *outbound_ocip); #define OCICALL(ocip, function) do {\ sword status=function;\ if (OCI_SUCCESS==status) break;\ else if (OCI_ERROR==status) \ {ocierror(ocip, (char *)"OCI_ERROR");\ exit(1);}\ else {printf("Error encountered %d\n", status);\ exit(1);}\ } while(0) /*--------------------------------------------------------------------- * M A I N P R O G R A M *---------------------------------------------------------------------*/ main(int argc, char **argv) { /* Outbound and inbound connection info */ conn_info_t xout_params; conn_info_t xin_params; oci_t *xout_ocip = (oci_t *)NULL; oci_t *xin_ocip = (oci_t *)NULL; ub2 obdb_char_csid = 0; /* outbound db char csid */ ub2 obdb_nchar_csid = 0; /* outbound db nchar csid */ /* parse command line arguments */ get_inputs(&xout_params, &xin_params, argc, argv); /* Get the outbound database CHAR and NCHAR character set info */ get_db_charsets(&xout_params, &obdb_char_csid, &obdb_nchar_csid); /* Connect to the outbound db and set the client env to the outbound charsets * to minimize character conversion when transferring LCRs from outbound * directly to inbound server. */ connect_db(&xout_params, &xout_ocip, obdb_char_csid, obdb_nchar_csid); /* Attach to outbound server */ attach(xout_ocip, &xout_params, TRUE); /* connect to inbound db and set the client charsets the same as the * outbound db charsets. */ connect_db(&xin_params, &xin_ocip, obdb_char_csid, obdb_nchar_csid); /* Attach to inbound server */ attach(xin_ocip, &xin_params, FALSE); /* Get lcrs from outbound server and send to inbound server */ get_lcrs(xin_ocip, xout_ocip); /* Detach from XStream servers */ detach(xout_ocip); detach(xin_ocip); /* Disconnect from both databases */ disconnect_db(xout_ocip); disconnect_db(xin_ocip); free(xout_ocip); free(xin_ocip); exit (0); } /*--------------------------------------------------------------------- * connect_db - Connect to the database and set the env to the given * char and nchar character set ids. *---------------------------------------------------------------------*/ static void connect_db(conn_info_t *params_p, oci_t **ociptr, ub2 char_csid, ub2 nchar_csid) { oci_t *ocip; printf ("Connect to Oracle as %.*s@%.*s ", params_p->userlen, params_p->user, params_p->dbnamelen, params_p->dbname); if (char_csid && nchar_csid) printf ("using char csid=%d and nchar csid=%d", char_csid, nchar_csid); printf("\n"); ocip = (oci_t *)malloc(sizeof(oci_t)); if (OCIEnvNlsCreate(&ocip->envp, OCI_OBJECT, (dvoid *)0, (dvoid * (*)(dvoid *, size_t)) 0, (dvoid * (*)(dvoid *, dvoid *, size_t))0, (void (*)(dvoid *, dvoid *)) 0, (size_t) 0, (dvoid **) 0, char_csid, nchar_csid)) { ocierror(ocip, (char *)"OCIEnvCreate() failed"); } if (OCIHandleAlloc((dvoid *) ocip->envp, (dvoid **) &ocip->errp, (ub4) OCI_HTYPE_ERROR, (size_t) 0, (dvoid **) 0)) { ocierror(ocip, (char *)"OCIHandleAlloc(OCI_HTYPE_ERROR) failed"); } /* Logon to database */ OCICALL(ocip, OCILogon(ocip->envp, ocip->errp, &ocip->svcp, params_p->user, params_p->userlen, params_p->passw, params_p->passwlen, params_p->dbname, params_p->dbnamelen)); /* allocate the server handle */ OCICALL(ocip, OCIHandleAlloc((dvoid *) ocip->envp, (dvoid **) &ocip->srvp, OCI_HTYPE_SERVER, (size_t) 0, (dvoid **) 0)); OCICALL(ocip, OCIHandleAlloc((dvoid *) ocip->envp, (dvoid **) &ocip->stmtp, (ub4) OCI_HTYPE_STMT, (size_t) 0, (dvoid **) 0)); if (*ociptr == (oci_t *)NULL) { *ociptr = ocip; } } /*--------------------------------------------------------------------- * get_db_charsets - Get the database CHAR and NCHAR character set ids. *---------------------------------------------------------------------*/ static const oratext GET_DB_CHARSETS[] = \ "select parameter, value from nls_database_parameters where parameter = \ 'NLS_CHARACTERSET' or parameter = 'NLS_NCHAR_CHARACTERSET'"; #define PARM_BUFLEN (30) static void get_db_charsets(conn_info_t *params_p, ub2 *char_csid, ub2 *nchar_csid) { OCIDefine *defnp1 = (OCIDefine *) NULL; OCIDefine *defnp2 = (OCIDefine *) NULL; oratext parm[PARM_BUFLEN]; oratext value[OCI_NLS_MAXBUFSZ]; ub2 parm_len = 0; ub2 value_len = 0; oci_t ocistruct; oci_t *ocip = &ocistruct; *char_csid = 0; *nchar_csid = 0; memset (ocip, 0, sizeof(ocistruct)); if (OCIEnvCreate(&ocip->envp, OCI_OBJECT, (dvoid *)0, (dvoid * (*)(dvoid *, size_t)) 0, (dvoid * (*)(dvoid *, dvoid *, size_t))0, (void (*)(dvoid *, dvoid *)) 0, (size_t) 0, (dvoid **) 0)) { ocierror(ocip, (char *)"OCIEnvCreate() failed"); } if (OCIHandleAlloc((dvoid *) ocip->envp, (dvoid **) &ocip->errp, (ub4) OCI_HTYPE_ERROR, (size_t) 0, (dvoid **) 0)) { ocierror(ocip, (char *)"OCIHandleAlloc(OCI_HTYPE_ERROR) failed"); } OCICALL(ocip, OCILogon(ocip->envp, ocip->errp, &ocip->svcp, params_p->user, params_p->userlen, params_p->passw, params_p->passwlen, params_p->dbname, params_p->dbnamelen)); OCICALL(ocip, OCIHandleAlloc((dvoid *) ocip->envp, (dvoid **) &ocip->stmtp, (ub4) OCI_HTYPE_STMT, (size_t) 0, (dvoid **) 0)); /* Execute stmt to select the db nls char and nchar character set */ OCICALL(ocip, OCIStmtPrepare(ocip->stmtp, ocip->errp, (CONST text *)GET_DB_CHARSETS, (ub4)strlen((char *)GET_DB_CHARSETS), (ub4)OCI_NTV_SYNTAX, (ub4)OCI_DEFAULT)); OCICALL(ocip, OCIDefineByPos(ocip->stmtp, &defnp1, ocip->errp, (ub4) 1, parm, PARM_BUFLEN, SQLT_CHR, (void*) 0, &parm_len, (ub2 *)0, OCI_DEFAULT)); OCICALL(ocip, OCIDefineByPos(ocip->stmtp, &defnp2, ocip->errp, (ub4) 2, value, OCI_NLS_MAXBUFSZ, SQLT_CHR, (void*) 0, &value_len, (ub2 *)0, OCI_DEFAULT)); OCICALL(ocip, OCIStmtExecute(ocip->svcp, ocip->stmtp, ocip->errp, (ub4)0, (ub4)0, (const OCISnapshot *)0, (OCISnapshot *)0, (ub4)OCI_DEFAULT)); while (OCIStmtFetch(ocip->stmtp, ocip->errp, 1, OCI_FETCH_NEXT, OCI_DEFAULT) == OCI_SUCCESS) { value[value_len] = '\0'; if (parm_len == strlen("NLS_CHARACTERSET") && !memcmp(parm, "NLS_CHARACTERSET", parm_len)) { *char_csid = OCINlsCharSetNameToId(ocip->envp, value); printf("Outbound database NLS_CHARACTERSET = %.*s (csid = %d) \n", value_len, value, *char_csid); } else if (parm_len == strlen("NLS_NCHAR_CHARACTERSET") && !memcmp(parm, "NLS_NCHAR_CHARACTERSET", parm_len)) { *nchar_csid = OCINlsCharSetNameToId(ocip->envp, value); printf("Outbound database NLS_NCHAR_CHARACTERSET = %.*s (csid = %d) \n", value_len, value, *nchar_csid); } } disconnect_db(ocip); } /*--------------------------------------------------------------------- * attach - Attach to XStream server specified in connection info *---------------------------------------------------------------------*/ static void attach(oci_t * ocip, conn_info_t *conn, boolean outbound) { sword err; printf ("Attach to XStream %s server '%.*s'\n", outbound ? "outbound" : "inbound", conn->svrnmlen, conn->svrnm); if (outbound) { OCICALL(ocip, OCIXStreamOutAttach(ocip->svcp, ocip->errp, conn->svrnm, (ub2)conn->svrnmlen, (ub1 *)0, 0, OCI_DEFAULT)); } else { OCICALL(ocip, OCIXStreamInAttach(ocip->svcp, ocip->errp, conn->svrnm, (ub2)conn->svrnmlen, (oratext *)"From_XOUT", 9, (ub1 *)0, 0, OCI_DEFAULT)); } ocip->attached = TRUE; ocip->outbound = outbound; } /*--------------------------------------------------------------------- * ping_svr - Ping inbound server by sending a commit LCR. *---------------------------------------------------------------------*/ static void ping_svr(oci_t *xin_ocip, void *commit_lcr, ub1 *cmtpos, ub2 cmtpos_len, oratext *source_db, ub2 source_db_len) { OCIDate src_time; oratext txid[128]; OCICALL(xin_ocip, OCIDateSysDate(xin_ocip->errp, &src_time)); sprintf(txid, "Ping %2d:%2d:%2d", src_time.OCIDateTime.OCITimeHH, src_time.OCIDateTime.OCITimeMI, src_time.OCIDateTime.OCITimeSS); /* Initialize LCR with new txid and commit position */ OCICALL(xin_ocip, OCILCRHeaderSet(xin_ocip->svcp, xin_ocip->errp, source_db, source_db_len, (oratext *)OCI_LCR_ROW_CMD_COMMIT, (ub2)strlen(OCI_LCR_ROW_CMD_COMMIT), (oratext *)0, 0, /* null owner */ (oratext *)0, 0, /* null object */ (ub1 *)0, 0, /* null tag */ txid, (ub2)strlen((char *)txid), &src_time, cmtpos, cmtpos_len, 0, commit_lcr, OCI_DEFAULT)); /* Send commit lcr to inbound server. */ if (OCIXStreamInLCRSend(xin_ocip->svcp, xin_ocip->errp, commit_lcr, OCI_LCR_XROW, 0, OCI_DEFAULT) == OCI_ERROR) { ocierror(xin_ocip, (char *)"OCIXStreamInLCRSend failed in ping_svr()"); } } /*--------------------------------------------------------------------- * get_lcrs - Get LCRs from outbound server and send to inbound server. *---------------------------------------------------------------------*/ static void get_lcrs(oci_t *xin_ocip, oci_t *xout_ocip) { sword status = OCI_SUCCESS; void *lcr; ub1 lcrtype; oraub8 flag; ub1 proclwm[OCI_LCR_MAX_POSITION_LEN]; ub2 proclwm_len = 0; ub1 sv_pingpos[OCI_LCR_MAX_POSITION_LEN]; ub2 sv_pingpos_len = 0; ub1 fetchlwm[OCI_LCR_MAX_POSITION_LEN]; ub2 fetchlwm_len = 0; void *commit_lcr = (void *)0; oratext *lcr_srcdb = (oratext *)0; ub2 lcr_srcdb_len = 0; oratext source_db[M_DBNAME_LEN]; ub2 source_db_len = 0; ub4 lcrcnt = 0; /* create an lcr to ping the inbound server periodically by sending a * commit lcr. */ commit_lcr = (void*)0; OCICALL(xin_ocip, OCILCRNew(xin_ocip->svcp, xin_ocip->errp, OCI_DURATION_SESSION, OCI_LCR_XROW, &commit_lcr, OCI_DEFAULT)); while (status == OCI_SUCCESS) { lcrcnt = 0; /* reset lcr count before each batch */ while ((status = OCIXStreamOutLCRReceive(xout_ocip->svcp, xout_ocip->errp, &lcr, &lcrtype, &flag, fetchlwm, &fetchlwm_len, OCI_DEFAULT)) == OCI_STILL_EXECUTING) { lcrcnt++; /* print header of LCR just received */ print_lcr(xout_ocip, lcr, lcrtype, &lcr_srcdb, &lcr_srcdb_len); /* save the source db to construct ping lcr later */ if (!source_db_len && lcr_srcdb_len) { memcpy(source_db, lcr_srcdb, lcr_srcdb_len); source_db_len = lcr_srcdb_len; } /* send the LCR just received */ if (OCIXStreamInLCRSend(xin_ocip->svcp, xin_ocip->errp, lcr, lcrtype, flag, OCI_DEFAULT) == OCI_ERROR) { ocierror(xin_ocip, (char *)"OCIXStreamInLCRSend failed"); } /* If LCR has chunked columns (i.e, has LOB/Long/XMLType columns) */ if (flag & OCI_XSTREAM_MORE_ROW_DATA) { /* receive and send chunked columns */ get_chunks(xin_ocip, xout_ocip); } } if (status == OCI_ERROR) ocierror(xout_ocip, (char *)"OCIXStreamOutLCRReceive failed"); /* clear the saved ping position if we just received some new lcrs */ if (lcrcnt) { sv_pingpos_len = 0; } /* If no lcrs received during previous WHILE loop and got a new fetch * LWM then send a commit lcr to ping the inbound server with the new * fetch LWM position. */ else if (fetchlwm_len > 0 && source_db_len > 0 && (fetchlwm_len != sv_pingpos_len || memcmp(sv_pingpos, fetchlwm, fetchlwm_len))) { /* To ensure we don't send multiple lcrs with duplicate position, send * a new ping only if we have saved the last ping position. */ if (sv_pingpos_len > 0) { ping_svr(xin_ocip, commit_lcr, fetchlwm, fetchlwm_len, source_db, source_db_len); } /* save the position just sent to inbound server */ memcpy(sv_pingpos, fetchlwm, fetchlwm_len); sv_pingpos_len = fetchlwm_len; } /* flush inbound network to flush all lcrs to inbound server */ OCICALL(xin_ocip, OCIXStreamInFlush(xin_ocip->svcp, xin_ocip->errp, OCI_DEFAULT)); /* get processed LWM of inbound server */ OCICALL(xin_ocip, OCIXStreamInProcessedLWMGet(xin_ocip->svcp, xin_ocip->errp, proclwm, &proclwm_len, OCI_DEFAULT)); if (proclwm_len > 0) { /* Set processed LWM for outbound server */ OCICALL(xout_ocip, OCIXStreamOutProcessedLWMSet(xout_ocip->svcp, xout_ocip->errp, proclwm, proclwm_len, OCI_DEFAULT)); } } if (status != OCI_SUCCESS) ocierror(xout_ocip, (char *)"get_lcrs() encounters error"); } /*--------------------------------------------------------------------- * get_chunks - Get each chunk for the current LCR and send it to * the inbound server. *---------------------------------------------------------------------*/ static void get_chunks(oci_t *xin_ocip, oci_t *xout_ocip) { oratext *colname; ub2 colname_len; ub2 coldty; oraub8 col_flags; ub2 col_csid; ub4 chunk_len; ub1 *chunk_ptr; oraub8 row_flag; sword err; sb4 rtncode; do { /* Get a chunk from outbound server */ OCICALL(xout_ocip, OCIXStreamOutChunkReceive(xout_ocip->svcp, xout_ocip->errp, &colname, &colname_len, &coldty, &col_flags, &col_csid, &chunk_len, &chunk_ptr, &row_flag, OCI_DEFAULT)); /* print chunked column info */ printf( " Chunked column name=%.*s DTY=%d chunk len=%d csid=%d col_flag=0x%x\n", colname_len, colname, coldty, chunk_len, col_csid, col_flags); /* print chunk data */ print_chunk(chunk_ptr, chunk_len, coldty); /* Send the chunk just received to inbound server */ OCICALL(xin_ocip, OCIXStreamInChunkSend(xin_ocip->svcp, xin_ocip->errp, colname, colname_len, coldty, col_flags, col_csid, chunk_len, chunk_ptr, row_flag, OCI_DEFAULT)); } while (row_flag & OCI_XSTREAM_MORE_ROW_DATA); } /*--------------------------------------------------------------------- * print_chunk - Print chunked column information. Only print the first * 50 bytes for each chunk. *---------------------------------------------------------------------*/ static void print_chunk (ub1 *chunk_ptr, ub4 chunk_len, ub2 dty) { #define MAX_PRINT_BYTES (50) /* print max of 50 bytes per chunk */ ub4 print_bytes; if (chunk_len == 0) return; print_bytes = chunk_len > MAX_PRINT_BYTES ? MAX_PRINT_BYTES : chunk_len; printf(" Data = ", chunk_len); if (dty == SQLT_CHR) printf("%.*s", print_bytes, chunk_ptr); else { ub2 idx; for (idx = 0; idx < print_bytes; idx++) printf("%02x", chunk_ptr[idx]); } printf("\n"); } /*--------------------------------------------------------------------- * print_lcr - Print header information of given lcr. *---------------------------------------------------------------------*/ static void print_lcr(oci_t *ocip, void *lcrp, ub1 lcrtype, oratext **src_db_name, ub2 *src_db_namel) { oratext *cmd_type; ub2 cmd_type_len; oratext *owner; ub2 ownerl; oratext *oname; ub2 onamel; oratext *txid; ub2 txidl; sword ret; printf("\n ----------- %s LCR Header -----------------\n", lcrtype == OCI_LCR_XDDL ? "DDL" : "ROW"); /* Get LCR Header information */ ret = OCILCRHeaderGet(ocip->svcp, ocip->errp, src_db_name, src_db_namel, /* source db */ &cmd_type, &cmd_type_len, /* command type */ &owner, &ownerl, /* owner name */ &oname, &onamel, /* object name */ (ub1 **)0, (ub2 *)0, /* lcr tag */ &txid, &txidl, (OCIDate *)0, /* txn id & src time */ (ub2 *)0, (ub2 *)0, /* OLD/NEW col cnts */ (ub1 **)0, (ub2 *)0, /* LCR position */ (oraub8*)0, lcrp, OCI_DEFAULT); if (ret != OCI_SUCCESS) ocierror(ocip, (char *)"OCILCRHeaderGet failed"); else { printf(" src_db_name=%.*s\n cmd_type=%.*s txid=%.*s\n", *src_db_namel, *src_db_name, cmd_type_len, cmd_type, txidl, txid ); if (ownerl > 0) printf(" owner=%.*s oname=%.*s \n", ownerl, owner, onamel, oname); } } /*--------------------------------------------------------------------- * detach - Detach from XStream server *---------------------------------------------------------------------*/ static void detach(oci_t * ocip) { sword err = OCI_SUCCESS; printf ("Detach from XStream %s server\n", ocip->outbound ? "outbound" : "inbound" ); if (ocip->outbound) { OCICALL(ocip, OCIXStreamOutDetach(ocip->svcp, ocip->errp, OCI_DEFAULT)); } else { OCICALL(ocip, OCIXStreamInDetach(ocip->svcp, ocip->errp, (ub1 *)0, (ub2 *)0, /* processed LWM */ OCI_DEFAULT)); } } /*--------------------------------------------------------------------- * disconnect_db - Logoff from the database *---------------------------------------------------------------------*/ static void disconnect_db(oci_t * ocip) { if (OCILogoff(ocip->svcp, ocip->errp)) { ocierror(ocip, (char *)"OCILogoff() failed"); } if (ocip->errp) OCIHandleFree((dvoid *) ocip->errp, (ub4) OCI_HTYPE_ERROR); if (ocip->envp) OCIHandleFree((dvoid *) ocip->envp, (ub4) OCI_HTYPE_ENV); } /*--------------------------------------------------------------------- * ocierror - Print error status and exit program *---------------------------------------------------------------------*/ static void ocierror(oci_t * ocip, char * msg) { sb4 errcode=0; text bufp[4096]; if (ocip->errp) { OCIErrorGet((dvoid *) ocip->errp, (ub4) 1, (text *) NULL, &errcode, bufp, (ub4) 4096, (ub4) OCI_HTYPE_ERROR); printf("%s\n%s", msg, bufp); } else puts(msg); printf ("\n"); exit(1); } /*-------------------------------------------------------------------- * print_usage - Print command usage *---------------------------------------------------------------------*/ static void print_usage(int exitcode) { puts("\nUsage: xio -ob_svr <outbound_svr> -ob_db <outbound_db>\n" " -ob_usr <conn_user> -ob_pwd <conn_user_pwd>\n" " -ib_svr <inbound_svr> -ib_db <inbound_db>\n" " -ib_usr <apply_user> -ib_pwd <apply_user_pwd>\n"); puts(" ob_svr : outbound server name\n" " ob_db : database name of outbound server\n" " ob_usr : connect user to outbound server\n" " ob_pwd : password of outbound's connect user\n" " ib_svr : inbound server name\n" " ib_db : database name of inbound server\n" " ib_usr : apply user for inbound server\n" " ib_pwd : password of inbound's apply user\n"); exit(exitcode); } /*-------------------------------------------------------------------- * get_inputs - Get user inputs from command line *---------------------------------------------------------------------*/ static void get_inputs(conn_info_t *xout_params, conn_info_t *xin_params, int argc, char ** argv) { char * option; char * value; memset (xout_params, 0, sizeof(*xout_params)); memset (xin_params, 0, sizeof(*xin_params)); while(--argc) { /* get the option name */ argv++; option = *argv; /* check that the option begins with a "-" */ if (!strncmp(option, (char *)"-", 1)) { option ++; } else { printf("Error: bad argument '%s'\n", option); print_usage(1); } /* get the value of the option */ --argc; argv++; value = *argv; if (!strncmp(option, (char *)"ob_db", 5)) { xout_params->dbname = (oratext *)value; xout_params->dbnamelen = strlen(value); } else if (!strncmp(option, (char *)"ob_usr", 6)) { xout_params->user = (oratext *)value; xout_params->userlen = strlen(value); } else if (!strncmp(option, (char *)"ob_pwd", 6)) { xout_params->passw = (oratext *)value; xout_params->passwlen = strlen(value); } else if (!strncmp(option, (char *)"ob_svr", 6)) { xout_params->svrnm = (oratext *)value; xout_params->svrnmlen = strlen(value); } else if (!strncmp(option, (char *)"ib_db", 5)) { xin_params->dbname = (oratext *)value; xin_params->dbnamelen = strlen(value); } else if (!strncmp(option, (char *)"ib_usr", 6)) { xin_params->user = (oratext *)value; xin_params->userlen = strlen(value); } else if (!strncmp(option, (char *)"ib_pwd", 6)) { xin_params->passw = (oratext *)value; xin_params->passwlen = strlen(value); } else if (!strncmp(option, (char *)"ib_svr", 6)) { xin_params->svrnm = (oratext *)value; xin_params->svrnmlen = strlen(value); } else { printf("Error: unknown option '%s'.\n", option); print_usage(1); } } /* print usage and exit if any argument is not specified */ if (!xout_params->svrnmlen || !xout_params->passwlen || !xout_params->userlen || !xout_params->dbnamelen || !xin_params->svrnmlen || !xin_params->passwlen || !xin_params->userlen || !xin_params->dbnamelen) { printf("Error: missing command arguments. \n"); print_usage(1); } }
To run the sample XStream client application for the Java API, compile and link the application file, and enter the following on a command line:
java xio xsin_oraclesid xsin_host xsin_port xsin_username xsin_passwd xin_servername xsout_oraclesid xsout_host xsout_port xsout_username xsout_passwd xsout_servername
Substitute the appropriate values for the following placeholders:
xsin_oraclesid is the Oracle SID of the inbound server's database.
xsin_host is the host name of the computer system running the inbound server.
xsin_port is the port number of the listener for the inbound server's database.
xsin_username is the inbound server's apply user.
xsin_passwd is the password for the inbound server's apply user.
xin_servername is the name of the inbound server.
xsout_oraclesid is the Oracle SID of the outbound server's database.
xsout_host is the host name of the computer system running the outbound server.
xsout_port is the port number of the listener for the outbound server's database.
xsout_username is the outbound server's connect user.
xsout_passwd is the password for the outbound server's connect user.
xsout_servername is the name of the outbound server.
When the sample client application is running, it prints information about attaching to the inbound server and outbound server, along with the last position for each server. The output looks similar to the following:
xsin_host = server2.example.com xsin_port = 1482 xsin_ora_sid = db2 xsin connection url: jdbc:oracle:oci:@server2.example.com:1482:db2 xsout_host = server1.example.com xsout_port = 1481 xsout_ora_sid = db1 xsout connection url: jdbc:oracle:oci:@server1.example.com:1481:db1 Attached to inbound server:xin Inbound Server Last Position is: 0000000920250000000100000001000000092025000000010000000101 Attached to outbound server:xout Last Position is: 0000000920250000000100000001000000092025000000010000000101
This demo is available in the following location:
$ORACLE_HOME/rdbms/demo/xstream/java
The file name for the demo is xio.java
. See the README.txt
file in the demo directory for more information about compiling and running the application.
The code for the sample application that uses the Java API follows:
import oracle.streams.*; import oracle.jdbc.internal.OracleConnection; import oracle.jdbc.*; import oracle.sql.*; import java.sql.*; import java.util.*; public class xio { public static String xsinusername = null; public static String xsinpasswd = null; public static String xsinName = null; public static String xsoutusername = null; public static String xsoutpasswd = null; public static String xsoutName = null; public static String in_url = null; public static String out_url = null; public static Connection in_conn = null; public static Connection out_conn = null; public static XStreamIn xsIn = null; public static XStreamOut xsOut = null; public static byte[] lastPosition = null; public static byte[] processedLowPosition = null; public static void main(String args[]) { // get connection url to inbound and outbound server in_url = parseXSInArguments(args); out_url = parseXSOutArguments(args); // create connection to inbound and outbound server in_conn = createConnection(in_url, xsinusername, xsinpasswd); out_conn = createConnection(out_url, xsoutusername, xsoutpasswd); // attach to inbound and outbound server xsIn = attachInbound(in_conn); xsOut = attachOutbound(out_conn); // main loop to get lcrs get_lcrs(xsIn, xsOut); // detach from inbound and outbound server detachInbound(xsIn); detachOutbound(xsOut); } // parse the arguments to get the conncetion url to inbound db public static String parseXSInArguments(String args[]) { String trace, pref; String orasid, host, port; if (args.length != 12) { printUsage(); System.exit(0); } orasid = args[0]; host = args[1]; port = args[2]; xsinusername = args[3]; xsinpasswd = args[4]; xsinName = args[5]; System.out.println("xsin_host = "+host); System.out.println("xsin_port = "+port); System.out.println("xsin_ora_sid = "+orasid); String in_url = "jdbc:oracle:oci:@"+host+":"+port+":"+orasid; System.out.println("xsin connection url: "+ in_url); return in_url; } // parse the arguments to get the conncetion url to outbound db public static String parseXSOutArguments(String args[]) { String trace, pref; String orasid, host, port; if (args.length != 12) { printUsage(); System.exit(0); } orasid = args[6]; host = args[7]; port = args[8]; xsoutusername = args[9]; xsoutpasswd = args[10]; xsoutName = args[11]; System.out.println("xsout_host = "+host); System.out.println("xsout_port = "+port); System.out.println("xsout_ora_sid = "+orasid); String out_url = "jdbc:oracle:oci:@"+host+":"+port+":"+orasid; System.out.println("xsout connection url: "+ out_url); return out_url; } // print out sample program usage message public static void printUsage() { System.out.println(""); System.out.println("Usage: java xio "+"<xsin_oraclesid> " + "<xsin_host> " + "<xsin_port> "); System.out.println(" "+"<xsin_username> " + "<xsin_passwd> " + "<xsin_servername> "); System.out.println(" "+"<xsout_oraclesid> " + "<xsout_host> " + "<xsout_port> "); System.out.println(" "+"<xsout_username> " + "<xsout_passwd> " + "<xsout_servername> "); } // create a connection to an Oracle Database public static Connection createConnection(String url, String username, String passwd) { try { DriverManager.registerDriver(new oracle.jdbc.OracleDriver()); return DriverManager.getConnection(url, username, passwd); } catch(Exception e) { System.out.println("fail to establish DB connection to: " +url); e.printStackTrace(); return null; } } // attach to the XStream Inbound Server public static XStreamIn attachInbound(Connection in_conn) { XStreamIn xsIn = null; try { xsIn = XStreamIn.attach((OracleConnection)in_conn, xsinName, "XSDEMOINCLIENT" , XStreamIn.DEFAULT_MODE); // use last position to decide where should we start sending LCRs lastPosition = xsIn.getLastPosition(); System.out.println("Attached to inbound server:"+xsinName); System.out.print("Inbound Server Last Position is: "); if (null == lastPosition) { System.out.println("null"); } else { printHex(lastPosition); } return xsIn; } catch(Exception e) { System.out.println("cannot attach to inbound server: "+xsinName); System.out.println(e.getMessage()); e.printStackTrace(); return null; } } // attach to the XStream Outbound Server public static XStreamOut attachOutbound(Connection out_conn) { XStreamOut xsOut = null; try { // when attach to an outbound server, client needs to tell outbound // server the last position. xsOut = XStreamOut.attach((OracleConnection)out_conn, xsoutName, lastPosition, XStreamOut.DEFAULT_MODE); System.out.println("Attached to outbound server:"+xsoutName); System.out.print("Last Position is: "); if (lastPosition != null) { printHex(lastPosition); } else { System.out.println("NULL"); } return xsOut; } catch(Exception e) { System.out.println("cannot attach to outbound server: "+xsoutName); System.out.println(e.getMessage()); e.printStackTrace(); return null; } } // detach from the XStream Inbound Server public static void detachInbound(XStreamIn xsIn) { byte[] processedLowPosition = null; try { processedLowPosition = xsIn.detach(XStreamIn.DEFAULT_MODE); System.out.print("Inbound server processed low Position is: "); if (processedLowPosition != null) { printHex(processedLowPosition); } else { System.out.println("NULL"); } } catch(Exception e) { System.out.println("cannot detach from the inbound server: "+xsinName); System.out.println(e.getMessage()); e.printStackTrace(); } } // detach from the XStream Outbound Server public static void detachOutbound(XStreamOut xsOut) { try { xsOut.detach(XStreamOut.DEFAULT_MODE); } catch(Exception e) { System.out.println("cannot detach from the outbound server: "+xsoutName); System.out.println(e.getMessage()); e.printStackTrace(); } } public static void get_lcrs(XStreamIn xsIn, XStreamOut xsOut) { byte[] ping_pos = null; byte[] fetchlwm = null; String src_db = null; if (null == xsIn) { System.out.println("xstreamIn is null"); System.exit(0); } if (null == xsOut) { System.out.println("xstreamOut is null"); System.exit(0); } try { while(true) { // receive an LCR from outbound server LCR alcr = xsOut.receiveLCR(XStreamOut.DEFAULT_MODE); fetchlwm = xsOut.getFetchLowWatermark(); // save source db for ping lcr if (null != alcr) src_db = alcr.getSourceDatabaseName(); if (xsOut.getBatchStatus() == XStreamOut.EXECUTING) // batch is active { assert alcr != null; // send the LCR to the inbound server xsIn.sendLCR(alcr, XStreamIn.DEFAULT_MODE); // also get chunk data for this LCR if any if (alcr instanceof RowLCR) { // receive chunk from outbound then send to inbound if (((RowLCR)alcr).hasChunkData()) { ChunkColumnValue chunk = null; do { chunk = xsOut.receiveChunk(XStreamOut.DEFAULT_MODE); xsIn.sendChunk(chunk, XStreamIn.DEFAULT_MODE); } while (!chunk.isEndOfRow()); } } processedLowPosition = alcr.getPosition(); ping_pos = processedLowPosition; } else // batch is end { assert alcr == null; // send ping lcr if we haven't received any lcr in the batch // but we got a new fetch lwm, then send a commit lcr to // ping the inbound server with the new fetch LWM position if (null != src_db && null != fetchlwm && !samePos(fetchlwm,ping_pos)) { xsIn.sendLCR(createPing(src_db, fetchlwm), XStreamIn.DEFAULT_MODE); ping_pos = fetchlwm; } // flush the network xsIn.flush(XStreamIn.DEFAULT_MODE); // get the processed_low_position from inbound server processedLowPosition = xsIn.getProcessedLowWatermark(); // update the processed_low_position at oubound server if (null != processedLowPosition) xsOut.setProcessedLowWatermark(processedLowPosition, XStreamOut.DEFAULT_MODE); } } } catch(Exception e) { System.out.println("exception when processing LCRs"); System.out.println(e.getMessage()); e.printStackTrace(); } } public static void printHex(byte[] b) { for (int i = 0; i < b.length; ++i) { System.out.print( Integer.toHexString((b[i]&0xFF) | 0x100).substring(1,3)); } System.out.println(""); } // ping lcr is used to bump up the inbound server watermark private static RowLCR createPing(String src_db, byte[] pos) { java.util.Date today = new java.util.Date(); java.sql.Timestamp now = new java.sql.Timestamp(today.getTime()); oracle.sql.DATE src_time = new oracle.sql.DATE(now); RowLCR alcr = new DefaultRowLCR(); ((RowLCR)alcr).setSourceDatabaseName(src_db); ((RowLCR)alcr).setSourceTime(src_time); ((RowLCR)alcr).setPosition(pos); ((RowLCR)alcr).setCommandType(RowLCR.COMMIT); ((RowLCR)alcr).setTransactionId("Ping: " + src_time.toString()); return alcr; } private static boolean samePos(byte[] pos1, byte[] pos2) { int cmp_len; boolean result; if (pos1.length != pos2.length) return false; for (int i = 0; i<pos1.length; i++) { if (pos1[i] != pos2[i]) return false; } return true; } }