File | Doc | Category | Size | Date | Package
PTPMessagingClient.java | API Doc | Example | 9396 | Thu Dec 15 21:32:52 GMT 2005 | com.oreilly.jent.jms

PTPMessagingClient.java

package com.oreilly.jent.jms;

/**
 * In general, you may use the code in this book in your programs and 
 * documentation. You do not need to contact us for permission unless 
 * you're reproducing a significant portion of the code. For example, 
 * writing a program that uses several chunks of code from this book does 
 * not require permission. Selling or distributing a CD-ROM of examples 
 * from O'Reilly books does require permission. Answering a question by 
 * citing this book and quoting example code does not require permission. 
 * Incorporating a significant amount of example code from this book into 
 * your product's documentation does require permission.
 * 
 * We appreciate, but do not require, attribution. An attribution usually 
 * includes the title, author, publisher, and ISBN. For example: 
 * 
 *   "Java Enterprise in a Nutshell, Third Edition, 
 *    by Jim Farley and William Crawford 
 *    with Prakash Malani, John G. Norman, and Justin Gehtland. 
 *    Copyright 2006 O'Reilly Media, Inc., 0-596-10142-2."
 *  
 *  If you feel your use of code examples falls outside fair use or the 
 *  permission given above, feel free to contact us at 
 *  permissions@oreilly.com.
 */

/**
 * PTPMessagingClient: A JMS point-to-point messaging client.  This client 
 * can either send or receive messages from a given Queue.  The main method 
 * allows for running the client from the command-line.
 */

import java.util.Enumeration;

import javax.jms.JMSException;
import javax.jms.Message;
import javax.jms.MessageListener;
import javax.jms.Queue;
import javax.jms.QueueBrowser;
import javax.jms.QueueConnection;
import javax.jms.QueueConnectionFactory;
import javax.jms.QueueReceiver;
import javax.jms.QueueSender;
import javax.jms.QueueSession;
import javax.jms.TextMessage;
import javax.naming.Context;
import javax.naming.InitialContext;
import javax.naming.NamingException;

/**
 * PTPMessagingClient: A JMS point-to-point messaging client.  This client
 * can send messages to, receive messages from, or browse a given Queue.
 * The main method allows for running the client from the command-line.
 *
 * All JMS resources hang off a single connection/session pair created by
 * init().  If setup fails, isInitialized() returns false and the messaging
 * methods report an error instead of throwing a NullPointerException.
 */
public class PTPMessagingClient implements Runnable {
  
  // Our connection to the JMS provider.  Only one is needed for this client.
  private QueueConnection mQueueConn = null;
  
  // The queue used for message-passing
  private Queue mQueue = null;
  
  // Our message receiver - only need one.
  private QueueReceiver mReceiver = null;
  
  // A single session for sending and receiving from all remote peers.
  private QueueSession mSession = null;
  
  // The message type we tag all our messages with
  private static final String MSG_TYPE = "JavaEntMessage";
  
  // Command-line help text, shared by every argument check in main()
  private static final String USAGE =
    "Usage: PTPMessagingClient" +
    " connFactoryName queueName" +
    " [send|listen|recv_synch|browse] <messageToSend>";
  
  // Result of the init() call made by the constructor
  private boolean mInitialized = false;
  
  // Set by close(); guarded by this object's monitor.  Lets run() stop
  // waiting once the client has been shut down.
  private boolean mClosed = false;
  
  // Constructor, with the JNDI locations of the JMS connection factory
  // and queue that we want to use.  Setup errors are printed rather than
  // thrown; check isInitialized() to find out whether the client is usable.
  public PTPMessagingClient(String cFactoryJNDIName, String queueJNDIName) {
    mInitialized = init(cFactoryJNDIName, queueJNDIName);
  }
  
  // Returns true if the JMS setup performed by the constructor succeeded.
  public boolean isInitialized() {
    return mInitialized;
  }
  
