File | Doc | Category | Size | Date | Package
JmsActivation.java | API Doc | JBoss 4.2.1 | 15964 | Fri Jul 13 21:01:16 BST 2007 | org.jboss.resource.adapter.jms.inflow

JmsActivation

public class JmsActivation extends Object implements javax.jms.ExceptionListener
A generic jms Activation.
author
Adrian Brock
version
$Revision: 60397 $

Fields Summary
private static final Logger
log
The log
public static final Method
ONMESSAGE
The onMessage method
protected org.jboss.resource.adapter.jms.JmsResourceAdapter
ra
The resource adapter
protected JmsActivationSpec
spec
The activation spec
protected javax.resource.spi.endpoint.MessageEndpointFactory
endpointFactory
The message endpoint factory
protected EDU.oswego.cs.dl.util.concurrent.SynchronizedBoolean
deliveryActive
Whether delivery is active
protected org.jboss.jms.jndi.JMSProviderAdapter
adapter
The jms provider adapter
protected javax.jms.Destination
destination
The destination
protected javax.jms.Connection
connection
The connection
protected JmsServerSessionPool
pool
The server session pool
protected boolean
isDeliveryTransacted
Is the delivery transacted
protected DLQHandler
dlqHandler
The DLQ handler
protected TransactionManager
tm
The TransactionManager
Constructors Summary
public JmsActivation(org.jboss.resource.adapter.jms.JmsResourceAdapter ra, javax.resource.spi.endpoint.MessageEndpointFactory endpointFactory, JmsActivationSpec spec)

   
   
   
   
      // NOTE(review): ONMESSAGE is declared "public static final", so this first
      // try block presumably belongs to the class's static initializer and was
      // merged into the constructor listing by the doc extractor -- confirm
      // against the real source file.
      try
      {
         // Reflective handle on MessageListener.onMessage(Message).
         ONMESSAGE = MessageListener.class.getMethod("onMessage", new Class[] { Message.class });
      }
      catch (Exception e)
      {
         // Any lookup failure is rethrown unchecked with the cause preserved.
         throw new RuntimeException(e);
      }
   
      // Keep the collaborators handed in by the caller.
      this.ra = ra;
      this.endpointFactory = endpointFactory;
      this.spec = spec;
      try
      {
         // Ask the endpoint factory whether onMessage deliveries are transacted
         // and cache the answer in a field.
         this.isDeliveryTransacted = endpointFactory.isDeliveryTransacted(ONMESSAGE);
      }
      catch (Exception e)
      {
         // Any failure is wrapped in a ResourceException carrying the cause.
         throw new ResourceException(e);
      }
   
Methods Summary
public JmsActivationSpecgetActivationSpec()

return
the activation spec

      // Hands back the spec reference stored by the constructor.
      return spec;
   
public javax.jms.ConnectiongetConnection()

return
the connection

      // Current value of the connection field: setupConnection() assigns it
      // and teardownConnection() resets it to null, so callers may see null.
      return connection;
   
public DLQHandlergetDLQHandler()

return
the dlq handler

      // Assigned by setupDLQ() only when spec.isUseDLQ() is true, and reset to
      // null by teardownDLQ(), so callers may see null.
      return dlqHandler; 
   
public javax.jms.DestinationgetDestination()

return
the destination

      // Value looked up by setupDestination(); teardownDestination() in this
      // listing does not clear it.
      return destination;
   
public javax.resource.spi.endpoint.MessageEndpointFactorygetMessageEndpointFactory()

return
the message endpoint factory

      // Hands back the factory reference stored by the constructor.
      return endpointFactory;
   
public org.jboss.jms.jndi.JMSProviderAdaptergetProviderAdapter()

return
the provider adapter

      // Adapter looked up by setupJMSProviderAdapter().
      return adapter; 
   
public javax.transaction.TransactionManagergetTransactionManager()
Get the transaction manager, locating it on first use

return
the transaction manager

      // Already resolved on an earlier call: reuse the cached reference.
      if (tm != null)
         return tm;

      // First use: ask the locator and remember the answer.
      tm = TransactionManagerLocator.getInstance().locate();
      return tm;
   
public javax.resource.spi.work.WorkManagergetWorkManager()

return
the work manager

      // Delegates to the resource adapter on every call; nothing is cached here.
      return ra.getWorkManager();
   
public voidhandleFailure(java.lang.Throwable failure)
Handles any failure by tearing the activation down and retrying setup until it succeeds, delivery is no longer active, or the thread is interrupted

