FileDocCategorySizeDatePackage
PubSubMessagingClient.javaAPI DocExample8820Thu Dec 15 21:33:06 GMT 2005com.oreilly.jent.jms

PubSubMessagingClient.java

package com.oreilly.jent.jms;

/**
 * In general, you may use the code in this book in your programs and 
 * documentation. You do not need to contact us for permission unless 
 * you're reproducing a significant portion of the code. For example, 
 * writing a program that uses several chunks of code from this book does 
 * not require permission. Selling or distributing a CD-ROM of examples 
 * from O'Reilly books does require permission. Answering a question by 
 * citing this book and quoting example code does not require permission. 
 * Incorporating a significant amount of example code from this book into 
 * your product's documentation does require permission.
 * 
 * We appreciate, but do not require, attribution. An attribution usually 
 * includes the title, author, publisher, and ISBN. For example: 
 * 
 *   "Java Enterprise in a Nutshell, Third Edition, 
 *    by Jim Farley and William Crawford 
 *    with Prakash Malani, John G. Norman, and Justin Gehtland. 
 *    Copyright 2006 O'Reilly Media, Inc., 0-596-10142-2."
 *  
 *  If you feel your use of code examples falls outside fair use or the 
 *  permission given above, feel free to contact us at 
 *  permissions@oreilly.com.
 */

/**
 * PubSubMessagingClient: A JMS publish-subscribe messaging client.  
 * This client can either send or receive messages from a given Topic.  The 
 * main method allows for running the client from the command-line.
 */

import javax.jms.JMSException;
import javax.jms.Message;
import javax.jms.MessageListener;
import javax.jms.TextMessage;
import javax.jms.Topic;
import javax.jms.TopicConnection;
import javax.jms.TopicConnectionFactory;
import javax.jms.TopicPublisher;
import javax.jms.TopicSession;
import javax.jms.TopicSubscriber;
import javax.naming.Context;
import javax.naming.InitialContext;
import javax.naming.NamingException;

public class PubSubMessagingClient implements Runnable {

  // Our connection to the JMS provider.  Only one is needed for this client.
  private TopicConnection mTopicConn = null;
  
  // The topic used for message-passing
  private Topic mTopic = null;

  // Our message subscriber - only need one.
  private TopicSubscriber mSubscriber = null;

  // A single session for sending and receiving from all remote peers.
  private TopicSession mSession = null;

  // The message type we tag all our messages with
  private static String MSG_TYPE = "JavaEntMessage";

  // Constructor, with client name, and the JNDI location of the JMS
  // connection factory and topic that we want to use.
  public PubSubMessagingClient(String cFactJNDIName, String topicJNDIName) {
    init(cFactJNDIName, topicJNDIName);
  }
  
  // Do all the JMS-setup for this client.  Assumes that the JVM is
  // configured (perhaps using jndi.properties) so that the default JNDI
  // InitialContext points to the JMS provider's JNDI service.
  protected boolean init(String cFactoryJNDIName, String topicJNDIName) {
    boolean success = true;

    Context ctx = null;
    
    // Attempt to make connection to JNDI service
    try {
      ctx = new InitialContext();
    }
    catch (NamingException ne) {
      System.out.println("Failed to connect to JNDI provider:");
      ne.printStackTrace();
      success = false;
    }

    // If no JNDI context, bail out here
    if (ctx == null) {
      return success;
    }

    // Attempt to lookup JMS connection factory from JNDI service
    TopicConnectionFactory connFactory = null;
    try {
      connFactory = (TopicConnectionFactory)ctx.lookup(cFactoryJNDIName);
      System.out.println("Got JMS connection factory.");
    }
    catch (NamingException ne2) {
      System.out.println("Failed to get JMS connection factory: ");
      ne2.printStackTrace();
      success = false;
    }

    try {
      // Make a connection to the JMS provider and keep it
      // At this point, the connection is not started, so we aren't
      // receiving any messages.
      mTopicConn = connFactory.createTopicConnection();
      // Try to find our designated topic
      mTopic = (Topic)ctx.lookup(topicJNDIName);
      // Make a session for topicing messages
      // no transactions, auto-acknowledge
      mSession =
        mTopicConn.createTopicSession(false,
                                      javax.jms.Session.AUTO_ACKNOWLEDGE);
    }
    catch (JMSException e) {
      System.out.println("Failed to establish connection/topic:");
      e.printStackTrace();
      success = false;
    }
    catch (NamingException ne) {
      System.out.println("JNDI Error looking up factory or topic:");
      ne.printStackTrace();
      success = false;
    }

    try {
      // Make our subscriber, for incoming messages
      // Set the message selector to only receive our type of messages,
      // in case the same topic is being used for other purposes
      // Also indicate that we don't want any message sent from this connection
      mSubscriber =
        mSession.createSubscriber(mTopic, "JMSType = '" + MSG_TYPE + "'",
                                  true);
    }
    catch (JMSException je) {
      System.out.println("Error establishing message subscriber:");
      je.printStackTrace();
    }

    return success;
  }
      
