22 Using ANYDATA Queues for User Messages

This chapter describes how to use and manage Oracle Database Advanced Queuing when enqueuing and propagating. It describes ANYDATA queues and user messages.

Oracle Streams uses queues of type ANYDATA to store three types of messages:

  • Captured logical change record (LCR)

    This message type, produced by an Oracle Streams capture process, is not discussed in this guide.

    See Also:

    "Streams Capture Process" in Oracle Streams Concepts and Administration
  • User-enqueued LCR

    This is a message containing an LCR that was enqueued by a user or application.

  • User message

    This is a non-LCR message created and enqueued by a user or application.

All three types of messages can be used for information sharing within a single database or between databases.

This chapter contains these topics:

See Also:

Oracle Database PL/SQL Packages and Types Reference for more information about the ANYDATA type

ANYDATA Queues and User Messages

This section contains these topics:

ANYDATA Wrapper for User Messages Payloads

You can wrap almost any type of payload in an ANYDATA payload with the Convertdata_type static functions of the ANYDATA type, where data_type is the type of object to wrap. These functions take the object as input and return an ANYDATA object.

The following datatypes cannot be wrapped in an ANYDATA wrapper:

  • Nested table

  • NCLOB

  • ROWID and UROWID

The following datatypes can be directly wrapped in an ANYDATA wrapper, but they cannot be present in a user-defined type payload wrapped in an ANYDATA wrapper:

Programmatic Interfaces for Enqueue and Dequeue of User Messages

Your applications can use the following programmatic interfaces to enqueue user messages into an ANYDATA queue and dequeue user messages from an ANYDATA queue:

The following sections provide information about using these interfaces to enqueue user messages into and dequeue user messages from an ANYDATA queue.

See Also:

Chapter 3, "Oracle Database Advanced Queuing: Programmatic Interfaces" for more information about these programmatic interfaces

Enqueuing User Messages Using PL/SQL

To enqueue a user message containing an LCR into an ANYDATA queue using PL/SQL, first create the LCR to be enqueued. You use the constructor for the SYS.LCR$_ROW_RECORD type to create a row LCR, and you use the constructor for the SYS.LCR$_DDL_RECORD type to create a DDL LCR. Then you use the ANYDATA.ConvertObject function to convert the LCR into an ANYDATA payload and enqueue it using the DBMS_AQ.ENQUEUE procedure.

To enqueue a user message containing a non-LCR object into an ANYDATA queue using PL/SQL, you use one of the ANYDATA.Convert* functions to convert the object into an ANYDATA payload and enqueue it using the DBMS_AQ.ENQUEUE procedure.

Enqueuing User Messages Using OCI or JMS

To enqueue a user message containing an LCR into an ANYDATA queue using JMS or OCI, you must represent the LCR in XML format. To construct an LCR, use the oracle.xdb.XMLType class. LCRs are defined in the SYS schema. The LCR schema must be loaded into the SYS schema using the catxlcr.sql script in ORACLE_HOME/rdbms/admin.

To enqueue a message using OCI, perform the same actions that you would to enqueue a message into a typed queue. To enqueue a message using JMS, a user must have EXECUTE privilege on the DBMS_AQ, DBMS_AQIN and DBMS_AQJMS packages.

Note:

Enqueue of JMS types and XML types does not work with ANYDATA queues unless you call DBMS_AQADM.ENABLE_JMS_TYPES(queue_table_name) after DBMS_STREAMS_ADM.SET_UP_QUEUE(queue_name). Enabling a queue for these types may affect import/export of the queue table.

A non-LCR user message can be a message of any user-defined type or a JMS type. The JMS types include the following:

  • javax.jms.TextMessage

  • javax.jms.MapMessage

  • javax.jms.StreamMessage

  • javax.jms.ObjectMessage

  • javax.jms.BytesMessage

