QRetailerpublic class QRetailer extends Object implements javax.jms.MessageListener
Fields Summary |
---|
private javax.jms.QueueConnection | qConnect | private javax.jms.QueueSession | qSession | private javax.jms.QueueSender | qSender | private javax.jms.TopicConnection | tConnect | private javax.jms.TopicSession | tSession | private javax.jms.TopicPublisher | tPublisher | private javax.jms.Topic | hotDealsTopic | private javax.jms.TopicSubscriber | tSubscriber | private static int | msgCount | private String | saveDesc |
Constructors Summary |
---|
public QRetailer(String broker, String username, String password)
try{
TopicConnectionFactory tFactory = null;
QueueConnectionFactory qFactory = null;
InitialContext jndi = null;
Properties env = new Properties();
// ... specify the JNDI properties specific to the JNDI SPI being used
env.put("BROKER", broker);
jndi = new InitialContext(env);
tFactory =
(TopicConnectionFactory)jndi.lookup("TopicConnectionFactory");
qFactory =
(QueueConnectionFactory)jndi.lookup("QueueConnectionFactory");
tConnect = tFactory.createTopicConnection (username, password);
qConnect = qFactory.createQueueConnection (username, password);
tConnect.setClientID(username);
qConnect.setClientID(username);
tSession =
tConnect.createTopicSession(false,Session.CLIENT_ACKNOWLEDGE);
qSession =
qConnect.createQueueSession(false,javax.jms.Session.AUTO_ACKNOWLEDGE);
hotDealsTopic = (Topic)jndi.lookup("Hot Deals");
tSubscriber = tSession.createDurableSubscriber(hotDealsTopic,
"Hot Deals Subscription");
tSubscriber.setMessageListener(this);
tConnect.start();
}catch (javax.jms.JMSException jmse){
jmse.printStackTrace();
System.exit(1);
}catch(javax.naming.NamingException jne){
jne.printStackTrace(); System.exit(1);
}
|
Methods Summary |
---|
private void | autoBuy(javax.jms.Message message)
int count = 1000;
try{
StreamMessage strmMsg = (StreamMessage)message;
//must reset the message stream so that it can be read from the beginning
if( strmMsg.getJMSRedelivered() )
strmMsg.reset();
String dealDesc = strmMsg.readString();
String itemDesc = strmMsg.readString();
float oldprice = strmMsg.readFloat();
float newprice = strmMsg.readFloat();
System.out.println("Received Hot Buy :"+dealDesc);
if ( saveDesc == null){
if (message.getJMSRedelivered())
processCompensatingTransaction();
processInterimMessages( itemDesc );
return;
}
// if price reduction greater than 10 percent, buy
if ((newprice == 0 || oldprice / newprice > 1.1)){
TextMessage textMsg = tSession.createTextMessage();
textMsg.setText(count + " " + saveDesc + ", "
+ count + " " + itemDesc );
textMsg.setJMSCorrelationID("DurableRetailer");
Queue buyQueue = (Queue)message.getJMSReplyTo();
System.out.println ("\nBuying " + count + " "
+ saveDesc + " " + count + " " + itemDesc);
qSender = qSession.createSender(buyQueue);
qSender.send( textMsg,
javax.jms.DeliveryMode.PERSISTENT,
javax.jms.Message.DEFAULT_PRIORITY,
1800000);
// acknowledge the original message
try{
System.out.println("\nAcknowledging messages");
message.acknowledge();
System.out.println("\nMessage acknowledged");
saveDesc = null;
}catch (javax.jms.JMSException jmse){
System.out.println("\nAcknowledgement failed." +
"\nProcessing compensating transaction for interim messages");
processCompensatingTransaction();
}
}else{
System.out.println ("\nBad Deal. Not buying");
}
}catch (javax.jms.JMSException jmse){
jmse.printStackTrace();
}
| private void | exit(java.lang.String s)
try {
if ( s != null &&
s.equalsIgnoreCase("unsubscribe"))
{
tSubscriber.close();
tSession.unsubscribe("Hot Deals Subscription");
}
tConnect.close();
qConnect.close();
}catch (javax.jms.JMSException jmse){
jmse.printStackTrace();
}
System.exit(0);
| public static void | main(java.lang.String[] argv)
String broker, username, password;
if(argv.length == 3){
broker = argv[0];
username = argv[1];
password = argv[2];
}else{
System.out.println("Invalid arguments. Should be: ");
System.out.println
("java QRetailer broker username password");
return;
}
QRetailer retailer = new QRetailer(broker, username, password);
try{
System.out.println("\nRetailer application started.\n");
// Read all standard input and send it as a message.
java.io.BufferedReader stdin =
new java.io.BufferedReader
( new java.io.InputStreamReader( System.in ) );
while ( true ){
String s = stdin.readLine();
if ( s == null )retailer.exit(null);
else if ( s.equalsIgnoreCase("unsubscribe") )
retailer.exit ( s );
}
}catch ( java.io.IOException ioe ){
ioe.printStackTrace();
}
| public void | onMessage(javax.jms.Message aMessage)
try{
autoBuy(aMessage);
}catch (java.lang.RuntimeException rte){
rte.printStackTrace();
}
| private void | processCompensatingTransaction()
System.out.println("Processing compensating");
saveDesc = null; // null out "saved" work
| private void | processInterimMessages(java.lang.String itemDesc)
saveDesc = itemDesc;
|
|