Oracle® Streams Concepts and Administration 10g Release 2 (10.2) Part Number B14229-04 |
|
|
View PDF |
This chapter provides instructions for managing ANYDATA
queues, propagations, and messaging environments.
This chapter contains these topics:
Each task described in this chapter should be completed by a Streams administrator that has been granted the appropriate privileges, unless specified otherwise.
An ANYDATA
queue stages messages whose payloads are of ANYDATA
type. Therefore, an ANYDATA
queue can stage a message with a payload of nearly any type, if the payload is wrapped in an ANYDATA
wrapper. Each Streams capture process, apply process, and messaging client is associated with one ANYDATA
queue, and each Streams propagation is associated with one ANYDATA
source queue and one ANYDATA
destination queue.
This section contains instructions for completing the following tasks related to ANYDATA
queues:
The easiest way to create an ANYDATA
queue is to use the SET_UP_QUEUE
procedure in the DBMS_STREAMS_ADM
package. This procedure enables you to specify the following settings for the ANYDATA
queue it creates:
The queue table for the queue
A storage clause for the queue table
The queue name
A queue user that will be configured as a secure queue user of the queue and granted ENQUEUE
and DEQUEUE
privileges on the queue
A comment for the queue
If the specified queue table does not exist, then it is created. If the specified queue table exists, then the existing queue table is used for the new queue. If you do not specify any queue table when you create the queue, then, by default, streams_queue_table
is specified.
You can use a single procedure, the SET_UP_QUEUE
procedure in the DBMS_STREAMS_ADM
package, to create an ANYDATA
queue and the queue table used by the queue. For SET_UP_QUEUE
to create a new queue table, the specified queue table must not exist.
For example, run the following procedure to create an ANYDATA
queue with the SET_UP_QUEUE
procedure:
BEGIN DBMS_STREAMS_ADM.SET_UP_QUEUE( queue_table => 'strmadmin.streams_queue_table', queue_name => 'strmadmin.streams_queue', queue_user => 'hr'); END; /
Running this procedure performs the following actions:
Creates a queue table named streams_queue_table
. The queue table is created only if it does not already exist. Queues based on the queue table stage messages of ANYDATA
type. Queue table names can be a maximum of 24 bytes.
Creates a queue named streams_queue
. The queue is created only if it does not already exist. Queue names can be a maximum of 24 bytes.
Specifies that the streams_queue
queue is based on the strmadmin.streams_queue_table
queue table.
Configures the hr
user as a secure queue user of the queue, and grants this user ENQUEUE
and DEQUEUE
privileges on the queue.
Starts the queue.
Default settings are used for the parameters that are not explicitly set in the SET_UP_QUEUE
procedure.
When the SET_UP_QUEUE
procedure creates a queue table, the following DBMS_AQADM.CREATE_QUEUE_TABLE
parameter settings are specified:
If the database is Oracle Database 10g Release 2 or later, the sort_list
setting is commit_time
. If the database is a release prior to Oracle Database 10g Release 2, the sort_list
setting is enq_time
.
The multiple_consumers
setting is true
.
The message_grouping
setting is transactional
.
The secure
setting is true
.
The other parameters in the CREATE_QUEUE_TABLE
procedure are set to their default values.
You can use the CREATE_QUEUE_TABLE
procedure in the DBMS_AQADM
package to create a queue table of ANYDATA
type with different properties than the default properties specified by the SET_UP_QUEUE
procedure in the DBMS_STREAMS_ADM
package. After you create the queue table with the CREATE_QUEUE_TABLE
procedure, you can create a queue that uses the queue table. To do so, specify the queue table in the queue_table
parameter of the SET_UP_QUEUE
procedure.
Similarly, you can use the CREATE_QUEUE
procedure in the DBMS_AQADM
package to create a queue instead of SET_UP_QUEUE
. Use CREATE_QUEUE
if you require custom settings for the queue. For example, use CREATE_QUEUE
to specify a custom retry delay or retention time. If you use CREATE_QUEUE
, then you must start the queue manually.
Note:
A message cannot be enqueued into a queue unless a subscriber who can dequeue the message is configured.See Also:
"Wrapping User Message Payloads in an ANYDATA Wrapper and Enqueuing Them" for an example that creates an ANYDATA
queue using procedures in the DBMS_AQADM
package
Oracle Database PL/SQL Packages and Types Reference for more information about the SET_UP_QUEUE
, CREATE_QUEUE_TABLE
, and CREATE_QUEUE
procedures
For a user to perform queue operations, such as enqueue and dequeue, on a secure queue, the user must be configured as a secure queue user of the queue. If you use the SET_UP_QUEUE
procedure in the DBMS_STREAMS_ADM
package to create the secure queue, then the queue owner and the user specified by the queue_user
parameter are configured as secure users of the queue automatically. If you want to enable other users to perform operations on the queue, then you can configure these users in one of the following ways:
Run SET_UP_QUEUE
and specify a queue_user
. Queue creation is skipped if the queue already exists, but a new queue user is configured if one is specified.
Associate the user with an AQ agent manually.
The following example illustrates associating a user with an AQ agent manually. Suppose you want to enable the oe
user to perform queue operations on the streams_queue
created in "Creating an ANYDATA Queue". The following steps configure the oe
user as a secure queue user of streams_queue
:
Connect as an administrative user who can create AQ agents and alter users.
Create an agent:
EXEC DBMS_AQADM.CREATE_AQ_AGENT(agent_name => 'streams_queue_agent');
If the user must be able to dequeue messages from queue, then make the agent a subscriber of the secure queue:
DECLARE subscriber SYS.AQ$_AGENT; BEGIN subscriber := SYS.AQ$_AGENT('streams_queue_agent', NULL, NULL); DBMS_AQADM.ADD_SUBSCRIBER( queue_name => 'strmadmin.streams_queue', subscriber => subscriber, rule => NULL, transformation => NULL); END; /
Associate the user with the agent:
BEGIN DBMS_AQADM.ENABLE_DB_ACCESS( agent_name => 'streams_queue_agent', db_username => 'oe'); END; /
Grant the user EXECUTE
privilege on the DBMS_STREAMS_MESSAGING
package or the DBMS_AQ
package, if the user is not already granted these privileges:
GRANT EXECUTE ON DBMS_STREAMS_MESSAGING TO oe; GRANT EXECUTE ON DBMS_AQ TO oe;
When these steps are complete, the oe
user is a secure user of the streams_queue
queue and can perform operations on the queue. You still must grant the user specific privileges to perform queue operations, such as enqueue and dequeue privileges.
See Also:
Oracle Database PL/SQL Packages and Types Reference for more information about AQ agents and using the DBMS_AQADM
package
You might want to disable a user from performing queue operations on a secure queue for the following reasons:
You dropped a capture process, but you did not drop the queue that was used by the capture process, and you do not want the user who was the capture user to be able to perform operations on the remaining secure queue.
You dropped an apply process, but you did not drop the queue that was used by the apply process, and you do not want the user who was the apply user to be able to perform operations on the remaining secure queue.
You used the ALTER_APPLY
procedure in the DBMS_APPLY_ADM
package to change the apply_user
for an apply process, and you do not want the old apply_user
to be able to perform operations on the apply process queue.
You enabled a user to perform operations on a secure queue by completing the steps described in Enabling a User to Perform Operations on a Secure Queue, but you no longer want this user to be able to perform operations on the secure queue.
To disable a secure queue user, you can revoke ENQUEUE
and DEQUEUE
privilege on the queue from the user, or you can run the DISABLE_DB_ACCESS
procedure in the DBMS_AQADM
package. For example, suppose you want to disable the oe
user from performing queue operations on the streams_queue
created in "Creating an ANYDATA Queue".
Attention:
If an AQ agent is used for multiple secure queues, then runningDISABLE_DB_ACCESS
for the agent prevents the user associated with the agent from performing operations on all of these queues.Run the following procedure to disable the oe
user from performing queue operations on the secure queue streams_queue
:
BEGIN DBMS_AQADM.DISABLE_DB_ACCESS( agent_name => 'streams_queue_agent', db_username => 'oe'); END; /
If the agent is no longer needed, you can drop the agent:
BEGIN DBMS_AQADM.DROP_AQ_AGENT( agent_name => 'streams_queue_agent'); END; /
Revoke privileges on the queue from the user, if the user no longer needs these privileges.
BEGIN DBMS_AQADM.REVOKE_QUEUE_PRIVILEGE ( privilege => 'ALL', queue_name => 'strmadmin.streams_queue', grantee => 'oe'); END; /
See Also:
Oracle Database PL/SQL Packages and Types Reference for more information about AQ agents and using the DBMS_AQADM
package
You use the REMOVE_QUEUE
procedure in the DBMS_STREAMS_ADM
package to remove an existing ANYDATA
queue. When you run the REMOVE_QUEUE
procedure, it waits until any existing messages in the queue are consumed. Next, it stops the queue, which means that no further enqueues into the queue or dequeues from the queue are allowed. When the queue is stopped, it drops the queue.
You can also drop the queue table for the queue if it is empty and is not used by another queue. To do so, specify true
, the default, for the drop_unused_queue_table
parameter.
In addition, you can drop any Streams clients that use the queue by setting the cascade
parameter to true
. By default, the cascade
parameter is set to false
.
For example, to remove an ANYDATA
queue named streams_queue
in the strmadmin
schema and drop its empty queue table, run the following procedure:
BEGIN DBMS_STREAMS_ADM.REMOVE_QUEUE( queue_name => 'strmadmin.streams_queue', cascade => false, drop_unused_queue_table => true); END; /
In this case, because the cascade
parameter is set to false
, this procedure drops the streams_queue
only if no Streams clients use the queue. If the cascade
parameter is set to false
and any Streams client uses the queue, then an error is raised.
A propagation propagates messages from a Streams source queue to a Streams destination queue. This section provides instructions for completing the following tasks:
In addition, you can use the features of Oracle Advanced Queuing (AQ) to manage Streams propagations.
See Also:
Oracle Streams Advanced Queuing User's Guide and Reference for more information about managing propagations with the features of AQ
You can use any of the following procedures to create a propagation between two ANYDATA
queues:
Each of these procedures in the DBMS_STREAMS_ADM
package creates a propagation with the specified name if it does not already exist, creates either a positive rule set or negative rule set for the propagation if the propagation does not have such a rule set, and can add table rules, schema rules, or global rules to the rule set. The CREATE_PROPAGATION
procedure creates a propagation, but does not create a rule set or rules for the propagation. However, the CREATE_PROPAGATION
procedure enables you to specify an existing rule set to associate with the propagation, either as a positive or a negative rule set. All propagations are started automatically upon creation.
The following tasks must be completed before you create a propagation:
Create a source queue and a destination queue for the propagation, if they do not exist. See "Creating an ANYDATA Queue" for instructions.
Create a database link between the database containing the source queue and the database containing the destination queue. See "Configuring a Streams Administrator" for information.
The following example runs the ADD_TABLE_PROPAGATION_RULES
procedure in the DBMS_STREAMS_ADM
package to create a propagation:
BEGIN DBMS_STREAMS_ADM.ADD_TABLE_PROPAGATION_RULES( table_name => 'hr.departments', streams_name => 'strm01_propagation', source_queue_name => 'strmadmin.strm_a_queue', destination_queue_name => 'strmadmin.strm_b_queue@dbs2.net', include_dml => true, include_ddl => true, include_tagged_lcr => false, source_database => 'dbs1.net', inclusion_rule => true, queue_to_queue => true); END; /
Running this procedure performs the following actions:
Creates a propagation named strm01_propagation
. The propagation is created only if it does not already exist.
Specifies that the propagation propagates LCRs from strm_a_queue
in the current database to strm_b_queue
in the dbs2.net
database.
Specifies that the propagation uses the dbs2.net
database link to propagate the LCRs, because the destination_queue_name
parameter contains @dbs2.net
.
Creates a positive rule set and associates it with the propagation because the inclusion_rule
parameter is set to true
. The rule set uses the evaluation context SYS.STREAMS$_EVALUATION_CONTEXT
. The rule set name is system generated.
Creates two rules. One rule evaluates to TRUE
for row LCRs that contain the results of DML changes to the hr.departments
table. The other rule evaluates to TRUE
for DDL LCRs that contain DDL changes to the hr.departments
table. The rule names are system generated.
Adds the two rules to the positive rule set associated with the propagation. The rules are added to the positive rule set because the inclusion_rule
parameter is set to true
.
Specifies that the propagation propagates an LCR only if it has a NULL
tag, because the include_tagged_lcr
parameter is set to false
. This behavior is accomplished through the system-created rules for the propagation.
Specifies that the source database for the LCRs being propagated is dbs1.net
, which might or might not be the current database. This propagation does not propagate LCRs in the source queue that have a different source database.
Creates a propagation job for the queue-to-queue propagation.
Note:
To use queue-to-queue propagation, the compatibility level must be 10.2.0 or higher for each database that contains a queue involved in the propagation.See Also:
Oracle Streams Replication Administrator's Guide for more information about Streams tags
The following example runs the CREATE_PROPAGATION
procedure in the DBMS_PROPAGATION_ADM
package to create a propagation:
BEGIN DBMS_PROPAGATION_ADM.CREATE_PROPAGATION( propagation_name => 'strm02_propagation', source_queue => 'strmadmin.strm03_queue', destination_queue => 'strmadmin.strm04_queue', destination_dblink => 'dbs2.net', rule_set_name => 'strmadmin.strm01_rule_set', queue_to_queue => true); END; /
Running this procedure performs the following actions:
Creates a propagation named strm02_propagation
. A propagation with the same name must not exist.
Specifies that the propagation propagates messages from strm03_queue
in the current database to strm04_queue
in the dbs2.net
database. Depending on the rules in the rule sets for the propagation, the propagated messages can be captured messages or user-enqueued messages, or both.
Specifies that the propagation uses the dbs2.net
database link to propagate the messages.
Associates the propagation with an existing rule set named strm01_rule_set
. This rule set is the positive rule set for the propagation.
Creates a propagation job for the queue-to-queue propagation.
Note:
To use queue-to-queue propagation, the compatibility level must be 10.2.0 or higher for each database that contains a queue involved in the propagation.You run the START_PROPAGATION
procedure in the DBMS_PROPAGATION_ADM
package to start an existing propagation. For example, the following procedure starts a propagation named strm01_propagation
:
BEGIN DBMS_PROPAGATION_ADM.START_PROPAGATION( propagation_name => 'strm01_propagation'); END; /
You run the STOP_PROPAGATION
procedure in the DBMS_PROPAGATION_ADM
package to stop an existing propagation. For example, the following procedure stops a propagation named strm01_propagation
:
BEGIN DBMS_PROPAGATION_ADM.STOP_PROPAGATION( propagation_name => 'strm01_propagation', force => false); END; /
To clear the statistics for the propagation when it is stopped, set the force
parameter to true
. If there is a problem with a propagation, then stopping the propagation with the force
parameter set to true
and restarting the propagation might correct the problem. If the force
parameter is set to false
, then the statistics for the propagation are not cleared.
To alter the schedule of an existing propagation job, use the ALTER_PROPAGATION_SCHEDULE
procedure in the DBMS_AQADM
package. The following sections contain examples that alter the schedule of a propagation job for a queue-to-queue propagation and for a queue-to-dblink propagation. These examples set the propagation job to propagate messages every 15 minutes (900 seconds), with each propagation lasting 300 seconds, and a 25-second wait before new messages in a completely propagated queue are propagated.
See Also:
Oracle Streams Advanced Queuing User's Guide and Reference for more information about using the ALTER_PROPAGATION_SCHEDULE
procedure
To alter the schedule of a propagation job for a queue-to-queue propagation that propagates messages from the strmadmin.strm_a_queue
source queue to the strmadmin.strm_b_queue
destination queue using the dbs2.net
database link, run the following procedure:
BEGIN DBMS_AQADM.ALTER_PROPAGATION_SCHEDULE( queue_name => 'strmadmin.strm_a_queue', destination => 'dbs2.net', duration => 300, next_time => 'SYSDATE + 900/86400', latency => 25, destination_queue => 'strmadmin.strm_b_queue'); END; /
Because each queue-to-queue propagation has its own propagation job, this procedure alters only the schedule of the propagation that propagates messages between the two queues specified. The destination_queue
parameter must specify the name of the destination queue to alter the propagation schedule of a queue-to-queue propagation.
To alter the schedule of a propagation job for a queue-to-dblink propagation that propagates messages from the strmadmin.streams_queue
source queue using the dbs3.net
database link, run the following procedure:
BEGIN DBMS_AQADM.ALTER_PROPAGATION_SCHEDULE( queue_name => 'strmadmin.streams_queue', destination => 'dbs3.net', duration => 300, next_time => 'SYSDATE + 900/86400', latency => 25); END; /
Because the propagation is a queue-to-dblink propagation, the destination_queue
parameter is not specified. Completing this task affects all queue-to-dblink propagations that propagate messages from the source queue to all destination queues that use the dbs3.net
database link.
You can specify one positive rule set and one negative rule set for a propagation. The propagation propagates a message if it evaluates to TRUE
for at least one rule in the positive rule set and discards a change if it evaluates to TRUE
for at least one rule in the negative rule set. The negative rule set is evaluated before the positive rule set.
You specify an existing rule set as the positive rule set for an existing propagation using the rule_set_name
parameter in the ALTER_PROPAGATION
procedure. This procedure is in the DBMS_PROPAGATION_ADM
package.
For example, the following procedure sets the positive rule set for a propagation named strm01_propagation
to strm02_rule_set
.
BEGIN DBMS_PROPAGATION_ADM.ALTER_PROPAGATION( propagation_name => 'strm01_propagation', rule_set_name => 'strmadmin.strm02_rule_set'); END; /
You specify an existing rule set as the negative rule set for an existing propagation using the negative_rule_set_name
parameter in the ALTER_PROPAGATION
procedure. This procedure is in the DBMS_PROPAGATION_ADM
package.
For example, the following procedure sets the negative rule set for a propagation named strm01_propagation
to strm03_rule_set
.
BEGIN DBMS_PROPAGATION_ADM.ALTER_PROPAGATION( propagation_name => 'strm01_propagation', negative_rule_set_name => 'strmadmin.strm03_rule_set'); END; /
To add rules to the rule set of a propagation, you can run one of the following procedures:
Excluding the ADD_SUBSET_PROPAGATION_RULES
procedure, these procedures can add rules to the positive rule set or negative rule set for a propagation. The ADD_SUBSET_PROPAGATION_RULES
procedure can add rules only to the positive rule set for a propagation.
The following example runs the ADD_TABLE_PROPAGATION_RULES
procedure in the DBMS_STREAMS_ADM
package to add rules to the positive rule set of an existing propagation named strm01_propagation
:
BEGIN DBMS_STREAMS_ADM.ADD_TABLE_PROPAGATION_RULES( table_name => 'hr.locations', streams_name => 'strm01_propagation', source_queue_name => 'strmadmin.strm_a_queue', destination_queue_name => 'strmadmin.strm_b_queue@dbs2.net', include_dml => true, include_ddl => true, source_database => 'dbs1.net', inclusion_rule => true); END; /
Running this procedure performs the following actions:
Creates two rules. One rule evaluates to TRUE
for row LCRs that contain the results of DML changes to the hr.locations
table. The other rule evaluates to TRUE
for DDL LCRs that contain DDL changes to the hr.locations
table. The rule names are system generated.
Specifies that both rules evaluate to TRUE
only for LCRs whose changes originated at the dbs1.net
source database.
Adds the two rules to the positive rule set associated with the propagation because the inclusion_rule
parameter is set to true
.
The following example runs the ADD_TABLE_PROPAGATION_RULES
procedure in the DBMS_STREAMS_ADM
package to add rules to the negative rule set of an existing propagation named strm01_propagation
:
BEGIN DBMS_STREAMS_ADM.ADD_TABLE_PROPAGATION_RULES( table_name => 'hr.departments', streams_name => 'strm01_propagation', source_queue_name => 'strmadmin.strm_a_queue', destination_queue_name => 'strmadmin.strm_b_queue@dbs2.net', include_dml => true, include_ddl => true, source_database => 'dbs1.net', inclusion_rule => false); END; /
Running this procedure performs the following actions:
Creates two rules. One rule evaluates to TRUE
for row LCRs that contain the results of DML changes to the hr.departments
table, and the other rule evaluates to TRUE
for DDL LCRs that contain DDL changes to the hr.departments
table. The rule names are system generated.
Specifies that both rules evaluate to TRUE
only for LCRs whose changes originated at the dbs1.net
source database.
Adds the two rules to the negative rule set associated with the propagation because the inclusion_rule
parameter is set to false
.
You remove a rule from the rule set for an existing propagation by running the REMOVE_RULE
procedure in the DBMS_STREAMS_ADM
package. For example, the following procedure removes a rule named departments3
from the positive rule set of a propagation named strm01_propagation
.
BEGIN DBMS_STREAMS_ADM.REMOVE_RULE( rule_name => 'departments3', streams_type => 'propagation', streams_name => 'strm01_propagation', drop_unused_rule => true, inclusion_rule => true); END; /
In this example, the drop_unused_rule
parameter in the REMOVE_RULE
procedure is set to true
, which is the default setting. Therefore, if the rule being removed is not in any other rule set, then it will be dropped from the database. If the drop_unused_rule
parameter is set to false
, then the rule is removed from the rule set, but it is not dropped from the database even if it is not in any other rule set.
If the inclusion_rule
parameter is set to false
, then the REMOVE_RULE
procedure removes the rule from the negative rule set for the propagation, not the positive rule set.
To remove all of the rules in the rule set for the propagation, then specify NULL
for the rule_name
parameter when you run the REMOVE_RULE
procedure.
You specify that you want to remove a rule set from a propagation using the ALTER_PROPAGATION
procedure in the DBMS_PROPAGATION_ADM
package. This procedure can remove the positive rule set, negative rule set, or both. Specify true
for the remove_rule_set
parameter to remove the positive rule set for the propagation. Specify true
for the remove_negative_rule_set
parameter to remove the negative rule set for the propagation.
For example, the following procedure removes both the positive and the negative rule set from a propagation named strm01_propagation
.
BEGIN DBMS_PROPAGATION_ADM.ALTER_PROPAGATION( propagation_name => 'strm01_propagation', remove_rule_set => true, remove_negative_rule_set => true); END; /
Note:
If a propagation does not have a positive or negative rule set, then the propagation propagates all messages in the source queue to the destination queue.You run the DROP_PROPAGATION
procedure in the DBMS_PROPAGATION_ADM
package to drop an existing propagation. For example, the following procedure drops a propagation named strm01_propagation
:
BEGIN DBMS_PROPAGATION_ADM.DROP_PROPAGATION( propagation_name => 'strm01_propagation', drop_unused_rule_sets => true); END; /
Because the drop_unused_rule_sets
parameter is set to true
, this procedure also drops any rule sets used by the propagation strm01_propagation
, unless a rule set is used by another Streams client. If the drop_unused_rule_sets
parameter is set to true
, then both the positive rule set and negative rule set for the propagation might be dropped. If this procedure drops a rule set, then it also drops any rules in the rule set that are not in another rule set.
Note:
When you drop a propagation, the propagation job used by the propagation is dropped automatically, if no other propagations are using the propagation job.Streams enables messaging with queues of type ANYDATA
. These queues stage user messages whose payloads are of ANYDATA
type, and an ANYDATA
payload can be a wrapper for payloads of different datatypes.
This section provides instructions for completing the following tasks:
Note:
The examples in this section assume that you have configured a Streams administrator at each database.See Also:
"ANYDATA Queues and User Messages" for conceptual information about messaging in Streams
Oracle Streams Advanced Queuing User's Guide and Reference for more information about AQ
Oracle Database PL/SQL Packages and Types Reference for more information about the ANYDATA
type
You can wrap almost any type of payload in an ANYDATA
payload. The following sections provide examples of enqueuing messages into, and dequeuing messages from, an ANYDATA
queue.
The following steps illustrate how to wrap payloads of various types in an ANYDATA
payload.
Connect as an administrative user who can create users, grant privileges, create tablespaces, and alter users at the dbs1.net
database.
Grant EXECUTE
privilege on the DBMS_AQ
package to the oe
user so that this user can run the ENQUEUE
and DEQUEUE
procedures in that package:
GRANT EXECUTE ON DBMS_AQ TO oe;
Connect as the Streams administrator, as in the following example:
CONNECT strmadmin/strmadminpw@dbs1.net
Create an ANYDATA
queue if one does not already exist.
BEGIN DBMS_STREAMS_ADM.SET_UP_QUEUE( queue_table => 'oe_q_table_any', queue_name => 'oe_q_any', queue_user => 'oe'); END; /
The oe
user is configured automatically as a secure queue user of the oe_q_any
queue and is given ENQUEUE
and DEQUEUE
privileges on the queue. In addition, an AQ agent named oe
is configured and is associated with the oe
user. However, a message cannot be enqueued into a queue unless a subscriber who can dequeue the message is configured.
Add a subscriber for oe_q_any
queue. This subscriber will perform explicit dequeues of messages.
DECLARE subscriber SYS.AQ$_AGENT; BEGIN subscriber := SYS.AQ$_AGENT('OE', NULL, NULL); SYS.DBMS_AQADM.ADD_SUBSCRIBER( queue_name => 'strmadmin.oe_q_any', subscriber => subscriber); END; /
Connect as the oe
user.
CONNECT oe/oe@dbs1.net
Create a procedure that takes as an input parameter an object of ANYDATA
type and enqueues a message containing the payload into an existing ANYDATA
queue.
CREATE OR REPLACE PROCEDURE oe.enq_proc (payload ANYDATA) IS enqopt DBMS_AQ.ENQUEUE_OPTIONS_T; mprop DBMS_AQ.MESSAGE_PROPERTIES_T; enq_msgid RAW(16); BEGIN mprop.SENDER_ID := SYS.AQ$_AGENT('OE', NULL, NULL); DBMS_AQ.ENQUEUE( queue_name => 'strmadmin.oe_q_any', enqueue_options => enqopt, message_properties => mprop, payload => payload, msgid => enq_msgid); END; /
Run the procedure you created in Step 7 by specifying the appropriate Convert
data_type
function. The following commands enqueue messages of various types.
VARCHAR2
type:
EXEC oe.enq_proc(ANYDATA.ConvertVarchar2('Chemicals - SW')); COMMIT;
NUMBER
type:
EXEC oe.enq_proc(ANYDATA.ConvertNumber('16')); COMMIT;
User-defined type:
BEGIN oe.enq_proc(ANYDATA.ConvertObject(oe.cust_address_typ( '1646 Brazil Blvd','361168','Chennai','Tam', 'IN'))); END; / COMMIT;
See Also:
"Viewing the Contents of User-Enqueued Messages in a Queue" for information about viewing the contents of these enqueued messagesThe following steps illustrate how to dequeue a payload wrapped in an ANYDATA
payload. This example assumes that you have completed the steps in "Wrapping User Message Payloads in an ANYDATA Wrapper and Enqueuing Them".
To dequeue messages, you must know the consumer of the messages. To find the consumer for the messages in a queue, connect as the owner of the queue and query the AQ$
queue_table_name
, where queue_table_name
is the name of the queue table. For example, to find the consumers of the messages in the oe_q_any
queue, run the following query:
CONNECT strmadmin/strmadminpw@dbs1.net SELECT MSG_ID, MSG_STATE, CONSUMER_NAME FROM AQ$OE_Q_TABLE_ANY;
Connect as the oe
user:
CONNECT oe/oe@dbs1.net
Create a procedure that takes as an input the consumer of the messages you want to dequeue. The following example procedure dequeues messages of oe.cust_address_typ
and prints the contents of the messages.
CREATE OR REPLACE PROCEDURE oe.get_cust_address ( consumer IN VARCHAR2) AS address OE.CUST_ADDRESS_TYP; deq_address ANYDATA; msgid RAW(16); deqopt DBMS_AQ.DEQUEUE_OPTIONS_T; mprop DBMS_AQ.MESSAGE_PROPERTIES_T; new_addresses BOOLEAN := true; next_trans EXCEPTION; no_messages EXCEPTION; pragma exception_init (next_trans, -25235); pragma exception_init (no_messages, -25228); num_var pls_integer; BEGIN deqopt.consumer_name := consumer; deqopt.wait := 1; WHILE (new_addresses) LOOP BEGIN DBMS_AQ.DEQUEUE( queue_name => 'strmadmin.oe_q_any', dequeue_options => deqopt, message_properties => mprop, payload => deq_address, msgid => msgid); deqopt.navigation := DBMS_AQ.NEXT; DBMS_OUTPUT.PUT_LINE('****'); IF (deq_address.GetTypeName() = 'OE.CUST_ADDRESS_TYP') THEN DBMS_OUTPUT.PUT_LINE('Message TYPE is: ' || deq_address.GetTypeName()); num_var := deq_address.GetObject(address); DBMS_OUTPUT.PUT_LINE(' **** CUSTOMER ADDRESS **** '); DBMS_OUTPUT.PUT_LINE(address.street_address); DBMS_OUTPUT.PUT_LINE(address.postal_code); DBMS_OUTPUT.PUT_LINE(address.city); DBMS_OUTPUT.PUT_LINE(address.state_province); DBMS_OUTPUT.PUT_LINE(address.country_id); ELSE DBMS_OUTPUT.PUT_LINE('Message TYPE is: ' || deq_address.GetTypeName()); END IF; COMMIT; EXCEPTION WHEN next_trans THEN deqopt.navigation := DBMS_AQ.NEXT_TRANSACTION; WHEN no_messages THEN new_addresses := false; DBMS_OUTPUT.PUT_LINE('No more messages'); END; END LOOP; END; /
Run the procedure you created in Step 1 and specify the consumer of the messages you want to dequeue, as in the following example:
SET SERVEROUTPUT ON SIZE 100000 EXEC oe.get_cust_address('OE');
This section contains instructions for configuring the following elements in a database:
An enqueue procedure that enqueues messages into an ANYDATA
queue at a database. In this example, the enqueue procedure uses a trigger to enqueue a message every time a row is inserted into the oe.orders
table.
A messaging client that can dequeue user-enqueued messages based on rules. In this example, the messaging client uses a rule so that it dequeues only messages that involve the oe.orders
table. The messaging client uses the DEQUEUE
procedure in the DBMS_STREAMS_MESSAGING
to dequeue one message at a time and display the order number for the order.
Message notification for the messaging client. In this example, a notification is sent to an email address when a message is enqueued into the queue used by the messaging client. The message can be dequeued by the messaging client because the message satisfies the rule sets of the messaging client.
You can query the DBA_STREAMS_MESSAGE_CONSUMERS
data dictionary view for information about existing messaging clients and notifications.
Complete the following steps to configure a messaging client and message notification:
Connect as an administrative user who can grant privileges and execute subprograms in supplied packages.
Set the host name used to send the email, the mail port, and the email account that sends email messages for email notifications using the DBMS_AQELM
package. The following example sets the mail host name to smtp.mycompany.com
, the mail port to 25
, and the email account to Mary.Smith@mycompany.com
:
BEGIN DBMS_AQELM.SET_MAILHOST('smtp.mycompany.com') ; DBMS_AQELM.SET_MAILPORT(25) ; DBMS_AQELM.SET_SENDFROM('Mary.Smith@mycompany.com'); END; /
You can use procedures in the DBMS_AQELM
package to determine the current mail host, mail port, and send from settings for a database. For example, to determine the current mail host for a database, use the DBMS_AQELM.GET_MAILHOST
procedure.
Grant the necessary privileges to the users who will create the messaging client, enqueue and dequeue messages, and specify message notifications. In this example, the oe
user performs all of these tasks.
GRANT EXECUTE ON DBMS_AQ TO oe; GRANT EXECUTE ON DBMS_STREAMS_ADM TO oe; GRANT EXECUTE ON DBMS_STREAMS_MESSAGING TO oe; BEGIN DBMS_RULE_ADM.GRANT_SYSTEM_PRIVILEGE( privilege => DBMS_RULE_ADM.CREATE_RULE_SET_OBJ, grantee => 'oe', grant_option => false); END; / BEGIN DBMS_RULE_ADM.GRANT_SYSTEM_PRIVILEGE( privilege => DBMS_RULE_ADM.CREATE_RULE_OBJ, grantee => 'oe', grant_option => false); END; / BEGIN DBMS_RULE_ADM.GRANT_SYSTEM_PRIVILEGE( privilege => DBMS_RULE_ADM.CREATE_EVALUATION_CONTEXT_OBJ, grantee => 'oe', grant_option => false); END; /
Connect as the oe
user:
CONNECT oe/oe
Create an ANYDATA
queue using SET_UP_QUEUE
, as in the following example:
BEGIN DBMS_STREAMS_ADM.SET_UP_QUEUE( queue_table => 'oe.notification_queue_table', queue_name => 'oe.notification_queue'); END; /
Create the types for the user-enqueued messages, as in the following example:
CREATE TYPE oe.user_msg AS OBJECT( object_name VARCHAR2(30), object_owner VARCHAR2(30), message VARCHAR2(50)); /
Create a trigger that enqueues a message into the queue whenever an order is inserted into the oe.orders
table, as in the following example:
CREATE OR REPLACE TRIGGER oe.order_insert AFTER INSERT ON oe.orders FOR EACH ROW DECLARE msg oe.user_msg; str VARCHAR2(2000); BEGIN str := 'New Order - ' || :NEW.ORDER_ID || ' Order ID'; msg := oe.user_msg( object_name => 'ORDERS', object_owner => 'OE', message => str); DBMS_STREAMS_MESSAGING.ENQUEUE ( queue_name => 'oe.notification_queue', payload => ANYDATA.CONVERTOBJECT(msg)); END; /
Create the messaging client that will dequeue messages from the queue and the rule used by the messaging client to determine which messages to dequeue, as in the following example:
BEGIN DBMS_STREAMS_ADM.ADD_MESSAGE_RULE ( message_type => 'oe.user_msg', rule_condition => ' :msg.OBJECT_OWNER = ''OE'' AND ' || ' :msg.OBJECT_NAME = ''ORDERS'' ', streams_type => 'dequeue', streams_name => 'oe', queue_name => 'oe.notification_queue'); END; /
Set the message notification to send email upon enqueue of messages that can be dequeued by the messaging client, as in the following example:
BEGIN DBMS_STREAMS_ADM.SET_MESSAGE_NOTIFICATION ( streams_name => 'oe', notification_action => 'Mary.Smith@mycompany.com', notification_type => 'MAIL', include_notification => true, queue_name => 'oe.notification_queue'); END; /
Create a PL/SQL procedure that dequeues messages using the messaging client, as in the following example:
CREATE OR REPLACE PROCEDURE oe.deq_notification(consumer IN VARCHAR2) AS msg ANYDATA; user_msg oe.user_msg; num_var PLS_INTEGER; more_messages BOOLEAN := true; navigation VARCHAR2(30); BEGIN navigation := 'FIRST MESSAGE'; WHILE (more_messages) LOOP BEGIN DBMS_STREAMS_MESSAGING.DEQUEUE( queue_name => 'oe.notification_queue', streams_name => consumer, payload => msg, navigation => navigation, wait => DBMS_STREAMS_MESSAGING.NO_WAIT); IF msg.GETTYPENAME() = 'OE.USER_MSG' THEN num_var := msg.GETOBJECT(user_msg); DBMS_OUTPUT.PUT_LINE(user_msg.object_name); DBMS_OUTPUT.PUT_LINE(user_msg.object_owner); DBMS_OUTPUT.PUT_LINE(user_msg.message); END IF; navigation := 'NEXT MESSAGE'; COMMIT; EXCEPTION WHEN SYS.DBMS_STREAMS_MESSAGING.ENDOFCURTRANS THEN navigation := 'NEXT TRANSACTION'; WHEN DBMS_STREAMS_MESSAGING.NOMOREMSGS THEN more_messages := false; DBMS_OUTPUT.PUT_LINE('No more messages.'); WHEN OTHERS THEN RAISE; END; END LOOP; END; /
Insert rows into the oe.orders
table, as in the following example:
INSERT INTO oe.orders VALUES(2521, 'direct', 144, 0, 922.57, 159, NULL); INSERT INTO oe.orders VALUES(2522, 'direct', 116, 0, 1608.29, 153, NULL); COMMIT; INSERT INTO oe.orders VALUES(2523, 'direct', 116, 0, 227.55, 155, NULL); COMMIT;
Message notification sends a message to the email address specified in Step 9 for each message that was enqueued. Each notification is an AQXmlNotification
, which includes of the following:
notification_options
, which includes the following:
destination
- The destination queue from which the message was dequeued
consumer_name
- The name of the messaging client that dequeued the message
message_set
- The set of message properties
The following example shows the AQXmlNotification
format sent in an email notification:
<?xml version="1.0" encoding="UTF-8"?> <Envelope xmlns="http://ns.oracle.com/AQ/schemas/envelope"> <Body> <AQXmlNotification xmlns="http://ns.oracle.com/AQ/schemas/access"> <notification_options> <destination>OE.NOTIFICATION_QUEUE</destination> <consumer_name>OE</consumer_name> </notification_options> <message_set> <message> <message_header> <message_id>CB510DDB19454731E034080020AE3E0A</message_id> <expiration>-1</expiration> <delay>0</delay> <priority>1</priority> <delivery_count>0</delivery_count> <sender_id> <agent_name>OE</agent_name> <protocol>0</protocol> </sender_id> <message_state>0</message_state> </message_header> </message> </message_set> </AQXmlNotification> </Body> </Envelope>
You can dequeue the messages enqueued in this example by running the oe.deq_notification
procedure:
SET SERVEROUTPUT ON SIZE 100000 EXEC oe.deq_notification('OE');
See Also:
Chapter 6, "How Rules Are Used in Streams" for more information about rule sets for Streams clients and for information about how messages satisfy rule sets
Oracle Streams Advanced Queuing User's Guide and Reference and Oracle XML DB Developer's Guide for more information about message notifications and XML