When using user-defined types, you must generate the Java class for the message using Jpublisher, which implements the ORAData interface. To enqueue a message into an ANYDATA queue, you can use methods QueueSender.send or TopicPublisher.publish.

See Also:

Dequeuing User Messages Using PL/SQL

To dequeue a user message from an ANYDATA queue using PL/SQL, you use the DBMS_AQ.DEQUEUE procedure and specify ANYDATA as the payload. The user message can contain an LCR or another type of object.

Dequeuing User Messages Using OCI or JMS

In an ANYDATA queue, user messages containing LCRs in XML format are represented as oracle.xdb.XMLType. Non-LCR messages can be any user-defined type or a JMS type.

To dequeue a message from an ANYDATA queue using JMS, you can use methods QueueReceiver, TopicSubscriber, or TopicReceiver. Because the queue can contain different types of objects wrapped in ANYDATA wrappers, you must register a list of SQL types and their corresponding Java classes in the type map of the JMS session. JMS types are already preregistered in the type map.

For example, suppose a queue contains user-enqueued LCR messages represented as oracle.xdb.XMLType and non-LCR messages of type person and address. The classes JPerson.java and JAddress.java are the ORAData mappings for person and address, respectively. Before dequeuing the message, the type map must be populated as follows:

java.util.Map map = ((AQjmsSession)q_sess).getTypeMap();

map.put("SCOTT.PERSON", Class.forName("JPerson"));
map.put("SCOTT.ADDRESS", Class.forName("JAddress"));
map.put("SYS.XMLTYPE", Class.forName("oracle.xdb.XMLType"));  // For LCRs

When using a messageSelector with a QueueReceiver or TopicPublisher, the selector can contain any SQL expression that has a combination of one or more of the following:

  • JMS message header fields or properties

    These include JMSPriority, JMSCorrelationID, JMSType, JMSXUserI, JMSXAppID, JMSXGroupID, and JMSXGroupSeq. An example of a JMS message field messageSelector is:

    JMSPriority < 3 AND JMSCorrelationID = 'Fiction'
    
  • User-defined message properties

    An example of a user-defined message properties messageSelector is:

    color IN ('RED', 'BLUE', 'GREEN') AND price < 30000
    
  • PL/SQL functions

    An example of a PL/SQL function messageSelector is:

    hr.GET_TYPE(tab.user_data) = 'HR.EMPLOYEES'  
    

To dequeue a message from an ANYDATA queue using OCI, perform the same actions that you would to dequeue a message from a typed queue.

See Also:

Message Propagation and ANYDATA Queues

ANYDATA queues can interoperate with typed queues. Table 22-1 shows the types of propagation possible between queues.

Table 22-1 Propagation Between Different Types of Queues

Source Queue Destination Queue Transformation

ANYDATA

ANYDATA

None

Typed

ANYDATA

Implicit

Note: Propagation is possible only if the messages in the typed queue meet the restrictions outlined in "Object Type Support".

ANYDATA

Typed

Requires a rule to filter messages and a user-defined transformation. Only messages containing a payload of the same type as the typed queue can be propagated to the typed queue.

Typed

Typed

Follows Oracle Database Advanced Queuing rules


Note:

Propagations cannot propagate user-enqueued ANYDATA messages that encapsulate payloads of object types, varrays, or nested tables between databases with different character sets. Propagations can propagate such messages between databases with the same character set.

Although you cannot use Simple Object Access Protocol (SOAP) to interact directly with an ANYDATA queue, you can use SOAP by propagating messages between an ANYDATA queue and a typed queue. If you want to enqueue a message into an ANYDATA queue using SOAP, you must first configure propagation from a typed queue to the ANYDATA queue. Then you can use SOAP to enqueue a message into the typed queue. The message is propagated automatically from the typed queue to the ANYDATA queue.

If you want to use SOAP to dequeue a message that is in an ANYDATA queue, then you can configure propagation from the ANYDATA queue to a typed queue. The message is propagated automatically from the ANYDATA queue to the typed queue, where it is available for access using SOAP.