  // Do all the JMS-setup for this client.  Assumes that the JVM is
  // configured (perhaps using jndi.properties) so that the default JNDI
  // InitialContext points to the JMS provider's JNDI service.
  //
  // Returns true only if the connection, queue, session and receiver were
  // all created.  On the first failure the error is printed, anything
  // already opened is released, and false is returned; later steps are
  // skipped because each one depends on the result of the previous one.
  protected boolean init(String cFactoryJNDIName, String queueJNDIName) {
    // Attempt to make connection to JNDI service
    Context ctx = null;
    try {
      ctx = new InitialContext();
    }
    catch (NamingException ne) {
      System.out.println("Failed to connect to JNDI provider:");
      ne.printStackTrace();
      return false;
    }
    
    // Attempt to lookup JMS connection factory from JNDI service
    QueueConnectionFactory connFactory = null;
    try {
      connFactory = (QueueConnectionFactory)ctx.lookup(cFactoryJNDIName);
      System.out.println("Got JMS connection factory.");
    }
    catch (NamingException ne2) {
      System.out.println("Failed to get JMS connection factory: ");
      ne2.printStackTrace();
      return false;
    }
    
    try {
      // Make a connection to the JMS provider and keep it.
      // At this point, the connection is not started, so we aren't
      // receiving any messages.
      mQueueConn = connFactory.createQueueConnection();
      // Try to find our designated queue
      mQueue = (Queue)ctx.lookup(queueJNDIName);
      // Make a session for queueing messages: no transactions,
      // auto-acknowledge
      mSession =
        mQueueConn.createQueueSession(false,
                                      javax.jms.Session.AUTO_ACKNOWLEDGE);
    }
    catch (JMSException e) {
      System.out.println("Failed to establish connection/queue:");
      e.printStackTrace();
      closeConnection();
      return false;
    }
    catch (NamingException ne) {
      System.out.println("JNDI Error looking up factory or queue:");
      ne.printStackTrace();
      closeConnection();
      return false;
    }
    
    try {
      // Make our receiver, for incoming messages.
      // Set the message selector to only receive our type of messages,
      // in case the same queue is being used for other purposes.
      mReceiver = mSession.createReceiver(mQueue,
                                          "JMSType = '" + MSG_TYPE + "'");
    }
    catch (JMSException je) {
      System.out.println("Error establishing message receiver:");
      je.printStackTrace();
      closeConnection();
      return false;
    }
    
    return true;
  }
  
  // Send a text message, tagged with our message type, to the queue.
  // Errors are printed, not thrown.
  public void sendMessage(String msg) {
    if (mSession == null) {
      System.out.println("Error sending message " + msg + " to queue");
      System.out.println("Client is not connected to the JMS provider");
      return;
    }
    QueueSender sender = null;
    try {
      // Create a JMS msg sender connected to the destination queue
      sender = mSession.createSender(mQueue);
      // Use the session to create a text message
      TextMessage tMsg = mSession.createTextMessage();
      tMsg.setJMSType(MSG_TYPE);
      // Set the body of the message
      tMsg.setText(msg);
      // Send the message using the sender
      sender.send(tMsg);
      System.out.println("Sent the message");
    }
    catch (JMSException je) {
      System.out.println("Error sending message " + msg + " to queue");
      je.printStackTrace();
    }
    finally {
      // A new sender is made per call, so release it per call as well
      if (sender != null) {
        try {
          sender.close();
        }
        catch (JMSException ce) {
          System.out.println("Error closing message sender: "
                             + ce.getMessage());
        }
      }
    }
  }
  
  // Register a MessageListener with the queue to receive
  // messages asynchronously.  Errors are printed, not thrown.
  public void registerListener(MessageListener listener) {
    if (mReceiver == null) {
      System.out.println("Error registering listener: ");
      System.out.println("Client is not connected to the JMS provider");
      return;
    }
    try {
      // Set the listener on the receiver
      mReceiver.setMessageListener(listener);
      // Start the connection, in case it's still stopped
      mQueueConn.start();
    }
    catch (JMSException je) {
      System.out.println("Error registering listener: ");
      je.printStackTrace();
    }
  }
  
  // Perform a synchronous (blocking) receive of a message from the queue.
  // Returns the text of a TextMessage, or a "-- ... --" placeholder string
  // when nothing usable was received.
  public String receiveMessage() {
    String msg = "-- No message --";
    if (mReceiver == null) {
      System.out.println("Error receiving message from queue:");
      System.out.println("Client is not connected to the JMS provider");
      return msg;
    }
    try {
      // (Re)start the connection, in case it's not already started
      mQueueConn.start();
      // Block until a message arrives
      Message m = mReceiver.receive();
      if (m instanceof TextMessage) {
        msg = ((TextMessage)m).getText();
      }
      else if (m != null) {
        msg = "-- Unsupported message type received --";
      }
      // m == null means the receiver was closed while we were waiting;
      // that is "no message", not an unsupported type.
    }
    catch (JMSException je) {
      System.out.println("Error receiving message from queue:");
      je.printStackTrace();
    }
    return msg;
  }
  
