File | Doc | Category | Size | Date | Package
Wholesaler.java | API Doc | Example | 7974 | Wed Dec 06 19:09:40 GMT 2000 | chap6.b2b

Wholesaler

public class Wholesaler extends Object implements javax.jms.MessageListener, javax.jms.ExceptionListener

Fields Summary
private javax.jms.TopicConnection
connect
private javax.jms.TopicSession
session
private javax.jms.TopicPublisher
publisher
private javax.jms.TopicSubscriber
subscriber
private javax.jms.Topic
hotDealsTopic
private javax.jms.Topic
buyOrdersTopic
private String
mBroker
private String
mUsername
private String
mPassword
private static final int
CONNECTION_RETRY_PERIOD
private static boolean
inSequence
Constructors Summary
/**
 * Stores the connection settings and then opens the connection to the
 * message server by calling establishConnection.
 *
 * @param broker   message server identifier handed to establishConnection
 * @param username user name handed to establishConnection
 * @param password password handed to establishConnection
 */
public Wholesaler(String broker, String username, String password)

        // Store the settings before connecting: onException() reads these
        // fields when it has to rebuild the connection.
        this.mBroker = broker;
        this.mUsername = username;
        this.mPassword = password;

        this.establishConnection(broker, username, password);
   
Methods Summary
/**
 * Looks up the topic connection factory through JNDI, opens a topic
 * connection (retrying every CONNECTION_RETRY_PERIOD milliseconds for as
 * long as the connect field is null), then creates the session, the
 * "Hot Deals" publisher and the "Buy Order" subscriber, registers this
 * object as message and exception listener, and starts the connection.
 *
 * A JMSException or NamingException raised outside the retry loop is
 * printed and the JVM is terminated with exit status 1.
 *
 * If the thread is interrupted while pausing between attempts, the method
 * keeps retrying but re-asserts the interrupt status before it finishes,
 * so the request is not lost to the caller.
 *
 * @param broker   value stored under the "BROKER" key of the JNDI environment
 * @param username user name passed to createTopicConnection
 * @param password password passed to createTopicConnection
 */
private voidestablishConnection(java.lang.String broker, java.lang.String username, java.lang.String password)

      // Records an interrupt received during a retry pause. The flag is only
      // restored at the very end so that neither the remaining pauses nor the
      // JMS setup calls below run with the interrupt status set.
      boolean interrupted = false;
      try{
        Properties env = new Properties();
        // ... specify the JNDI properties specific to the JNDI SPI being used
        env.put("BROKER", broker);
        InitialContext jndi = new InitialContext(env);
        TopicConnectionFactory factory =
            (TopicConnectionFactory)jndi.lookup("TopicConnectionFactory");

        // Callers that want a fresh connection set connect to null first.
        while (connect == null)
        {
            try{
                connect = factory.createTopicConnection (username, password);
            } catch (javax.jms.JMSException jmse)
            {
                System.out.print("Cannot connect to message server: " + broker + "...");
                System.out.println("Pausing " +
                    CONNECTION_RETRY_PERIOD / 1000 + " seconds before retry.");
                try
                {
                    Thread.sleep(CONNECTION_RETRY_PERIOD);
                } catch (java.lang.InterruptedException ie) {
                    // Thread.sleep cleared the interrupt status; remember it
                    // instead of discarding it.
                    interrupted = true;
                }
            }
        }
        System.out.println("\nConnection established");
        session =
        connect.createTopicSession(false,Session.AUTO_ACKNOWLEDGE);

        hotDealsTopic = (Topic)jndi.lookup("Hot Deals");
        buyOrdersTopic = (Topic)jndi.lookup("Buy Order");

        // Deals go out on "Hot Deals"; orders come back on "Buy Order".
        publisher = session.createPublisher(hotDealsTopic);

        //buyOrdersTopic = session.createTemporaryTopic();

        subscriber = session.createSubscriber(buyOrdersTopic);
        subscriber.setMessageListener(this);

        connect.setExceptionListener(this);
        connect.start();

      }catch (javax.jms.JMSException jmse){
         jmse.printStackTrace(); System.exit(1);
      }catch(javax.naming.NamingException jne){
         jne.printStackTrace(); System.exit(1);
      }finally{
         if (interrupted) {
             Thread.currentThread().interrupt();
         }
      }
   
/**
 * Closes the topic connection, if one is currently held, and terminates
 * the JVM with exit status 0. A JMSException raised by the close is
 * printed and does not prevent the shutdown.
 */
public voidexit()

      try{
        // onException() sets connect to null while it is rebuilding the link.
        // Work on a local copy and skip the close in that case, so a missing
        // connection cannot raise a NullPointerException and block the
        // System.exit call below.
        javax.jms.TopicConnection current = connect;
        if (current != null) {
            current.close();
        }
      }catch (javax.jms.JMSException jmse){
        jmse.printStackTrace();
      }
      System.exit(0);
   