See Also:

"Propagating Messages Between an ANYDATA Queue and a Typed Queue" in Oracle Streams Concepts and Administration

Enqueuing User Messages in ANYDATA Queues

This section provides examples of enqueuing messages into an ANYDATA queue. The examples assume you are in a SQL*Plus testing environment with access to two databases named db01 and db02. The first few examples prepare the testing environment for the other examples in this chapter.

In Example 22-1, you connect as a user with administrative privileges at databases db01 and db02 to create administrator user strmadmin and to grant EXECUTE privilege on the DBMS_AQ package to sample schema user oe.

Example 22-1 Creating ANYDATA Users

GRANT EXECUTE ON DBMS_AQ TO oe;
CREATE USER strmadmin IDENTIFIED BY strmadmin DEFAULT TABLESPACE example;
GRANT DBA TO strmadmin; 
GRANT EXECUTE ON DBMS_STREAMS_ADM TO strmadmin;
GRANT EXECUTE ON DBMS_TRANSFORM TO strmadmin;

In Example 22-2, you connect to db01 as strmadmin to create ANYDATA queue oe_queue_any. The oe user is configured automatically as a secure user of the oe_queue_any queue and is given ENQUEUE and DEQUEUE privileges on the queue.

Example 22-2 Creating an ANYDATA Queue

CONNECT strmadmin;
Enter password: password
BEGIN
  DBMS_STREAMS_ADM.SET_UP_QUEUE(
    queue_table  => 'oe_qtab_any',
    queue_name   => 'oe_queue_any',
    queue_user   => 'oe');
END;
/

In Example 22-3, you add a subscriber to the oe_queue_any queue. This subscriber performs explicit dequeues of messages. The ADD_SUBSCRIBER procedure will automatically create an AQ_AGENT.

Example 22-3 Adding a Subscriber to the ANYDATA Queue

DECLARE
  subscriber SYS.AQ$_AGENT;
BEGIN
  subscriber :=  SYS.AQ$_AGENT('LOCAL_AGENT', NULL, NULL);
  SYS.DBMS_AQADM.ADD_SUBSCRIBER(
    queue_name  =>  'strmadmin.oe_queue_any',
    subscriber  =>  subscriber);
END;
/

In Example 22-4, you associate the oe user with the local_agent agent.

Example 22-4 Associating a User with an AQ_AGENT

BEGIN
  DBMS_AQADM.ENABLE_DB_ACCESS(
    agent_name  => 'local_agent',
    db_username => 'oe');
END;
/

In Example 22-5, you connect to database db01 as user oe to create an enqueue procedure. It takes an object of ANYDATA type as an input parameter and enqueues a message containing the payload into an existing ANYDATA queue.

Example 22-5 Creating an Enqueue Procedure

set echo off
set verify off
ACCEPT password CHAR PROMPT 'Enter the password for OE: ' HIDE
CONNECT oe/&password@db01;
 
set echo on
CREATE PROCEDURE oe.enq_proc (payload ANYDATA)  IS 
  enqopt     DBMS_AQ.ENQUEUE_OPTIONS_T;
  mprop      DBMS_AQ.MESSAGE_PROPERTIES_T;
  enq_msgid  RAW(16);
BEGIN
  mprop.SENDER_ID := SYS.AQ$_AGENT('LOCAL_AGENT', NULL, NULL); 
  DBMS_AQ.ENQUEUE(
    queue_name          =>  'strmadmin.oe_queue_any',
    enqueue_options     =>  enqopt,
    message_properties  =>  mprop,
    payload             =>  payload,
    msgid               =>  enq_msgid);
END;
/

In Example 22-6, you use procedure oe.enq_proc to enqueue a message of type VARCHAR2 into an ANYDATA queue.

Example 22-6 Enqueuing a VARCHAR2 Message into an ANYDATA Queue

