File | Doc Category | Size | Date | Package
AbstractDLQHandler.java | API Doc, JBoss 4.2.1 | 9826 | Fri Jul 13 21:01:16 BST 2007 | org.jboss.resource.adapter.jms.inflow.dlq

AbstractDLQHandler

public abstract class AbstractDLQHandler extends Object implements org.jboss.resource.adapter.jms.inflow.DLQHandler, javax.jms.ExceptionListener
An abstract DLQ handler.
author
Adrian Brock
version
$Revision: 57189 $

Fields Summary
protected static final Logger
log
The logger
protected org.jboss.resource.adapter.jms.inflow.JmsActivation
activation
The activation
protected javax.jms.Queue
dlq
The DLQ
protected javax.jms.QueueConnection
connection
The DLQ Connection
Constructors Summary
Methods Summary
protected void doSend(javax.jms.Message msg, int deliveryMode, int priority, long timeToLive)
Do the message send

param
msg the message

      // Each send uses its own short-lived, non-transacted session on the DLQ connection
      QueueSession dlqSession = null;
      try
      {
         dlqSession = connection.createQueueSession(false, QueueSession.AUTO_ACKNOWLEDGE);
         QueueSender dlqSender = dlqSession.createSender(dlq);
         dlqSender.send(msg, deliveryMode, priority, timeToLive);
      }
      catch (Throwable sendFailure)
      {
         // Report the failure through the overridable hook instead of rethrowing it
         handleSendError(msg, sendFailure);
      }
      finally
      {
         // Best effort cleanup: a failure to close the session is only traced
         try
         {
            if (dlqSession != null)
               dlqSession.close();
         }
         catch (Throwable closeFailure)
         {
            log.trace("Ignored ", closeFailure);
         }
      }
   
protected int getDeliveryMode(javax.jms.Message msg)
Get the delivery mode for the DLQ message

param
msg the message
return
the delivery mode

      try
      {
         return msg.getJMSDeliveryMode();
      }
      catch (Throwable t)
      {
         return Message.DEFAULT_DELIVERY_MODE;
      }
   
protected int getPriority(javax.jms.Message msg)
Get the priority for the DLQ message

param
msg the message
return
the priority

      try
      {
         return msg.getJMSPriority();
      }
      catch (Throwable t)
      {
         return Message.DEFAULT_PRIORITY;
      }
   
protected long getTimeToLive(javax.jms.Message msg)
Get the time to live for the DLQ message

param
msg the message
return
the time to live

      try
      {
         long expires = msg.getJMSExpiration();
         if (expires == Message.DEFAULT_TIME_TO_LIVE)
            return Message.DEFAULT_TIME_TO_LIVE;
         return expires - System.currentTimeMillis();
      }
      catch (Throwable t)
      {
         return Message.DEFAULT_TIME_TO_LIVE;
      }
   
protected abstract boolean handleDelivery(javax.jms.Message msg)
Do we handle the message?

param
msg the message to handle
return
true when we handle it

public boolean handleRedeliveredMessage(javax.jms.Message msg)


      
   
      // Messages the subclass does not claim are left alone
      if (!handleDelivery(msg))
         return false;

      // Claimed by the subclass: move it to the DLQ and tell the caller it was handled
      sendToDLQ(msg);
      return true;
   
protected void handleSendError(javax.jms.Message msg, java.lang.Throwable t)
Handle a failure to send the message to the dlq

param
msg the message
param
t the error

      log.error("DLQ " + dlq + " error sending message " + msg, t);
   
protected javax.jms.Message makeWritable(javax.jms.Message msg)
Make the properties of the message writable. The message is modified in place and the same instance is returned; no copy is made.

param
msg the message
return
the same message with writable properties, or null when it could not be made writable

      // Sampled once, before the message is touched
      boolean traceEnabled = log.isTraceEnabled();

      try
      {
         // Snapshot every property so the values survive clearProperties()
         HashMap saved = new HashMap();
         Enumeration names = msg.getPropertyNames();
         while (names.hasMoreElements())
         {
            String name = (String) names.nextElement();
            saved.put(name, msg.getObjectProperty(name));
         }

         // Clearing the properties is what makes them writable again
         msg.clearProperties();

         // Put the saved values back, a property that cannot be restored is skipped
         Iterator restore = saved.keySet().iterator();
         while (restore.hasNext())
         {
            String name = (String) restore.next();
            try
            {
               msg.setObjectProperty(name, saved.get(name));
            }
            catch (JMSException ignored)
            {
               if (traceEnabled)
                  log.trace("Could not copy message property " + name, ignored);
            }
         }

         // Record the original message id and, when known, the original destination
         msg.setStringProperty(JBOSS_ORIG_MESSAGEID, msg.getJMSMessageID());
         Destination origin = msg.getJMSDestination();
         if (origin != null)
            msg.setStringProperty(JBOSS_ORIG_DESTINATION, origin.toString());

         return msg;
      }
      catch (Throwable failure)
      {
         // Any other failure aborts the preparation, the caller receives null
         log.error("Unable to make writable " + msg, failure);
         return null;
      }
   