param
failure the error that triggered the reconnect

      log.warn("Failure in jms activation " + spec, failure);
      
      // Keep retrying only while the activation has not been stopped.
      while (deliveryActive.get())
      {
         // Release whatever the previous (failed) attempt left behind.
         teardown();
         try
         {
            // Back off for the interval configured on the activation spec.
            Thread.sleep(spec.getReconnectIntervalLong());
         }
         catch (InterruptedException e)
         {
            log.debug("Interrupted trying to reconnect " + spec, e);
            // Restore the interrupt status so the code that owns this thread
            // can still observe that an interruption was requested.
            Thread.currentThread().interrupt();
            break;
         }

         log.info("Attempting to reconnect " + spec);
         try
         {
            setup();
            log.info("Reconnected with messaging provider.");            
            break;
         }
         catch (Throwable t)
         {
            // Not fatal: the next loop iteration tears down and tries again.
            log.error("Unable to reconnect " + spec, t);
         }
         

      }
   
public booleanisDeliveryTransacted()

return
whether delivery is transacted

      // Flag computed in the constructor from
      // endpointFactory.isDeliveryTransacted(ONMESSAGE).
      return isDeliveryTransacted;
   
public voidonException(javax.jms.JMSException exception)

      // ExceptionListener callback: this object registers itself on the
      // connections it creates, and every reported error goes through the
      // common reconnect path.
      handleFailure(exception);
   
protected voidsetup()
Setup the activation

throws
Exception for any error

      log.debug("Setting up " + spec);

      setupJMSProviderAdapter();
      Context ctx = adapter.getInitialContext();
      log.debug("Using context " + ctx.getEnvironment() + " for " + spec);
      try
      {
         setupDLQ(ctx);
         setupDestination(ctx);
         setupConnection(ctx);
      }
      finally
      {
         ctx.close();
      }
      setupSessionPool();
      
      log.debug("Setup complete " + this);
   
protected voidsetupConnection(javax.naming.Context ctx)
Create the JMS connection described by the activation spec and store it in the connection field

param
ctx the naming context
throws
Exception for any error

      log.debug("setup connection " + this);

      // Credentials and client id come straight from the activation spec.
      final String userName = spec.getUser();
      final String password = spec.getPassword();
      final String clientIdentifier = spec.getClientId();

      // Queue is the fallback whenever the spec does not describe a topic.
      if (spec.isTopic() == false)
         connection = setupQueueConnection(ctx, userName, password, clientIdentifier);
      else
         connection = setupTopicConnection(ctx, userName, password, clientIdentifier);
      
      log.debug("established connection " + this);
   
protected voidsetupDLQ(javax.naming.Context ctx)
Setup the DLQ

param
ctx the naming context
throws
Exception for any error

      if (spec.isUseDLQ())
      {
         Class clazz = Thread.currentThread().getContextClassLoader().loadClass(spec.getDLQHandler());
         dlqHandler = (DLQHandler) clazz.newInstance();
         dlqHandler.setup(this, ctx);
      }
      
      log.debug("Setup DLQ " + this);
   
protected voidsetupDestination(javax.naming.Context ctx)
Look up the destination named by the activation spec and store it in the destination field

param
ctx the naming context
throws
Exception for any error

      // The expected JNDI type follows the spec's topic/queue choice.
      final Class expectedType = spec.isTopic() ? Topic.class : Queue.class;
      final String jndiName = spec.getDestination();

      log.debug("Retrieving destination " + jndiName + " of type " + expectedType.getName());
      Object found = Util.lookup(ctx, jndiName, expectedType);
      destination = (Destination) found;
      log.debug("Got destination " + destination + " from " + jndiName);
   
protected voidsetupJMSProviderAdapter()
Get the jms provider

      String providerAdapterJNDI = spec.getProviderAdapterJNDI();
      if (providerAdapterJNDI.startsWith("java:") == false)
         providerAdapterJNDI = "java:" + providerAdapterJNDI;

      log.debug("Retrieving the jms provider adapter " + providerAdapterJNDI + " for " + this);
      adapter = (JMSProviderAdapter) Util.lookup(providerAdapterJNDI, JMSProviderAdapter.class);
      log.debug("Using jms provider adapter " + adapter + " for " + this);
   
protected javax.jms.QueueConnectionsetupQueueConnection(javax.naming.Context ctx, java.lang.String user, java.lang.String pass, java.lang.String clientID)
Setup a Queue Connection; the connection is closed again if it cannot be fully configured