EXEC oe.enq_proc(ANYDATA.ConvertVarchar2('Chemicals - SW'));
COMMIT;

In Example 22-7, you use procedure oe.enq_proc to enqueue a message of type NUMBER into an ANYDATA queue.

Example 22-7 Enqueuing a NUMBER Message into an ANYDATA Queue

EXEC oe.enq_proc(ANYDATA.ConvertNumber('16'));
COMMIT;

In Example 22-8, you use procedure oe.enq_proc to enqueue a user-defined type message into an ANYDATA queue.

Example 22-8 Enqueuing a User-Defined Type Message into an ANYDATA Queue

BEGIN
  oe.enq_proc(ANYDATA.ConvertObject(oe.cust_address_typ(
    '1646 Brazil Blvd','361168','Chennai','Tam', 'IN')));
END;
/
COMMIT;

See Also:

"Viewing the Contents of User-Enqueued Events in a Queue" in Oracle Streams Concepts and Administration

Dequeuing User Messages from ANYDATA Queues

This section provides examples of dequeuing messages from an ANYDATA queue. The examples assume that you have completed the examples in "Enqueuing User Messages in ANYDATA Queues".

To dequeue messages, you must know the consumer of the messages. To find the consumer for the messages in a queue, connect as the owner of the queue and query the AQ$queue_table_name view, where queue_table_name is the name of the queue table containing the queue.

In Example 22-9, you connect to database db01 as strmadmin, the owner of queue oe_queue_any, and perform a query on the AQ$OE_QTAB_ANY view. The query returns three rows, with LOCAL_AGENT as the CONSUMER_NAME in each row.

Example 22-9 Determining the Consumer of Messages in a Queue

CONNECT strmadmin;
Enter password: password
SELECT MSG_ID, MSG_STATE, CONSUMER_NAME FROM AQ$OE_QTAB_ANY;

In Example 22-10, you connect to database db01 as user oe to create a dequeue procedure that takes as an input the consumer of the messages you want to dequeue, dequeues messages of oe.cust_address_typ, and prints the contents of the messages.

Example 22-10 Creating a Dequeue Procedure for an ANYDATA Queue

CONNECT oe; -- @db01
Enter password: password

CREATE PROCEDURE oe.get_cust_address (
consumer IN VARCHAR2) AS
  address         OE.CUST_ADDRESS_TYP;
  deq_address     ANYDATA; 
  msgid           RAW(16); 
  deqopt          DBMS_AQ.DEQUEUE_OPTIONS_T; 
  mprop           DBMS_AQ.MESSAGE_PROPERTIES_T;
  new_addresses   BOOLEAN := TRUE;
  next_trans      EXCEPTION;
  no_messages     EXCEPTION; 
  pragma exception_init (next_trans, -25235);
  pragma exception_init (no_messages, -25228);
  num_var         pls_integer;
BEGIN
  deqopt.consumer_name := consumer;
  deqopt.wait := 1;
  WHILE (new_addresses) LOOP
  BEGIN
    DBMS_AQ.DEQUEUE( 
      queue_name          =>  'strmadmin.oe_queue_any',
      dequeue_options     =>  deqopt,
      message_properties  =>  mprop,
      payload             =>  deq_address,
      msgid               =>  msgid);
    deqopt.navigation := DBMS_AQ.NEXT;
    DBMS_OUTPUT.PUT_LINE('****');
    IF (deq_address.GetTypeName() = 'OE.CUST_ADDRESS_TYP') THEN
       DBMS_OUTPUT.PUT_LINE('Message TYPE is: ' || deq_address.GetTypeName()); 
       num_var := deq_address.GetObject(address);
       DBMS_OUTPUT.PUT_LINE(' **** CUSTOMER ADDRESS **** ');
       DBMS_OUTPUT.PUT_LINE(address.street_address);
       DBMS_OUTPUT.PUT_LINE(address.postal_code);
       DBMS_OUTPUT.PUT_LINE(address.city);
       DBMS_OUTPUT.PUT_LINE(address.state_province);
       DBMS_OUTPUT.PUT_LINE(address.country_id);
    ELSE
       DBMS_OUTPUT.PUT_LINE('Message TYPE is: ' || deq_address.GetTypeName()); 
    END IF;
  COMMIT;
  EXCEPTION
     WHEN next_trans THEN
       deqopt.navigation := DBMS_AQ.NEXT_TRANSACTION;
     WHEN no_messages THEN
       new_addresses := FALSE;
       DBMS_OUTPUT.PUT_LINE('No more messages');
  END;
  END LOOP; 
