Monday, July 25, 2011

Oracle AQ Programmatic Solution (Example)

This article demonstrates a sample Oracle AQ JMS program that sends and receives object messages between two clients.

Setup Needed on the Database Side

Grant the AQ permissions to the DB schema user (say, HR).
-- Grants the AQ role and AQ package execute rights to the application schema.
-- Replace <schema> with the DB schema user (for example HR) before running.
-- NOTE(review): the credentials below are sample values; substitute your own.
connect system/manager as sysdba;

-- Role grant for the target schema.
grant aq_administrator_role to <schema>;
-- Execute rights on the AQ packages.
grant execute on dbms_aqadm to <schema>;
grant execute on dbms_aq to <schema>;
grant execute on dbms_aqin to <schema>;
grant execute on dbms_aqjms to <schema>;


Database objects that need to be created (the comments are self-explanatory)

We need to create an AQ queue table, create the AQ queues, and start the queues.

set serveroutput on

-- Cleanup script: stops and drops the three demo queues, then drops the
-- queue table SMM_AQ_QT, so that the create scripts can be re-run.
--
-- Every step is wrapped in its own sub-block. With one shared handler, the
-- first failure (for example a queue that does not exist yet on a first run)
-- would skip all remaining stop/drop calls, including the queue table drop.
DECLARE
  TYPE queue_name_list IS TABLE OF VARCHAR2(30);
  l_queues queue_name_list :=
    queue_name_list('SMM_AQ_FACEBOOK_Q', 'SMM_AQ_TWITTER_Q', 'SMM_AQ_RSS_Q');
BEGIN
  FOR i IN 1 .. l_queues.COUNT LOOP
    BEGIN
      -- A queue has to be stopped before it can be dropped.
      dbms_aqadm.stop_queue(queue_name => l_queues(i));
      dbms_aqadm.drop_queue(queue_name => l_queues(i));
    EXCEPTION
      WHEN OTHERS THEN
        -- Report and carry on with the next queue.
        dbms_output.put_line(SQLCODE || ' Drop Queue Objects ' || SUBSTR(SQLERRM, 1, 256));
    END;
  END LOOP;

  BEGIN
    dbms_aqadm.drop_queue_table(queue_table => 'SMM_AQ_QT');
  EXCEPTION
    WHEN OTHERS THEN
      dbms_output.put_line(SQLCODE || ' Drop Queue Objects ' || SUBSTR(SQLERRM, 1, 256));
  END;
END;
/

set serveroutput on

-- Creates the queue table SMM_AQ_QT that backs all three demo queues.
-- The payload type SYS.AQ$_JMS_OBJECT_MESSAGE matches the JMS ObjectMessage
-- that the Java client sends and receives.
-- Errors are reported through dbms_output and are not re-raised.
BEGIN
  dbms_output.put_line('Creating Queue Table SMM_AQ_QT.');
  dbms_aqadm.create_queue_table(
    queue_table        => 'SMM_AQ_QT',
    queue_payload_type => 'SYS.AQ$_JMS_OBJECT_MESSAGE',
    sort_list          => 'ENQ_TIME,PRIORITY',
    multiple_consumers => FALSE,              -- single-consumer queues
    message_grouping   => DBMS_AQADM.NONE,
    primary_instance   => 0,                  -- integers, not quoted strings
    secondary_instance => 0,
    comment            => 'Creating Queue Table SMM_AQ_QT');  -- was SMM_AQ_RSS_QT, a table this script never creates
  dbms_output.put_line('Created Queue Table SMM_AQ_QT.');
EXCEPTION
  WHEN OTHERS THEN
    dbms_output.put_line(SQLCODE || ' Create Queue Table ' || SUBSTR(SQLERRM, 1, 256));
END;
/



-- Creates the three demo queues (RSS, Facebook, Twitter) in queue table
-- SMM_AQ_QT, printing a progress line before and after each one.
-- A failure on any queue jumps to the handler, so later queues are skipped.
DECLARE
  TYPE t_queue_names IS TABLE OF VARCHAR2(30);
  v_queue_names t_queue_names :=
    t_queue_names('SMM_AQ_RSS_Q', 'SMM_AQ_FACEBOOK_Q', 'SMM_AQ_TWITTER_Q');
BEGIN
  FOR idx IN 1 .. v_queue_names.COUNT LOOP
    dbms_output.put_line('Creating Queue ' || v_queue_names(idx) || '...');
    dbms_aqadm.create_queue(
      queue_name     => v_queue_names(idx),
      queue_table    => 'SMM_AQ_QT',
      queue_type     => DBMS_AQADM.NORMAL_QUEUE,
      max_retries    => '5',
      retry_delay    => '0',
      retention_time => '0',
      comment        => 'Resource Registration Queue');
    dbms_output.put_line('Created Queue ' || v_queue_names(idx) || '.');
  END LOOP;