param
ctx the naming context
param
user the user, or null to connect without credentials
param
pass the password
param
clientID the client id, or null to leave it unset
return
the configured queue connection
throws
Exception for any error

      String queueFactoryRef = adapter.getQueueFactoryRef();
      log.debug("Attempting to lookup queue connection factory " + queueFactoryRef);
      QueueConnectionFactory qcf = (QueueConnectionFactory) Util.lookup(ctx, queueFactoryRef, QueueConnectionFactory.class);
      log.debug("Got queue connection factory " + qcf + " from " + queueFactoryRef);
      log.debug("Attempting to create queue connection with user " + user);
      QueueConnection result;
      // The XA flavour is used only when the factory supports it AND delivery
      // is transacted.
      if (qcf instanceof XAQueueConnectionFactory && isDeliveryTransacted)
      {
         XAQueueConnectionFactory xaqcf = (XAQueueConnectionFactory) qcf;
         if (user != null)
            result = xaqcf.createXAQueueConnection(user, pass);
         else
            result = xaqcf.createXAQueueConnection();
      }
      else
      {
         if (user != null)
            result = qcf.createQueueConnection(user, pass);
         else
            result = qcf.createQueueConnection();
      }

      // From here on the connection exists but is not yet reachable through
      // the connection field, so teardown() could not release it. If the
      // configuration below fails, close it here before the error propagates.
      boolean configured = false;
      try
      {
         if (clientID != null)
            result.setClientID(clientID);
         result.setExceptionListener(this);
         configured = true;
      }
      finally
      {
         if (configured == false)
         {
            try
            {
               result.close();
            }
            catch (Throwable ignored)
            {
               // Best effort only: the original configuration error is the
               // one that should reach the caller.
               log.debug("Error closing unconfigured queue connection " + result, ignored);
            }
         }
      }
      log.debug("Using queue connection " + result);
      return result;
   
protected voidsetupSessionPool()
Setup the server session pool

throws
Exception for any error

      // The pool is built around this activation and kept in the pool field.
      pool = new JmsServerSessionPool(this);
      log.debug("Created session pool " + pool);
      
      // The pool is started first ...
      log.debug("Starting session pool " + pool);
      pool.start();
      log.debug("Started session pool " + pool);
      
      // ... and only then is the connection started, so the pool is already
      // running when connection.start() is called.
      // NOTE(review): JmsServerSessionPool is defined elsewhere; presumably it
      // is what consumes the delivered messages -- confirm.
      log.debug("Starting delivery " + connection);
      connection.start();
      log.debug("Started delivery " + connection);
   
protected javax.jms.TopicConnectionsetupTopicConnection(javax.naming.Context ctx, java.lang.String user, java.lang.String pass, java.lang.String clientID)
Setup a Topic Connection; the connection is closed again if it cannot be fully configured

param
ctx the naming context
param
user the user, or null to connect without credentials
param
pass the password
param
clientID the client id, or null to leave it unset
return
the configured topic connection
throws
Exception for any error

      String topicFactoryRef = adapter.getTopicFactoryRef();
      log.debug("Attempting to lookup topic connection factory " + topicFactoryRef);
      TopicConnectionFactory tcf = (TopicConnectionFactory) Util.lookup(ctx, topicFactoryRef, TopicConnectionFactory.class);
      log.debug("Got topic connection factory " + tcf + " from " + topicFactoryRef);
      log.debug("Attempting to create topic connection with user " + user);
      TopicConnection result;
      // The XA flavour is used only when the factory supports it AND delivery
      // is transacted.
      if (tcf instanceof XATopicConnectionFactory && isDeliveryTransacted)
      {
         XATopicConnectionFactory xatcf = (XATopicConnectionFactory) tcf;
         if (user != null)
            result = xatcf.createXATopicConnection(user, pass);
         else
            result = xatcf.createXATopicConnection();
      }
      else
      {
         if (user != null)
            result = tcf.createTopicConnection(user, pass);
         else
            result = tcf.createTopicConnection();
      }

      // From here on the connection exists but is not yet reachable through
      // the connection field, so teardown() could not release it. If the
      // configuration below fails, close it here before the error propagates.
      boolean configured = false;
      try
      {
         if (clientID != null)
            result.setClientID(clientID);
         result.setExceptionListener(this);
         configured = true;
      }
      finally
      {
         if (configured == false)
         {
            try
            {
               result.close();
            }
            catch (Throwable ignored)
            {
               // Best effort only: the original configuration error is the
               // one that should reach the caller.
               log.debug("Error closing unconfigured topic connection " + result, ignored);
            }
         }
      }
      log.debug("Using topic connection " + result);
      return result;
   