END;
/

In Example 22-11, you use procedure oe.get_cust_address, created in Example 22-10, specifying LOCAL_AGENT as the consumer.

Example 22-11 Dequeuing Messages from an ANYDATA Queue

SET SERVEROUTPUT ON SIZE 100000
EXEC oe.get_cust_address('LOCAL_AGENT');

The example returns:

****
Message TYPE is: SYS.VARCHAR2
****
Message TYPE is: SYS.NUMBER
****
Message TYPE is: OE.CUST_ADDRESS_TYP
**** CUSTOMER ADDRESS ****
1646 Brazil Blvd
361168
Chennai
Tam
IN
No more messages

Propagating User Messages from ANYDATA Queues to Typed Queues

This section provides examples showing how to propagate non-LCR user messages between an ANYDATA queue and a typed queue.

Note:

The examples in this section assume that you have completed the examples in "Enqueuing User Messages in ANYDATA Queues".

See Also:

"Message Propagation and ANYDATA Queues" for more information about propagation between ANYDATA and typed queues

The first few examples set up propagation from the ANYDATA queue oe_queue_any, created in Example 22-2, to a typed queue in database db02. In Example 22-12, you connect as sample schema user oe to grant EXECUTE privilege on oe.cust_address_typ at databases db01 and db02 to administrator user strmadmin.

Example 22-12 Granting EXECUTE Privilege on a Type

CONNECT oe; -- @db01
Enter password: password

GRANT EXECUTE ON oe.cust_address_typ TO strmadmin;
CONNECT oe; -- @db02
Enter password: password

GRANT EXECUTE ON oe.cust_address_typ TO strmadmin;

In Example 22-13, you connect to database db02 as administrator user strmadmin and create a destination queue of type oe.cust_address_typ.

Example 22-13 Creating a Typed Destination Queue

CONNECT strmadmin;
Enter password: password

BEGIN 
  DBMS_AQADM.CREATE_QUEUE_TABLE(
    queue_table         => 'strmadmin.oe_qtab_address', 
    queue_payload_type  => 'oe.cust_address_typ', 
    multiple_consumers  => true);
  DBMS_AQADM.CREATE_QUEUE(
    queue_name          => 'strmadmin.oe_queue_address', 
    queue_table         => 'strmadmin.oe_qtab_address');
  DBMS_AQADM.START_QUEUE(
    queue_name          => 'strmadmin.oe_queue_address');
END;
/

In Example 22-14, you connect to database db01 as administrator user strmadmin to create a database link from db01 to db02.

Example 22-14 Creating a Database Link

CONNECT strmadmin;
Enter password: password

CREATE DATABASE LINK db02 CONNECT TO strmadmin IDENTIFIED BY password 
  USING 'db02'; 

In Example 22-15, you create function any_to_cust_address_typ in the strmadmin schema at db01 that takes an ANYDATA payload containing an oe.cust_address_typ object and returns an oe.cust_address_typ object.

Example 22-15 Creating a Function to Extract a Typed Object from an ANYDATA Object

CONNECT strmadmin;
Enter password: password

CREATE FUNCTION strmadmin.any_to_cust_address_typ(in_any IN ANYDATA) 
RETURN OE.CUST_ADDRESS_TYP
AS
  address       OE.CUST_ADDRESS_TYP;
  num_var       NUMBER;
  type_name     VARCHAR2(100);
