This chapter describes the Java Message Service (JMS) operational interface (shared interfaces) to Oracle Database Advanced Queuing (AQ).
This chapter contains these topics:
Oracle Database Advanced Queuing JMS Operational Interface: Shared Interfaces
Setting Default TimeToLive for All Messages Sent by a MessageProducer
Setting Default Priority for All Messages Sent by a MessageProducer
This section discusses Oracle Database Advanced Queuing shared interfaces for JMS operations.
This section contains these topics:
public void start() throws JMSException
AQjmsConnection.start()
starts a JMS connection for receiving messages.
public oracle.jms.AQjmsConnection getJmsConnection() throws JMSException
AQjmsSession.getJmsConnection()
gets a JMS connection from a session.
public void commit() throws JMSException
AQjmsSession.commit()
commits all JMS and SQL operations performed in a session.
public void rollback() throws JMSException
AQjmsSession.rollback()
terminates all JMS and SQL operations performed in a session.
public java.sql.Connection getDBConnection() throws JMSException
AQjmsSession.getDBConnection()
gets the underlying JDBC connection from a JMS session. The JDBC connection can be used to perform SQL operations as part of the same transaction in which the JMS operations are accomplished.
public oracle.jdbc.pool.OracleOCIConnectionPool getOCIConnectionPool()
AQjmsConnection.getOCIConnectionPool()
gets the underlying OracleOCIConnectionPool
from a JMS connection. The settings of the OracleOCIConnectionPool
instance can be tuned by the user depending on the connection usage, for example, the number of sessions the user wants to create using the given connection. The user should not, however, close the OracleOCIConnectionPool
instance being used by the JMS connection.
public javax.jms.BytesMessage createBytesMessage() throws JMSException
AQjmsSession.createBytesMessage()
creates a bytes message. It can be used only if the queue table that contains the destination queue/topic was created with the SYS.AQ$_JMS_BYTES_MESSAGE
or AQ$_JMS_MESSAGE
payload types.
public javax.jms.MapMessage createMapMessage() throws JMSException
AQjmsSession.createMapMessage()
creates a map message. It can be used only if the queue table that contains the destination queue/topic was created with the SYS.AQ$_JMS_MAP_MESSAGE
or AQ$_JMS_MESSAGE
payload types.
public javax.jms.StreamMessage createStreamMessage() throws JMSException
AQjmsSession.createStreamMessage()
creates a stream message. It can be used only if the queue table that contains the destination queue/topic was created with the SYS.AQ$_JMS_STREAM_MESSAGE
or AQ$_JMS_MESSAGE
payload types.
public javax.jms.ObjectMessage createObjectMessage(java.io.Serializable object) throws JMSException
AQjmsSession.createObjectMessage()
creates an object message. It can be used only if the queue table that contains the destination queue/topic was created with the SYS.AQ$_JMS_OBJECT_MESSAGE
or AQ$_JMS_MESSAGE
payload types.
public javax.jms.TextMessage createTextMessage() throws JMSException
AQjmsSession.createTextMessage()
creates a text message. It can be used only if the queue table that contains the destination queue/topic was created with the SYS.AQ$_JMS_TEXT_MESSAGE
or AQ$_JMS_MESSAGE
payload types.
public javax.jms.Message createMessage() throws JMSException
AQjmsSession.createMessage()
creates a JMS message. You can use the AQ$_JMS_MESSAGE
construct message to construct messages of different types. The message type must be one of the following:
DBMS_AQ.JMS_TEXT_MESSAGE
DBMS_AQ.JMS_OBJECT_MESSAGE
DBMS_AQ.JMS_MAP_MESSAGE
DBMS_AQ.JMS_BYTES_MESSAGE
DBMS_AQ.JMS_STREAM_MESSAGE
You can also use this ADT to create a header-only JMS message.
public oracle.jms.AdtMessage createAdtMessage() throws JMSException
AQjmsSession.createAdtMessage()
creates an AdtMessage
. It can be used only if the queue table that contains the queue/topic was created with an Oracle ADT payload type. An AdtMessage
must be populated with an object that implements the CustomDatum
interface. This object must be the Java mapping of the SQL ADT defined as the payload for the queue/topic. Java classes corresponding to SQL ADT types can be generated using the Jpublisher tool.
Property names starting with JMS are provider-specific. User-defined properties cannot start with JMS.
The following provider properties can be set by clients using text, stream, object, bytes or map messages:
JMSXAppID (string)
JMSXGroupID (string)
JMSXGroupSeq (int)
JMS_OracleExcpQ (string)
This message property specifies the exception queue.
JMS_OracleDelay (int)
This message property specifies the message delay in seconds.
The following properties can be set on AdtMessage
JMS_OracleExcpQ (String)
This message property specifies the exception queue as "schema
.queue_name
"
JMS_OracleDelay (int)
This message property specifies the message delay in seconds.
This section contains these topics:
public void setBooleanProperty(java.lang.String name, boolean value) throws JMSException
AQjmsMessage.setBooleanProperty()
specifies a message property as Boolean. It has the following parameters:
Parameter | Description |
---|---|
name |
Name of the Boolean property |
value |
Boolean property value to set in the message |
public void setStringProperty(java.lang.String name, java.lang.String value) throws JMSException
AQjmsMessage.setStringProperty()
specifies a message property as string. It has the following parameters:
Parameter | Description |
---|---|
name |
Name of the string property |
value |
String property value to set in the message |
public void setIntProperty(java.lang.String name, int value) throws JMSException
AQjmsMessage.setIntProperty()
specifies a message property as integer. It has the following parameters:
Parameter | Description |
---|---|
name |
Name of the integer property |
value |
Integer property value to set in the message |
public void setDoubleProperty(java.lang.String name, double value) throws JMSException
AQjmsMessage.setDoubleProperty()
specifies a message property as double. It has the following parameters:
Parameter | Description |
---|---|
name |
Name of the double property |
value |
Double property value to set in the message |
public void setFloatProperty(java.lang.String name, float value) throws JMSException
AQjmsMessage.setFloatProperty()
specifies a message property as float. It has the following parameters:
Parameter | Description |
---|---|
name |
Name of the float property |
value |
Float property value to set in the message |
public void setByteProperty(java.lang.String name, byte value) throws JMSException
AQjmsMessage.setByteProperty()
specifies a message property as byte. It has the following parameters:
Parameter | Description |
---|---|
name |
Name of the byte property |
value |
Byte property value to set in the message |
public void setLongProperty(java.lang.String name, long value) throws JMSException
AQjmsMessage.setLongProperty()
specifies a message property as long. It has the following parameters:
Parameter | Description |
---|---|
name |
Name of the long property |
value |
Long property value to set in the message |
public void setShortProperty(java.lang.String name, short value) throws JMSException
AQjmsMessage.setShortProperty()
specifies a message property as short. It has the following parameters:
Parameter | Description |
---|---|
name |
Name of the short property |
value |
Short property value to set in the message |
public void setObjectProperty(java.lang.String name, java.lang.Object value) throws JMSException
AQjmsMessage.setObjectProperty()
specifies a message property as object. Only objectified primitive values are supported: Boolean, byte, short, integer, long, float, double and string. It has the following parameters:
Parameter | Description |
---|---|
name |
Name of the Java object property |
value |
Java object property value to set in the message |
public void setTimeToLive(long timeToLive) throws JMSException
This method sets the default TimeToLive
for all messages sent by a MessageProducer
. It is calculated after message delay has taken effect. This method has the following parameter:
Parameter | Description |
---|---|
timeToLive |
Message time to live in milliseconds (zero is unlimited) |
public void setPriority(int priority) throws JMSException
This method sets the default Priority
for all messages sent by a MessageProducer
. It has the following parameter:
Parameter | Description |
---|---|
priority |
Message priority for this message producer. The default is 4. |
Priority values can be any integer. A smaller number indicates higher priority. If a priority value is explicitly specified during a send()
operation, then it overrides the default value set by this method.
public void createAQAgent(java.lang.String agent_name, boolean enable_http, throws JMSException
This method creates an AQjmsAgent
. It has the following parameters:
Parameter | Description |
---|---|
agent_name |
Name of the AQ agent |
enable_http |
If set to true, then this agent is allowed to access AQ through HTTP |
You can receive a message synchronously by specifying Timeout or without waiting. You can also receive a message using a transformation:
public javax.jms.Message receive(long timeout) throws JMSException
This method receives a message using a message consumer by specifying timeout.
Parameter | Description |
---|---|
timeout |
Timeout value in milliseconds |
Example 15-6 Using a Message Consumer by Specifying Timeout
TopicConnectionFactory tc_fact = null; TopicConnection t_conn = null; TopicSession t_sess = null; TopicSession jms_sess; Topic shipped_orders; int myport = 5521; /* create connection and session */ tc_fact = AQjmsFactory.getTopicConnectionFactory( "MYHOSTNAME", "MYSID", myport, "oci8"); t_conn = tc_fact.createTopicConnection("jmstopic", "jmstopic"); jms_sess = t_conn.createTopicSession(true, Session.CLIENT_ACKNOWLEDGE); shipped_orders = ((AQjmsSession )jms_sess).getTopic( "WS", "Shipped_Orders_Topic"); /* create a subscriber, specifying the correct CustomDatumFactory and selector */ subscriber1 = jms_sess.createDurableSubscriber( shipped_orders, 'WesternShipping', " priority > 1 and tab.user_data.region like 'WESTERN %'", false, AQjmsAgent.getFactory()); /* receive, blocking for 30 seconds if there were no messages */ Message = subscriber.receive(30000);
Example 15-7 JMS: Blocking Until a Message Arrives
public BolOrder get_new_order1(QueueSession jms_session) { Queue queue; QueueReceiver qrec; ObjectMessage obj_message; BolCustomer customer; BolOrder new_order = null; String state; try { /* get a handle to the new_orders queue */ queue = ((AQjmsSession) jms_session).getQueue("OE", "OE_neworders_que"); qrec = jms_session.createReceiver(queue); /* wait for a message to show up in the queue */ obj_message = (ObjectMessage)qrec.receive(); new_order = (BolOrder)obj_message.getObject(); customer = new_order.getCustomer(); state = customer.getState(); System.out.println("Order: for customer " + customer.getName()); } catch (JMSException ex) { System.out.println("Exception: " + ex); } return new_order; }
public javax.jms.Message receiveNoWait() throws JMSException
This method receives a message using a message consumer without waiting.
Example 15-8 JMS: Nonblocking Messages
public BolOrder poll_new_order3(QueueSession jms_session) { Queue queue; QueueReceiver qrec; ObjectMessage obj_message; BolCustomer customer; BolOrder new_order = null; String state; try { /* get a handle to the new_orders queue */ queue = ((AQjmsSession) jms_session).getQueue("OE", "OE_neworders_que"); qrec = jms_session.createReceiver(queue); /* check for a message to show in the queue */ obj_message = (ObjectMessage)qrec.receiveNoWait(); new_order = (BolOrder)obj_message.getObject(); customer = new_order.getCustomer(); state = customer.getState(); System.out.println("Order: for customer " + customer.getName()); } catch (JMSException ex) { System.out.println("Exception: " + ex); } return new_order; }
A transformation can be applied when receiving a message from a queue or topic. The transformation is applied to the message before returning it to JMS application.
The transformation can be specified using the setTransformation()
interface of the AQjmsQueueReceiver
, AQjmsTopicSubscriber
or AQjmsTopicReceiver
.
Example 15-9 JMS: Receiving Messages from a Destination Using a Transformation
Assume that the Western Shipping application retrieves messages from the OE_bookedorders_topic. It specifies the transformation OE2WS
to retrieve the message as the Oracle object type WS_order
. Assume that the WSOrder Java class has been generated by Jpublisher to map to the Oracle object WS.WS_order
:
public AQjmsAdtMessage retrieve_bookedorders(TopicSession jms_session) AQjmsTopicReceiver receiver; Topic topic; Message msg = null; try { /* get a handle to the OE_bookedorders_topic */ topic = ((AQjmsSession)jms_session).getTopic("OE", "OE_bookedorders_topic"); /* Create a receiver for WShip */ receiver = ((AQjmsSession)jms_session).createTopicReceiver( topic, "WShip, null, WSOrder.getFactory()); /* set the transformation in the publisher */ receiver.setTransformation("OE2WS"); msg = receiver.receive(10); } catch (JMSException ex) { System.out.println("Exception :", ex); } return (AQjmsAdtMessage)msg; }
public void setNavigationMode(int mode) throws JMSException
This method specifies the navigation mode for receiving messages. It has the following parameter:
Parameter | Description |
---|---|
mode |
New value of the navigation mode |
Example 15-10 Specifying Navigation Mode for Receiving Messages
TopicConnectionFactory tc_fact = null; TopicConnection t_conn = null; TopicSession t_sess = null; TopicSession jms_sess; Topic shipped_orders; int myport = 5521; /* create connection and session */ tc_fact = AQjmsFactory.getTopicConnectionFactory( "MYHOSTNAME", "MYSID", myport, "oci8"); t_conn = tc_fact.createTopicConnection("jmstopic", "jmstopic"); jms_sess = t_conn.createTopicSession(true, Session.CLIENT_ACKNOWLEDGE); shipped_orders = ((AQjmsSession )jms_sess).getTopic("WS", "Shipped_Orders_Topic"); /* create a subscriber, specifying the correct CustomDatumFactory and selector */ subscriber1 = jms_sess.createDurableSubscriber( shipped_orders, 'WesternShipping', "priority > 1 and tab.user_data.region like 'WESTERN %'", false, AQjmsAgent.getFactory()); subscriber1.setNavigationMode(AQjmsConstants.NAVIGATION_FIRST_MESSAGE); /* get message for the subscriber, returning immediately if there was nomessage */ Message = subscriber.receive();
You can receive a message asynchronously two ways:
public void setMessageListener(javax.jms.MessageListener myListener) throws JMSException
This method specifies a message listener at the message consumer. It has the following parameter:
Parameter | Description |
---|---|
myListener |
Sets the consumer message listener |
Example 15-11 Specifying Message Listener at Message Consumer
TopicConnectionFactory tc_fact = null; TopicConnection t_conn = null; TopicSession t_sess = null; TopicSession jms_sess; Topic shipped_orders; int myport = 5521; MessageListener mLis = null; /* create connection and session */ tc_fact = AQjmsFactory.getTopicConnectionFactory( "MYHOSTNAME", "MYSID", myport, "oci8"); t_conn = tc_fact.createTopicConnection("jmstopic", "jmstopic"); jms_sess = t_conn.createTopicSession(true, Session.CLIENT_ACKNOWLEDGE); shipped_orders = ((AQjmsSession )jms_sess).getTopic( "WS", "Shipped_Orders_Topic"); /* create a subscriber, specifying the correct CustomDatumFactory and selector */ subscriber1 = jms_sess.createDurableSubscriber( shipped_orders, 'WesternShipping', "priority > 1 and tab.user_data.region like 'WESTERN %'", false, AQjmsAgent.getFactory()); mLis = new myListener(jms_sess, "foo"); /* get message for the subscriber, returning immediately if there was nomessage */ subscriber.setMessageListener(mLis); The definition of the myListener class import oracle.AQ.*; import oracle.jms.*; import javax.jms.*; import java.lang.*; import java.util.*; public class myListener implements MessageListener { TopicSession mySess; String myName; /* constructor */ myListener(TopicSession t_sess, String t_name) { mySess = t_sess; myName = t_name; } public onMessage(Message m) { System.out.println("Retrieved message with correlation: " || m.getJMSCorrelationID()); try{ /* commit the dequeue */ mySession.commit(); } catch (java.sql.SQLException e) {System.out.println("SQL Exception on commit"); } } }
This section contains these topics:
public java.lang.String getJMSCorrelationID() throws JMSException
AQjmsMessage.getJMSCorrelationID()
gets the correlation identifier of a message.
This section contains these topics:
public boolean getBooleanProperty(java.lang.String name) throws JMSException
AQjmsMessage.getBooleanProperty()
gets a message property as Boolean. It has the following parameter:
Parameter | Description |
---|---|
name |
Name of the Boolean property |
public java.lang.String getStringProperty(java.lang.String name) throws JMSException
AQjmsMessage.getStringProperty()
gets a message property as string. It has the following parameter:
Parameter | Description |
---|---|
name |
Name of the string property |
public int getIntProperty(java.lang.String name) throws JMSException
AQjmsMessage.getIntProperty()
gets a message property as integer. It has the following parameter:
Parameter | Description |
---|---|
name |
Name of the integer property |
public double getDoubleProperty(java.lang.String name) throws JMSException
AQjmsMessage.getDoubleProperty()
gets a message property as double. It has the following parameter:
Parameter | Description |
---|---|
name |
Name of the double property |
public float getFloatProperty(java.lang.String name) throws JMSException
AQjmsMessage.getFloatProperty()
gets a message property as float. It has the following parameter:
Parameter | Description |
---|---|
name |
Name of the float property |
public byte getByteProperty(java.lang.String name) throws JMSException
AQjmsMessage.getByteProperty()
gets a message property as byte. It has the following parameter:
Parameter | Description |
---|---|
name |
Name of the byte property |
public long getLongProperty(java.lang.String name) throws JMSException
AQjmsMessage.getLongProperty()
gets a message property as long. It has the following parameter:
Parameter | Description |
---|---|
name |
Name of the long property |
public short getShortProperty(java.lang.String name) throws JMSException
AQjmsMessage.getShortProperty() gets a message property as short. It has the following parameter:
Parameter | Description |
---|---|
name |
Name of the short property |
This section contains these topics:
public void close() throws JMSException
AQjmsProducer.close()
closes a MessageProducer
.
public void close() throws JMSException
AQjmsConsumer.close()
closes a message consumer.
public void stop() throws JMSException
AQjmsConnection.stop()
stops a JMS connection.
public void close() throws JMSException
AQjmsSession.close()
closes a JMS session.
public void close() throws JMSException
AQjmsConnection.close()
closes a JMS connection and releases all resources allocated on behalf of the connection. Because the JMS provider typically allocates significant resources outside the JVM on behalf of a connection, clients should close them when they are not needed. Relying on garbage collection to eventually reclaim these resources may not be timely enough.
This section contains these topics:
public java.lang.String getErrorCode()
AQjmsException.getErrorCode()
gets the error code for a JMS exception.
public int getErrorNumber()
AQjmsException.getErrorNumber()
gets the error number for a JMS exception.
Note:
This method will be deprecated in a future release. UsegetErrorCode()
instead.public java.lang.String getLinkString()
AQjmsException.getLinkString()
gets the exception linked to a JMS exception. In general, this contains the SQL exception raised by the database.
public void printStackTrace(java.io.PrintStream s)
AQjmsException.printStackTrace()
prints the stack trace for a JMS exception.
public void setExceptionListener(javax.jms.ExceptionListener listener) throws JMSException
AQjmsConnection.setExceptionListener()
specifies an exception listener for a connection. It has the following parameter:
Parameter | Description |
---|---|
listener |
Exception listener |
If an exception listener has been registered, then it is informed of any serious problem detected for a connection. This is accomplished by calling the listener onException()
method, passing it a JMS exception describing the problem. This allows a JMS client to be notified of a problem asynchronously. Some connections only consume messages, so they have no other way to learn the connection has failed.
Example 15-13 Specifying Exception Listener for Connection
//register an exception listener Connection jms_connection; jms_connection.setExceptionListener( new ExceptionListener() { public void onException (JMSException jmsException) { System.out.println("JMS-EXCEPTION: " + jmsException.toString()); } }; );
public javax.jms.ExceptionListener getExceptionListener() throws JMSException
AQjmsConnection.getExceptionListener()
gets the exception listener for the connection.
Example 15-14 demonstrates how to use ExceptionListener
with MessageListener
. Ensure that the following conditions are met:
The user jmsuser
with password jmsuser
is created in the database, with appropriate privileges.
The queue demoQueue
is created and started.
This example demonstrates how to make the MessageListener asynchronously receive the messages, where the exception listener recreates the JMS objects in case there is a connection restart.
Example 15-14 Using ExceptionListener with MessageListener
import java.util.Enumeration; import java.util.Properties; import javax.jms.Connection; import javax.jms.JMSException; import javax.jms.Message; import javax.jms.MessageConsumer; import javax.jms.MessageProducer; import javax.jms.Queue; import javax.jms.QueueBrowser; import javax.jms.Session; import javax.jms.TextMessage; import oracle.jms.AQjmsConnectionFactory; import oracle.jms.AQjmsFactory; import oracle.jms.AQjmsSession; public class JMSDemo { static String queueName = "demoQueue"; static String queueOwner = "jmsuser"; static String queueOwnerPassword = "jmsuser"; static Connection connection = null; static int numberOfMessages = 25000; static int messageCount = 0; static String jdbcURL = ""; public static void main(String args[]) { try { jdbcURL = System.getProperty("JDBC_URL"); if (jdbcURL == null) System.out .println("The system property JDBC_URL has not been set, " + "usage:java -DJDBC_URL=xxx filename "); else { JMSDemo demo = new JMSDemo(); demo.performJmsOperations(); } } catch (Exception exception) { System.out.println("Exception : " + exception); exception.printStackTrace(); } finally { try { if (connection != null) connection.close(); } catch (Exception exc) { exc.printStackTrace(); } } System.out.println("\nEnd of Demo aqjmsdemo11."); } public void performJmsOperations() { try { connection = getConnection(jdbcURL); Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE); Queue queue = session.createQueue(queueName); // remove the messages from the Queue drainQueue(queueName, queueOwner, jdbcURL, true); // set the exception listener on the Connection connection.setExceptionListener(new DemoExceptionListener()); MessageProducer producer = session.createProducer(queue); TextMessage textMessage = null; System.out.println("Sending " + numberOfMessages + " messages to queue " + queueName); for (int i = 0; i < numberOfMessages; i++) { textMessage = session.createTextMessage(); textMessage.setText("Sample message text"); producer.send(textMessage); } MessageConsumer consumer = session.createConsumer(queue); System.out.println("Setting the message listener ..."); consumer.setMessageListener(new DemoMessageListener()); connection.start(); // Introduce a long wait to allow the listener to receive all the messages while (messageCount < numberOfMessages) { try { Thread.sleep(5000); } catch (InterruptedException interruptedException) { } } } catch (JMSException jmsException) { jmsException.printStackTrace(); } } // Sample message listener static class DemoMessageListener implements javax.jms.MessageListener { public void onMessage(Message message) { try { System.out.println("Message listener received message with JMSMessageID " + message.getJMSMessageID()); messageCount++; } catch (JMSException jmsException) { System.out.println("JMSException " + jmsException.getMessage()); } } } // sample exception listener static class DemoExceptionListener implements javax.jms.ExceptionListener { public void onException(JMSException jmsException) { try { // As a first step close the connection if (connection != null) connection.close(); } catch (JMSException exception) {} try { System.out.println("Re-create the necessary JMS objects ..."); connection = getConnection(jdbcURL); connection.start(); Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE); Queue queue = session.createQueue(queueName); MessageConsumer consumer = session.createConsumer(queue); consumer.setMessageListener(new DemoMessageListener()); } catch (JMSException newJmsException) { newJmsException.printStackTrace(); } } } // Utility method to get a connection static Connection getConnection(String jdbcUrl) throws JMSException { Properties prop = new Properties(); prop.put("user", queueOwner); prop.put("password", queueOwnerPassword); AQjmsConnectionFactory fact = (AQjmsConnectionFactory) AQjmsFactory .getConnectionFactory(jdbcUrl, prop); Connection conn = fact.createConnection(); return conn; } // Utility method to remove the messages from the queue static void drainQueue(String queueName, String queueOwner, String jdbcUrl, boolean debugInfo) { Connection connection = null; Session session = null; long timeout = 10000; int count = 0; Message message = null; try { connection = getConnection(jdbcUrl); connection.start(); session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE); Queue queue = ((AQjmsSession) session).getQueue(queueOwner, queueName); MessageConsumer messageConsumer = session.createConsumer(queue); QueueBrowser browser = session.createBrowser(queue); Enumeration enumeration = browser.getEnumeration(); if (enumeration.hasMoreElements()) { while ((message = messageConsumer.receive(timeout)) != null) { if (debugInfo) { count++; } } } messageConsumer.close(); if (debugInfo) { System.out.println("Removed " + count + " messages from the queue : " + queueName); } } catch (JMSException jmsException) { jmsException.printStackTrace(); } finally { try { if (session != null) session.close(); if (connection != null) connection.close(); } catch (Exception exception) { } } } }