EXCEPTION
  WHEN OTHERS THEN
    dbms_output.put_line(SQLCODE || ' Create Queue ' || SUBSTR(SQLERRM, 1, 256));
END;
/


-- Starts the three demo queues in the same order they were created,
-- printing a progress line before and after each one.
-- A failure on any queue jumps to the handler, so later queues are skipped.
DECLARE
  TYPE t_queue_names IS TABLE OF VARCHAR2(30);
  v_queue_names t_queue_names :=
    t_queue_names('SMM_AQ_RSS_Q', 'SMM_AQ_FACEBOOK_Q', 'SMM_AQ_TWITTER_Q');
BEGIN
  FOR idx IN 1 .. v_queue_names.COUNT LOOP
    dbms_output.put_line('starting queue ' || v_queue_names(idx) || '...');
    dbms_aqadm.start_queue(queue_name => v_queue_names(idx));
    dbms_output.put_line('Started Queue ' || v_queue_names(idx) || '.');
  END LOOP;
EXCEPTION
  WHEN OTHERS THEN
    dbms_output.put_line(SQLCODE || ' Start Queue ' || SUBSTR(SQLERRM, 1, 256));
END;
/

-- Give the application schema full access to the queue table.
-- There is deliberately no trailing "/": the statement already ends with ";",
-- and in SQL*Plus a following "/" re-runs the buffer, executing the grant twice.
-- NOTE(review): the setup connects "as sysdba"; confirm that the queue table
-- really lives in the SYSTEM schema and adjust the owner prefix if it does not.
grant all on SYSTEM.SMM_AQ_QT to <schema>;

Serializable POJO that is sent to and received from Oracle AQ
import java.io.Serializable;

/**
 * Serializable payload carried inside the JMS {@code ObjectMessage} that is
 * sent to and received from the Oracle AQ queue.
 */
public class SmmPojo implements Serializable {
    /**
     * Explicit stream version. Without it the JVM derives one from the compiled
     * class shape, so a sender and a receiver built separately could reject each
     * other's messages with {@code InvalidClassException}.
     */
    private static final long serialVersionUID = 1L;

    private String name;
    private Long rollnumber;

    /** Creates an empty instance; populate it through the setters. */
    public SmmPojo() {
    }

    /**
     * Sets the name.
     *
     * @param name the name to store; may be {@code null}
     */
    public void setName(String name) {
        this.name = name;
    }

    /**
     * Returns the name.
     *
     * @return the stored name, or {@code null} if none was set
     */
    public String getName() {
        return name;
    }

    /**
     * Sets the roll number.
     *
     * @param rollnumber the roll number to store; may be {@code null}
     */
    public void setRollnumber(Long rollnumber) {
        this.rollnumber = rollnumber;
    }

    /**
     * Returns the roll number.
     *
     * @return the stored roll number, or {@code null} if none was set
     */
    public Long getRollnumber() {
        return rollnumber;
    }
}


JMS client program with send and receive code


Below is the managed bean code. You can create a very simple ADF web application with a DB data source called "SmmApp" in WLS; the code looks it up through the JNDI name "jdbc/SmmAppDS".

In the JSPX page, create two buttons and associate them with the two action listeners shown below: sendMessage() and ReceiveMessage().


import javax.faces.event.ActionEvent;

import javax.jms.JMSException;
import javax.jms.Message;
import javax.jms.ObjectMessage;
import javax.jms.Queue;
import javax.jms.QueueConnection;
import javax.jms.QueueConnectionFactory;
import javax.jms.QueueReceiver;
import javax.jms.QueueSender;
import javax.jms.QueueSession;
import javax.jms.Session;

import javax.naming.Context;
import javax.naming.InitialContext;

import oracle.apps.atk.ess.view.SmmPojo;

import oracle.jms.AQjmsFactory;
import oracle.jms.AQjmsSession;
import oracle.jms.AQjmsTextMessage;
import oracle.jms.AQjmsObject;

/**
 * Managed bean that sends and receives an {@link SmmPojo} through an Oracle AQ
 * queue using the JMS queue API.
 *
 * <p>Both operations obtain the database connection from the container data
 * source, open a fresh JMS connection, do their work on a non-transacted
 * session and always release the connection afterwards.
 */
public class DemoBean {
    /** JNDI name under which the application's data source is looked up. */
    private static final String DATA_SOURCE_JNDI_NAME = "jdbc/SmmAppDS";