BEGIN
  type_name := in_any.GetTypeName();
  IF (type_name = 'OE.CUST_ADDRESS_TYP') THEN
    num_var := in_any.GetObject(address);
    RETURN address;
  ELSE
    raise_application_error(-20101, 'Conversion failed - ' || type_name);
  END IF;
END;
/

In Example 22-16, you create a transformation at db01 using the DBMS_TRANSFORM package.

Example 22-16 Creating an ANYDATA to Typed Object Transformation

BEGIN
  DBMS_TRANSFORM.CREATE_TRANSFORMATION( 
   schema         => 'strmadmin', 
   name           => 'anytoaddress', 
   from_schema    => 'SYS', 
   from_type      => 'ANYDATA', 
   to_schema      => 'oe', 
   to_type        => 'cust_address_typ', 
   transformation => 'strmadmin.any_to_cust_address_typ(source.user_data)'); 
END;
/

In Example 22-17, you create a subscriber for the typed queue. The subscriber must contain a rule that ensures that only messages of the appropriate type are propagated to the destination queue.

Example 22-17 Creating Subscriber ADDRESS_AGENT_REMOTE

DECLARE 
  subscriber  SYS.AQ$_AGENT; 
BEGIN 
  subscriber := SYS.AQ$_AGENT ('ADDRESS_AGENT_REMOTE', 
                               'STRMADMIN.OE_QUEUE_ADDRESS@DB02', 
                               0); 
  DBMS_AQADM.ADD_SUBSCRIBER(
    queue_name     => 'strmadmin.oe_queue_any', 
    subscriber     => subscriber,
    rule           => 'TAB.USER_DATA.GetTypeName()=''OE.CUST_ADDRESS_TYP''',
    transformation => 'strmadmin.anytoaddress'); 
END; 
/

In Example 22-18, you schedule propagation between the ANYDATA queue at db01 and the typed queue at db02.

Example 22-18 Scheduling Propagation from an ANYDATA Queue to a Typed Queue

BEGIN 
  DBMS_AQADM.SCHEDULE_PROPAGATION(
    queue_name   => 'strmadmin.oe_queue_any', 
    destination  => 'db02'); 
END;
/

In Example 22-19, you connect to database db01 as sample schema user oe to enqueue a message of oe.cust_address_typ type wrapped in an ANYDATA wrapper. This example uses the enqueue procedure oe.enq_proc created in Example 22-5.

Example 22-19 Enqueuing a Typed Message in an ANYDATA Wrapper

CONNECT oe;
Enter password: password

BEGIN
  oe.enq_proc(ANYDATA.ConvertObject(oe.cust_address_typ(
    '1668 Chong Tao','111181','Beijing',NULL,'CN')));
END;
/
COMMIT;

After allowing some time for propagation, in Example 22-20 you query queue table AQ$OE_QTAB_ADDRESS at db02 to view the propagated message.

Example 22-20 Viewing the Propagated Message

CONNECT strmadmin;
Enter password: password

SELECT MSG_ID, MSG_STATE, CONSUMER_NAME FROM AQ$OE_QTAB_ADDRESS;

The example returns one message for ADDRESS_AGENT_REMOTE:

MSG_ID                           MSG_STATE        CONSUMER_NAME
-------------------------------- ---------------- ------------------------------
EBEF5CACC4665A6FE030578CE70A370D READY            ADDRESS_AGENT_REMOTE
 
1 row selected.

See Also:

Chapter 20, "Oracle Messaging Gateway Message Conversion" for more information about transformations during propagation

Propagating User-Enqueued LCRs from ANYDATA Queues to Typed Queues

You can propagate user-enqueued LCRs to an appropriate typed queue, but propagation of captured LCRs to a typed queue is not supported.

See Also:

"Streams Capture Process" in Oracle Streams Concepts and Administration for more information on capture processes

