FileDocCategorySizeDatePackage
PubSubMessagingClient.javaAPI DocExample8820Thu Dec 15 21:33:06 GMT 2005com.oreilly.jent.jms

PubSubMessagingClient

public class PubSubMessagingClient extends Object implements Runnable
PubSubMessagingClient: A JMS publish-subscribe messaging client. This client can either send or receive messages from a given Topic. The main method allows for running the client from the command-line.

Fields Summary
private javax.jms.TopicConnection
mTopicConn
private javax.jms.Topic
mTopic
private javax.jms.TopicSubscriber
mSubscriber
private javax.jms.TopicSession
mSession
private static String
MSG_TYPE
Constructors Summary
public PubSubMessagingClient(String cFactJNDIName, String topicJNDIName)


  // Constructor, with client name, and the JNDI location of the JMS
  // connection factory and topic that we want to use.
       
    init(cFactJNDIName, topicJNDIName);
  
Methods Summary
protected booleaninit(java.lang.String cFactoryJNDIName, java.lang.String topicJNDIName)

    boolean success = true;

    Context ctx = null;
    
    // Attempt to make connection to JNDI service
    try {
      ctx = new InitialContext();
    }
    catch (NamingException ne) {
      System.out.println("Failed to connect to JNDI provider:");
      ne.printStackTrace();
      success = false;
    }

    // If no JNDI context, bail out here
    if (ctx == null) {
      return success;
    }

    // Attempt to lookup JMS connection factory from JNDI service
    TopicConnectionFactory connFactory = null;
    try {
      connFactory = (TopicConnectionFactory)ctx.lookup(cFactoryJNDIName);
      System.out.println("Got JMS connection factory.");
    }
    catch (NamingException ne2) {
      System.out.println("Failed to get JMS connection factory: ");
      ne2.printStackTrace();
      success = false;
    }

    try {
      // Make a connection to the JMS provider and keep it
      // At this point, the connection is not started, so we aren't
      // receiving any messages.
      mTopicConn = connFactory.createTopicConnection();
      // Try to find our designated topic
      mTopic = (Topic)ctx.lookup(topicJNDIName);
      // Make a session for topicing messages
      // no transactions, auto-acknowledge
      mSession =
        mTopicConn.createTopicSession(false,
                                      javax.jms.Session.AUTO_ACKNOWLEDGE);
    }
    catch (JMSException e) {
      System.out.println("Failed to establish connection/topic:");
      e.printStackTrace();
      success = false;
    }
    catch (NamingException ne) {
      System.out.println("JNDI Error looking up factory or topic:");
      ne.printStackTrace();
      success = false;
    }

    try {
      // Make our subscriber, for incoming messages
      // Set the message selector to only receive our type of messages,
      // in case the same topic is being used for other purposes
      // Also indicate that we don't want any message sent from this connection
      mSubscriber =
        mSession.createSubscriber(mTopic, "JMSType = '" + MSG_TYPE + "'",
                                  true);
    }
    catch (JMSException je) {
      System.out.println("Error establishing message subscriber:");
      je.printStackTrace();
    }

    return success;
  
public static voidmain(java.lang.String[] args)

    if (args.length < 3) {
      System.out.println("Usage: PubSubMessagingClient" +
                         " connFactoryName topicName" +
                         " [publish|subscribe|recv_synch] <messageToSend>");
      System.exit(1);
    }

    // Get our client name, and the JNDI name of the connection factory and
    // topic, from the command-line
    String factoryName = args[0];
    String topicName = args[1];

    // Get the command to execute (publish, subscribe, recv_synch)
    String cmd = args[2];
    
    // Create and initialize the messaging participant
    PubSubMessagingClient msger =
      new PubSubMessagingClient(factoryName, topicName);

    // Run the participant in its own thread, so that it can react to
    // incoming messages
    Thread listen = new Thread(msger);
    listen.start();

    // Send a message to the topic
    if (cmd.equals("publish")) {
      String msg = args[3];
      msger.publishMessage(msg);
      System.exit(0);
    }
    // Register a listener
    else if (cmd.equals("subscribe")) {
      MessageListener listener = new TextLogger();
      msger.registerListener(listener);
      System.out.println("Client listening to topic " + topicName
                         + ". . .");
      try { listen.wait(); } catch (Exception we) {}
    }
    // Synchronously receive a message from the topic
    else if (cmd.equals("recv_synch")) {
      String msg = msger.receiveMessage();
      System.out.println("Received message: " + msg);
      System.exit(0);
    }
  
public voidpublishMessage(java.lang.String msg)

    try {
      // Create a JMS msg publisher connected to the destination topic
      TopicPublisher publisher = mSession.createPublisher(mTopic);
      // Use the session to create a text message
      TextMessage tMsg = mSession.createTextMessage();
      tMsg.setJMSType(MSG_TYPE);
      // Set the body of the message
      tMsg.setText(msg);
      // Send the message using the publisher
      publisher.publish(tMsg);
      System.out.println("Published the message");
    }
    catch (JMSException je) {
      System.out.println("Error sending message " + msg + " to topic");
      je.printStackTrace();
    }
  
public java.lang.StringreceiveMessage()

    String msg = "-- No message --";
    try {
      // (Re)start the connection, in case it's not already started
      mTopicConn.start();
      // Check for a message
      Message m = mSubscriber.receive();
      if (m instanceof TextMessage) {
        msg = ((TextMessage)m).getText();
      }
      else {
        msg = "-- Unsupported message type received --";
      }
    }
    catch (JMSException je) {
    }
    return msg;
  
public voidregisterListener(javax.jms.MessageListener listener)

    try {
      // Set the listener on the subscriber
      mSubscriber.setMessageListener(listener);
      // Start the connection, in case it's still stopped
      mTopicConn.start();
    }
    catch (JMSException je) {
      System.out.println("Error registering listener: ");
      je.printStackTrace();
    }
  
public voidrun()

    while (true) {
      try { this.wait(); } catch (Exception we) {}
    }