  // Send a message to the topic
  public void publishMessage(String msg) {
    try {
      // Create a JMS msg publisher connected to the destination topic
      TopicPublisher publisher = mSession.createPublisher(mTopic);
      // Use the session to create a text message
      TextMessage tMsg = mSession.createTextMessage();
      tMsg.setJMSType(MSG_TYPE);
      // Set the body of the message
      tMsg.setText(msg);
      // Send the message using the publisher
      publisher.publish(tMsg);
      System.out.println("Published the message");
    }
    catch (JMSException je) {
      System.out.println("Error sending message " + msg + " to topic");
      je.printStackTrace();
    }
  }

  // Register a MessageListener with the topic to receive
  // messages asynchronously
  public void registerListener(MessageListener listener) {
    try {
      // Set the listener on the subscriber
      mSubscriber.setMessageListener(listener);
      // Start the connection, in case it's still stopped
      mTopicConn.start();
    }
    catch (JMSException je) {
      System.out.println("Error registering listener: ");
      je.printStackTrace();
    }
  }
     
  // Perform an synchronous receive of a message from the topic.  If it's a
  // TextMessage, print the contents.
  public String receiveMessage() {
    String msg = "-- No message --";
    try {
      // (Re)start the connection, in case it's not already started
      mTopicConn.start();
      // Check for a message
      Message m = mSubscriber.receive();
      if (m instanceof TextMessage) {
        msg = ((TextMessage)m).getText();
      }
      else {
        msg = "-- Unsupported message type received --";
      }
    }
    catch (JMSException je) {
    }
    return msg;
  }
  
  // When run within a thread, just wait for messages to be delivered to us
  public void run() {
    while (true) {
      try { this.wait(); } catch (Exception we) {}
    }
  }

  // Take command-line arguments and send or receive messages from the
  // named topic
  public static void main(String args[]) {
    if (args.length < 3) {
      System.out.println("Usage: PubSubMessagingClient" +
                         " connFactoryName topicName" +
                         " [publish|subscribe|recv_synch] <messageToSend>");
      System.exit(1);
    }

    // Get our client name, and the JNDI name of the connection factory and
    // topic, from the command-line
    String factoryName = args[0];
    String topicName = args[1];

    // Get the command to execute (publish, subscribe, recv_synch)
    String cmd = args[2];
    
    // Create and initialize the messaging participant
    PubSubMessagingClient msger =
      new PubSubMessagingClient(factoryName, topicName);

    // Run the participant in its own thread, so that it can react to
    // incoming messages
    Thread listen = new Thread(msger);
    listen.start();

    // Send a message to the topic
    if (cmd.equals("publish")) {
      String msg = args[3];
      msger.publishMessage(msg);
      System.exit(0);
    }
    // Register a listener
    else if (cmd.equals("subscribe")) {
      MessageListener listener = new TextLogger();
      msger.registerListener(listener);
      System.out.println("Client listening to topic " + topicName
                         + ". . .");
      try { listen.wait(); } catch (Exception we) {}
    }
    // Synchronously receive a message from the topic
    else if (cmd.equals("recv_synch")) {
      String msg = msger.receiveMessage();
      System.out.println("Received message: " + msg);
      System.exit(0);
    }
  }
}