  // Print the current contents of the message queue, using a QueueBrowser
  // so that we don't remove any messages from the queue
  public void printQueue() {
    if (mSession == null) {
      System.out.println("Error browsing queue: "
                         + "client is not connected to the JMS provider");
      return;
    }
    QueueBrowser browser = null;
    try {
      browser = mSession.createBrowser(mQueue);
      Enumeration msgEnum = browser.getEnumeration();
      System.out.println("Queue contents:");
      while (msgEnum.hasMoreElements()) {
        System.out.println("\t" + (Message)msgEnum.nextElement());
      }
    }
    catch (JMSException je) {
      System.out.println("Error browsing queue: " + je.getMessage());
    }
    finally {
      if (browser != null) {
        try {
          browser.close();
        }
        catch (JMSException ce) {
          System.out.println("Error closing queue browser: "
                             + ce.getMessage());
        }
      }
    }
  }
  
  // Shut the client down: wake up any thread parked in run() and close the
  // JMS connection.  Safe to call more than once.
  public void close() {
    synchronized (this) {
      mClosed = true;
      notifyAll();
    }
    // Done outside the monitor so provider code never runs under our lock
    closeConnection();
  }
  
  // Close the JMS connection, if any, and forget the objects created from
  // it.  Closing the connection is relied on to release its session and
  // receiver as well.
  private void closeConnection() {
    QueueConnection conn = mQueueConn;
    mReceiver = null;
    mSession = null;
    mQueueConn = null;
    if (conn != null) {
      try {
        conn.close();
      }
      catch (JMSException je) {
        System.out.println("Error closing JMS connection: "
                           + je.getMessage());
      }
    }
  }
  
  // When run within a thread, just wait for messages to be delivered to us.
  // The thread parks until close() is called or it is interrupted.
  public void run() {
    // wait() may only be called while holding the object's monitor;
    // without the synchronized block it throws IllegalMonitorStateException.
    synchronized (this) {
      while (!mClosed) {
        try {
          wait();
        }
        catch (InterruptedException ie) {
          // Keep the interrupt visible to whoever owns this thread
          Thread.currentThread().interrupt();
          return;
        }
      }
    }
  }
  
  // Take command-line arguments and send, receive or browse messages on
  // the named queue.  Exits with status 1 on bad arguments or failed setup.
  public static void main(String args[]) {
    if (args.length < 3) {
      System.out.println(USAGE);
      System.exit(1);
    }
    
    // Get the JNDI names of the connection factory and
    // queue, from the command-line
    String factoryName = args[0];
    String queueName = args[1];
    
    // Get the command to execute (send, listen, recv_synch, browse)
    String cmd = args[2];
    
    // Check the command before connecting to anything
    boolean isSend = cmd.equals("send");
    boolean isKnown = isSend || cmd.equals("listen")
      || cmd.equals("recv_synch") || cmd.equals("browse");
    if (!isKnown) {
      System.out.println("Unknown command: " + cmd);
      System.out.println(USAGE);
      System.exit(1);
    }
    if (isSend && args.length < 4) {
      System.out.println("The send command requires a message to send");
      System.out.println(USAGE);
      System.exit(1);
    }
    
    // Create and initialize the messaging participant
    PTPMessagingClient msger =
      new PTPMessagingClient(factoryName, queueName);
    if (!msger.isInitialized()) {
      System.out.println("Unable to set up messaging client; exiting");
      System.exit(1);
    }
    
    // Send a message to the queue
    if (isSend) {
      String msg = args[3];
      msger.sendMessage(msg);
      msger.close();
      System.exit(0);
    }
    // Register a listener
    else if (cmd.equals("listen")) {
      MessageListener listener = new TextLogger();
      msger.registerListener(listener);
      System.out.println("Client listening to queue " + queueName
                         + ". . .");
      System.out.flush();
      // Run the participant in its own thread, so that the JVM stays
      // alive to react to incoming messages, and wait for it to finish
      Thread listen = new Thread(msger);
      listen.start();
      try {
        listen.join();
      }
      catch (InterruptedException ie) {
        Thread.currentThread().interrupt();
      }
      msger.close();
    }
    // Synchronously receive a message from the queue
    else if (cmd.equals("recv_synch")) {
      String msg = msger.receiveMessage();
      System.out.println("Received message: " + msg);
      msger.close();
      System.exit(0);
    }
    // Print the queue contents without consuming them
    else {
      msger.printQueue();
      msger.close();
      System.exit(0);
    }
  }
}