public void messageDelivered(javax.jms.Message msg)

   
public void onException(javax.jms.JMSException exception)

      // This handler is registered as the exception listener of the DLQ connection
      // (see setupDLQConnection); any reported failure is handed over to the activation.
      activation.handleFailure(exception);
   
protected void sendToDLQ(javax.jms.Message msg)
Send the message to the dlq

param
msg message to send

      // Read the send options before makeWritable() modifies the message
      final int mode = getDeliveryMode(msg);
      final int prio = getPriority(msg);
      final long ttl = getTimeToLive(msg);

      if (ttl >= 0)
      {
         // A null result means the message could not be prepared, so nothing is sent
         Message writable = makeWritable(msg);
         if (writable != null)
            doSend(writable, mode, prio, ttl);
      }
      else if (log.isTraceEnabled())
      {
         // A negative time to live means the message has already expired
         log.trace("Not sending the message to the DLQ, it has expired " + msg);
      }
   
public void setup(org.jboss.resource.adapter.jms.inflow.JmsActivation activation, javax.naming.Context ctx)

      // The activation must be stored first: both setup steps below read it
      this.activation = activation;
      // Look up the DLQ destination, then create the connection used to send to it
      setupDLQDestination(ctx);
      setupDLQConnection(ctx);
   
protected void setupDLQConnection(javax.naming.Context ctx)
Setup the DLQ Connection

param
ctx the naming context
throws
Exception for any error

      // Credentials and client id for the DLQ come from the activation spec
      JmsActivationSpec activationSpec = activation.getActivationSpec();
      String dlqUser = activationSpec.getDLQUser();
      String dlqPassword = activationSpec.getDLQPassword();
      String dlqClientID = activationSpec.getDLQClientID();

      // The connection factory is looked up under the name supplied by the provider adapter
      JMSProviderAdapter providerAdapter = activation.getProviderAdapter();
      String factoryName = providerAdapter.getQueueFactoryRef();
      log.debug("Attempting to lookup dlq connection factory " + factoryName);
      QueueConnectionFactory factory = (QueueConnectionFactory) Util.lookup(ctx, factoryName, QueueConnectionFactory.class);
      log.debug("Got dlq connection factory " + factory + " from " + factoryName);

      // Credentials are passed only when a user is configured
      log.debug("Attempting to create queue connection with user " + dlqUser);
      connection = (dlqUser != null)
            ? factory.createQueueConnection(dlqUser, dlqPassword)
            : factory.createQueueConnection();

      if (dlqClientID != null)
         connection.setClientID(dlqClientID);

      // Connection failures are delivered to this handler's onException()
      connection.setExceptionListener(this);
      log.debug("Using queue connection " + connection);
   
protected void setupDLQDestination(javax.naming.Context ctx)
Setup the DLQ Destination

param
ctx the naming context
throws
Exception for any error

      // The DLQ is looked up under the JNDI name configured on the activation spec
      JmsActivationSpec activationSpec = activation.getActivationSpec();
      dlq = (Queue) Util.lookup(ctx, activationSpec.getDLQJNDIName(), Queue.class);
   
public void teardown()

      // Close the connection first, then run the destination teardown hook
      // (the hook has an empty body in this class).
      teardownDLQConnection();
      teardownDLQDestination();
   
protected void teardownDLQConnection()
Teardown the DLQ Connection

      // Nothing to close when no connection was created
      if (connection == null)
         return;

      try
      {
         log.debug("Closing the " + connection);
         connection.close();
      }
      catch (Throwable closeFailure)
      {
         // Teardown is best effort: a failure to close is logged and not rethrown
         log.debug("Error closing the connection " + connection, closeFailure);
      }
   
protected void teardownDLQDestination()
Teardown the DLQ Destination

   
protected void warnDLQ(javax.jms.Message msg, int count, int max)
Warn that a message is being handled by the DLQ

param
msg the message
param
count the number of redelivers
param
max the maximum number of redeliveries

      log.warn("Message redelivered=" + count + " max=" + max + " sending it to the dlq " + msg);