To propagate user-enqueued LCRs from an ANYDATA queue to a typed queue, you complete the same steps as you do for non-LCR messages, but Oracle supplies the transformation functions. You can use the following functions in the DBMS_STREAMS package to transform LCRs in ANYDATA queues to messages in typed queues:

  • CONVERT_ANYDATA_TO_LCR_ROW transforms an ANYDATA payload containing a row LCR into a SYS.LCR$_ROW_RECORD payload.

  • CONVERT_ANYDATA_TO_LCR_DDL transforms an ANYDATA payload containing a DDL LCR into a SYS.LCR$_DDL_RECORD payload.

The examples in this section set up propagation of row LCRs from an ANYDATA queue named oe_queue_any to a typed queue of type SYS.LCR$_ROW_RECORD named oe_queue_lcr. The source queue oe_queue_any is at database db01, and the destination queue oe_queue_lcr is created at database db02 in Example 22-21.

Note:

The examples in this section assume you have already run the examples in the preceding sections of this chapter.

Example 22-21 Creating a Queue of Type LCR$_ROW_RECORD

CONNECT strmadmin;
Enter password: password

BEGIN 
  DBMS_AQADM.CREATE_QUEUE_TABLE(
    queue_table         => 'strmadmin.oe_qtab_lcr', 
    queue_payload_type  => 'SYS.LCR$_ROW_RECORD', 
    multiple_consumers  => true);
  DBMS_AQADM.CREATE_QUEUE(
    queue_name   => 'strmadmin.oe_queue_lcr', 
    queue_table  => 'strmadmin.oe_qtab_lcr');
  DBMS_AQADM.START_QUEUE(
    queue_name   => 'strmadmin.oe_queue_lcr');
END;
/

In Example 22-22, you connect to db01 as administrator user strmadmin to create an ANYDATA to LCR$_ROW_RECORD transformation at db01 using the DBMS_TRANSFORM package.

Example 22-22 Creating an ANYDATA to LCR$_ROW_RECORD Transformation

CONNECT strmadmin;
Enter password: password

BEGIN
  DBMS_TRANSFORM.CREATE_TRANSFORMATION( 
    schema         => 'strmadmin', 
    name           => 'anytolcr', 
    from_schema    => 'SYS', 
    from_type      => 'ANYDATA', 
    to_schema      => 'SYS', 
    to_type        => 'LCR$_ROW_RECORD', 
    transformation =>
          'SYS.DBMS_STREAMS.CONVERT_ANYDATA_TO_LCR_ROW(source.user_data)'); 
END;
/

In Example 22-23, you create a subscriber at the typed queue. The subscriber specifies the anytolcr transformation created in Example 22-22 for the transformation parameter.

Example 22-23 Creating Subscriber ROW_LCR_AGENT_REMOTE

DECLARE 
  subscriber  SYS.AQ$_AGENT; 
BEGIN 
  subscriber := SYS.AQ$_AGENT(
    'ROW_LCR_AGENT_REMOTE', 
    'STRMADMIN.OE_QUEUE_LCR@DB02', 
    0); 
  DBMS_AQADM.ADD_SUBSCRIBER(
    queue_name     => 'strmadmin.oe_queue_any',
    subscriber     => subscriber,
    rule           => 'TAB.USER_DATA.GetTypeName()=''SYS.LCR$_ROW_RECORD''',
    transformation => 'strmadmin.anytolcr'); 
END; 
/

In Example 22-24, you connect to database db01 as sample schema user oe to create a procedure to construct and enqueue a row LCR into the strmadmin.oe_queue_any queue.

Example 22-24 Creating a Procedure to Construct and Enqueue a Row LCR

CONNECT oe;
Enter password: password

CREATE PROCEDURE oe.enq_row_lcr_proc(
  source_dbname  VARCHAR2,
  cmd_type       VARCHAR2,
  obj_owner      VARCHAR2,
  obj_name       VARCHAR2,
  old_vals       SYS.LCR$_ROW_LIST,
  new_vals       SYS.LCR$_ROW_LIST) 