/**
 * Command-line entry point. Expects exactly three arguments (broker,
 * username, password), creates a Wholesaler with them and then reads
 * standard input line by line:
 *
 *   - a line whose first three characters are "END" (any case) sends an
 *     "END_SEQUENCE" marker;
 *   - an empty line or end of input calls exit();
 *   - any other line is parsed as "Item, Old Price, New Price" and
 *     published as a price quote.
 *
 * A line that has fewer than three comma-separated fields, or whose prices
 * are not numbers, is reported on standard output and skipped rather than
 * terminating the input loop.
 *
 * @param argv broker, username and password, in that order
 */
public static voidmain(java.lang.String[] argv)

      if(argv.length != 3){
         System.out.println("Invalid arguments. Should be: ");
         System.out.println("java Wholesaler broker username password");
         return;
      }
      String broker = argv[0];
      String username = argv[1];
      String password = argv[2];

      Wholesaler wholesaler = new Wholesaler(broker, username, password);

      try{
         // Read all standard input and send it as a message.
         java.io.BufferedReader stdin = new java.io.BufferedReader
            (new java.io.InputStreamReader( System.in ) );
         System.out.println ("Enter: Item, Old Price, New Price ");
         System.out.println("\ne.g. Bowling Shoes, 100.00, 55.00");

         while ( true ){
            String dealDesc = stdin.readLine();
            if( dealDesc == null || dealDesc.length() == 0 ){
                wholesaler.exit();
                continue;
            }

            // regionMatches simply returns false for input shorter than
            // three characters, where substring(0,3) would throw.
            if( dealDesc.regionMatches(true, 0, "END", 0, 3) ){
                wholesaler.sendSequenceMarker( "END_SEQUENCE" );
                continue;
            }

            // parse the deal description
            StringTokenizer tokenizer = new StringTokenizer(dealDesc,",");
            if( tokenizer.countTokens() < 3 ){
                System.out.println("Invalid deal. Should be: Item, Old Price, New Price");
                continue;
            }
            String itemdesc = tokenizer.nextToken();
            float oldprice;
            float newprice;
            try{
                oldprice = Float.parseFloat(tokenizer.nextToken().trim());
                newprice = Float.parseFloat(tokenizer.nextToken().trim());
            }catch( NumberFormatException nfe ){
                System.out.println("Invalid price. " + nfe.getMessage());
                continue;
            }

            wholesaler.publishPriceQuotes(dealDesc,username,
                                            itemdesc, oldprice,newprice);
         }
      }catch( java.io.IOException ioe ){
         ioe.printStackTrace();
      }
   
public voidonException(javax.jms.JMSException jsme)

        // See if connection was dropped.

        // Tell the user that there is a problem.
        System.err.println ("\n\nThere is a problem with the connection.");
        System.err.println ("   JMSException: " + jsme.getMessage());

        System.err.println ("Please wait while the application tries to "+
                                "re-establish the connection...");
        // Reestablish the connection
        connect = null;
        establishConnection(mBroker, mUsername, mPassword);
   
/**
 * Prints the text of a received message together with its JMS
 * correlation ID. The message is cast to TextMessage; any exception
 * raised while handling it (including a failed cast) is printed and
 * otherwise ignored.
 *
 * @param message the message delivered to this listener
 */
public voidonMessage(javax.jms.Message message)

      try{
         final String orderText = ((TextMessage) message).getText();
         final String sender = message.getJMSCorrelationID();
         System.out.println("Order received - " + orderText + " from " + sender);
      }catch (java.lang.Exception rte){
         rte.printStackTrace();
      }
   
private voidpublishPriceQuotes(java.lang.String dealDesc, java.lang.String username, java.lang.String itemdesc, float oldprice, float newprice)

      try{
        javax.jms.StreamMessage message = session.createStreamMessage();
        message.writeString(dealDesc);
        message.writeString(itemdesc);
        message.writeFloat(oldprice);
        message.writeFloat(newprice);

        message.setStringProperty("Username", username);
        message.setStringProperty("Itemdesc", itemdesc);

        message.setJMSReplyTo(buyOrdersTopic);

        publisher.publish(
            message,
            javax.jms.DeliveryMode.PERSISTENT,
            javax.jms.Message.DEFAULT_PRIORITY,
            1800000);

      }catch ( javax.jms.JMSException jmse ){
         jmse.printStackTrace();
      }
   
private voidsendSequenceMarker(java.lang.String sequenceMarker)

      try{
        javax.jms.StreamMessage message = session.createStreamMessage();
        message.setStringProperty("SEQUENCE_MARKER",sequenceMarker);

        publisher.publish(
            message,
            javax.jms.DeliveryMode.PERSISTENT,
            javax.jms.Message.DEFAULT_PRIORITY,
            1800000);

      }catch ( javax.jms.JMSException jmse ){
         jmse.printStackTrace();
      }