    /** Name of the AQ queue used for both sending and receiving. */
    private static final String QUEUE_NAME = "SMM_AQ_TWITTER_Q";

    /** How long a receive waits for a message before giving up. */
    private static final long RECEIVE_TIMEOUT_MILLIS = 5000L;

    public DemoBean() {
    }

    /**
     * Action listener for the "send" button.
     *
     * @param actionEvent the UI event; not inspected
     */
    public void sendMessage(ActionEvent actionEvent) {
        QSendFunction();
    }

    /**
     * Sends one {@link SmmPojo} as a JMS {@code ObjectMessage} to the queue.
     * Failures are reported on standard output and are not propagated.
     */
    public void QSendFunction() {
        QueueConnection qc = null;
        try {
            qc = createConnection();
            qc.start();

            // Non-transacted session: the send takes effect without a commit.
            QueueSession session =
                qc.createQueueSession(false, Session.AUTO_ACKNOWLEDGE);

            // The destination is the queue created in the database.
            Queue queue = ((AQjmsSession) session).getQueue("", QUEUE_NAME);
            QueueSender sender = session.createSender(queue);

            SmmPojo smmMessagePojo = new SmmPojo();
            smmMessagePojo.setName("AMulya Mishra");
            smmMessagePojo.setRollnumber(931L);

            ObjectMessage objectMessage = session.createObjectMessage();
            objectMessage.setObject(smmMessagePojo);
            sender.send(objectMessage);

            System.out.println("Message Sent->");
        } catch (Exception e) {
            // Print the exception itself: getMessage() alone can be null and
            // drops the exception type.
            System.out.println("Failed to send message: " + e);
        } finally {
            closeConnection(qc);
        }
    }

    /**
     * Action listener for the "receive" button.
     *
     * @param actionEvent the UI event; not inspected
     */
    public void ReceiveMessage(ActionEvent actionEvent) {
        QRecvFunction();
    }

    /**
     * Synchronously receives one message from the queue, waiting at most
     * {@link #RECEIVE_TIMEOUT_MILLIS} ms, and prints the {@link SmmPojo} it
     * carries. Failures are reported on standard output and are not propagated.
     */
    public void QRecvFunction() {
        QueueConnection qc = null;
        try {
            qc = createConnection();
            // Message delivery does not begin until the connection is started.
            qc.start();

            QueueSession session =
                qc.createQueueSession(false, Session.AUTO_ACKNOWLEDGE);
            Queue queue = ((AQjmsSession) session).getQueue("", QUEUE_NAME);
            QueueReceiver receiver = session.createReceiver(queue);

            // Timed receive, so an empty queue does not block the caller forever.
            Object received = receiver.receive(RECEIVE_TIMEOUT_MILLIS);
            if (received == null) {
                System.out.println("No message received within "
                                   + RECEIVE_TIMEOUT_MILLIS + " ms");
            } else if (!(received instanceof ObjectMessage)) {
                System.out.println("Ignoring unexpected message type: "
                                   + received.getClass().getName());
            } else {
                Object payload = ((ObjectMessage) received).getObject();
                if (payload instanceof SmmPojo) {
                    SmmPojo smmMessagePojo = (SmmPojo) payload;
                    System.out.println(smmMessagePojo.getName() + "--->"
                                       + smmMessagePojo.getRollnumber());
                } else {
                    System.out.println("Ignoring unexpected payload: " + payload);
                }
            }
        } catch (Exception e) {
            System.out.println("Failed to receive message: " + e);
        } finally {
            closeConnection(qc);
        }
    }

    /**
     * Looks up the data source in JNDI and opens a JMS queue connection on it.
     * The connection is returned unstarted so that the caller already holds
     * the reference, and can close it, if {@code start()} fails.
     */
    private static QueueConnection createConnection()
        throws javax.naming.NamingException, JMSException {
        Context jndiContext = new InitialContext();
        javax.sql.DataSource smmAppDS =
            (javax.sql.DataSource) jndiContext.lookup(DATA_SOURCE_JNDI_NAME);
        QueueConnectionFactory qcf =
            AQjmsFactory.getQueueConnectionFactory(smmAppDS);
        return qcf.createQueueConnection();
    }

    /**
     * Stops and closes the connection, tolerating {@code null} for the case
     * where it was never opened. A failing stop must not prevent the close.
     */
    private static void closeConnection(QueueConnection qc) {
        if (qc == null) {
            return;
        }
        try {
            qc.stop();
        } catch (JMSException e) {
            System.out.println("Failed to stop JMS connection: " + e);
        }
        try {
            qc.close();
        } catch (JMSException e) {
            System.out.println("Failed to close JMS connection: " + e);
        }
    }
}

1 comment:

Adi said...

Good Post