AS
  eopt           DBMS_AQ.ENQUEUE_OPTIONS_T;
  mprop          DBMS_AQ.MESSAGE_PROPERTIES_T;
  enq_msgid      RAW(16);
  row_lcr        SYS.LCR$_ROW_RECORD;
BEGIN
  mprop.SENDER_ID := SYS.AQ$_AGENT('LOCAL_AGENT', NULL, NULL); 
  row_lcr := SYS.LCR$_ROW_RECORD.CONSTRUCT(
    source_database_name  =>  source_dbname,
    command_type          =>  cmd_type,
    object_owner          =>  obj_owner,
    object_name           =>  obj_name,
    old_values            =>  old_vals,
    new_values            =>  new_vals);
  DBMS_AQ.ENQUEUE(
    queue_name            =>  'strmadmin.oe_queue_any', 
    enqueue_options       =>  eopt,
    message_properties    =>  mprop,
    payload               =>  ANYDATA.ConvertObject(row_lcr),
    msgid                 =>  enq_msgid);
END enq_row_lcr_proc;
/

In Example 22-25, you use the oe.enq_row_lcr_proc procedure first to create a row LCR that inserts a row into the oe.inventories table, and then to enqueue the row LCR into the strmadmin.oe_queue_any queue.

Note:

This example does not insert a new row in the oe.inventories table. The new row is inserted when an Oracle Streams apply process dequeues the row LCR and applies it.

Example 22-25 Creating and Enqueuing a Row LCR

DECLARE
  newunit1  SYS.LCR$_ROW_UNIT;
  newunit2  SYS.LCR$_ROW_UNIT;
  newunit3  SYS.LCR$_ROW_UNIT;
  newvals   SYS.LCR$_ROW_LIST;
BEGIN
  newunit1 := SYS.LCR$_ROW_UNIT(
    'PRODUCT_ID', 
    ANYDATA.ConvertNumber(3503),
    DBMS_LCR.NOT_A_LOB,
    NULL,
    NULL);
  newunit2 := SYS.LCR$_ROW_UNIT(
    'WAREHOUSE_ID', 
    ANYDATA.ConvertNumber(1),
    DBMS_LCR.NOT_A_LOB,
    NULL,
    NULL);
  newunit3 := SYS.LCR$_ROW_UNIT(
    'QUANTITY_ON_HAND', 
    ANYDATA.ConvertNumber(157),
    DBMS_LCR.NOT_A_LOB,
    NULL,
    NULL);
  newvals := SYS.LCR$_ROW_LIST(newunit1,newunit2,newunit3);
oe.enq_row_lcr_proc(
  source_dbname  =>  'DB01',
  cmd_type       =>  'INSERT',
  obj_owner      =>  'OE',
  obj_name       =>  'INVENTORIES',
  old_vals       =>  NULL,
  new_vals       =>  newvals);
END;
/
COMMIT;

The LCR is propagated to database db02 by the schedule created in Example 22-18. After allowing some time for propagation, in Example 22-26 you query queue table AQ$OE_QTAB_LCR at db02 to view the propagated message.

Example 22-26 Viewing the Propagated LCR

CONNECT strmadmin;
Enter password: password

SELECT MSG_ID, MSG_STATE, CONSUMER_NAME FROM AQ$OE_QTAB_LCR;

The example returns one message for ROW_LCR_AGENT_REMOTE:

MSG_ID                           MSG_STATE        CONSUMER_NAME
-------------------------------- ---------------- ------------------------------
ECE2B0F912DDFF5EE030578CE70A04BB READY            ROW_LCR_AGENT_REMOTE

See Also:

"DBMS_STREAMS" in Oracle Database PL/SQL Packages and Types Reference for more information about the row LCR and DDL LCR conversion functions