public voidstart()
Start the activation

throws
ResourceException for any error

      // Mark delivery active before scheduling: handleFailure() keeps retrying
      // only while this flag reads true.
      deliveryActive = new SynchronizedBoolean(true);
      // The actual setup is handed to the resource adapter's work manager.
      // NOTE(review): SetupActivation is not part of this listing; presumably
      // it is a Work that runs setup() -- confirm.
      ra.getWorkManager().scheduleWork(new SetupActivation());
   
public voidstop()
Stop the activation

      // Clearing the flag first makes a handleFailure() retry loop exit at its
      // next check of deliveryActive.
      // NOTE(review): in this listing deliveryActive is assigned in start(); if
      // the field has no initializer, calling stop() before start() would throw
      // a NullPointerException -- confirm against the field declaration.
      deliveryActive.set(false);
      teardown();
   
protected voidteardown()
Teardown the activation

      log.debug("Tearing down " + spec);

      // Released in the reverse of the order setup() acquires them
      // (DLQ, destination, connection, session pool).
      teardownSessionPool();
      teardownConnection();
      teardownDestination();
      teardownDLQ();

      log.debug("Tearing down complete " + this);
   
protected voidteardownConnection()
Close the connection, if there is one, and forget it

      // Nothing to close when no connection was ever established.
      if (connection != null)
      {
         try
         {
            log.debug("Closing the " + connection);
            connection.close();
         }
         catch (Throwable closeFailure)
         {
            // Best effort: a failed close is only reported at debug level.
            log.debug("Error closing the connection " + connection, closeFailure);
         }
      }

      // The field is cleared whether or not the close succeeded.
      connection = null;
   
protected voidteardownDLQ()
Tear down the DLQ handler, if there is one, and forget it

      log.debug("Removing DLQ " + this);

      // Only a handler that was actually created needs tearing down.
      if (dlqHandler != null)
      {
         try
         {
            dlqHandler.teardown();
         }
         catch (Throwable teardownFailure)
         {
            // Best effort: a failed teardown is only reported at debug level.
            log.debug("Error tearing down the DLQ " + dlqHandler, teardownFailure);
         }
      }

      // The field is cleared whether or not the teardown succeeded.
      dlqHandler = null;
   
protected voidteardownDestination()
Teardown the destination

      // No statements in this listing: the destination field is not cleared here.
   
protected voidteardownSessionPool()
Stop message delivery on the connection and then stop the server session pool

      // Step 1: stop delivery, when a connection exists.
      if (connection != null)
      {
         try
         {
            log.debug("Stopping delivery " + connection);
            connection.stop();
         }
         catch (Throwable stopFailure)
         {
            // Best effort: reported at debug level, then carry on to the pool.
            log.debug("Error stopping delivery " + connection, stopFailure);
         }
      }

      // Step 2: stop the pool, when one was created.
      if (pool != null)
      {
         try
         {
            log.debug("Stopping the session pool " + pool);
            pool.stop();
         }
         catch (Throwable poolFailure)
         {
            log.debug("Error clearing the pool " + pool, poolFailure);
         }
      }
   
public java.lang.StringtoString()
Builds a diagnostic description of this activation

return
the identity of this activation followed by its spec, endpoint factory, active flag, any non-null destination, connection, pool and DLQ handler, and the transacted flag

      StringBuffer buffer = new StringBuffer();
      // Identity strings are used for this object, the spec, the endpoint
      // factory, the pool and the DLQ handler rather than their own toString().
      buffer.append(Strings.defaultToString(this)).append('(');
      buffer.append("spec=").append(Strings.defaultToString(spec));
      buffer.append(" mepf=").append(Strings.defaultToString(endpointFactory));
      // In this listing deliveryActive is assigned in start(); report "false"
      // rather than failing if toString() is called while it is still null.
      buffer.append(" active=").append(deliveryActive != null && deliveryActive.get());
      // Optional parts appear only once the corresponding setup step has run.
      if (destination != null)
         buffer.append(" destination=").append(destination);
      if (connection != null)
         buffer.append(" connection=").append(connection);
      if (pool != null)
         buffer.append(" pool=").append(Strings.defaultToString(pool));
      if (dlqHandler != null)
         buffer.append(" dlq=").append(Strings.defaultToString(dlqHandler));
      buffer.append(" transacted=").append(isDeliveryTransacted);
      buffer.append(')');
      return buffer.toString();