AbstractDLQHandler.javaAPI DocJBoss 4.2.19826Fri Jul 13 21:01:16 BST 2007org.jboss.resource.adapter.jms.inflow.dlq

 * JBoss, Home of Professional Open Source.
 * Copyright 2006, Red Hat Middleware LLC, and individual contributors
 * as indicated by the @author tags. See the copyright.txt file in the
 * distribution for a full listing of individual contributors.
 * This is free software; you can redistribute it and/or modify it
 * under the terms of the GNU Lesser General Public License as
 * published by the Free Software Foundation; either version 2.1 of
 * the License, or (at your option) any later version.
 * This software is distributed in the hope that it will be useful,
 * but WITHOUT ANY WARRANTY; without even the implied warranty of
 * Lesser General Public License for more details.
 * You should have received a copy of the GNU Lesser General Public
 * License along with this software; if not, write to the Free
 * Software Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA
 * 02110-1301 USA, or see the FSF site:
package org.jboss.resource.adapter.jms.inflow.dlq;

import java.util.Enumeration;
import java.util.HashMap;
import java.util.Iterator;

import javax.jms.Destination;
import javax.jms.ExceptionListener;
import javax.jms.JMSException;
import javax.jms.Message;
import javax.jms.Queue;
import javax.jms.QueueConnection;
import javax.jms.QueueConnectionFactory;
import javax.jms.QueueSender;
import javax.jms.QueueSession;
import javax.jms.TopicConnectionFactory;
import javax.naming.Context;

import org.jboss.jms.jndi.JMSProviderAdapter;
import org.jboss.logging.Logger;
import org.jboss.resource.adapter.jms.inflow.DLQHandler;
import org.jboss.resource.adapter.jms.inflow.JmsActivation;
import org.jboss.resource.adapter.jms.inflow.JmsActivationSpec;
import org.jboss.util.naming.Util;

 * An abstract DLQ handler.
 * @author <a href="">Adrian Brock</a>
 * @version $Revision: 57189 $
public abstract class AbstractDLQHandler implements DLQHandler, ExceptionListener
   /** The logger */
   protected static final Logger log = Logger.getLogger(AbstractDLQHandler.class);

   /** The activation */
   protected JmsActivation activation;
   /** The DLQ */
   protected Queue dlq;
   /** The DLQ Connection*/
   protected QueueConnection connection;

   public boolean handleRedeliveredMessage(Message msg)
      boolean handled = handleDelivery(msg);
      if (handled)
      return handled;

   public void messageDelivered(Message msg)
   public void setup(JmsActivation activation, Context ctx) throws Exception
      this.activation = activation;

   public void teardown()

   public void onException(JMSException exception)

    * Setup the DLQ Destination
    * @param ctx the naming context
    * @throws Exception for any error
   protected void setupDLQDestination(Context ctx) throws Exception
      String name = activation.getActivationSpec().getDLQJNDIName();
      dlq = (Queue) Util.lookup(ctx, name, Queue.class);

    * Teardown the DLQ Destination
   protected void teardownDLQDestination()

    * Setup the DLQ Connection
    * @param ctx the naming context
    * @throws Exception for any error
   protected void setupDLQConnection(Context ctx) throws Exception
      JmsActivationSpec spec = activation.getActivationSpec();
      String user = spec.getDLQUser();
      String pass = spec.getDLQPassword();
      String clientID = spec.getDLQClientID();
      JMSProviderAdapter adapter = activation.getProviderAdapter();
      String queueFactoryRef = adapter.getQueueFactoryRef();
      log.debug("Attempting to lookup dlq connection factory " + queueFactoryRef);
      QueueConnectionFactory qcf = (QueueConnectionFactory) Util.lookup(ctx, queueFactoryRef, QueueConnectionFactory.class);
      log.debug("Got dlq connection factory " + qcf + " from " + queueFactoryRef);
      log.debug("Attempting to create queue connection with user " + user);
      if (user != null)
         connection = qcf.createQueueConnection(user, pass);
         connection = qcf.createQueueConnection();
      if (clientID != null)
      log.debug("Using queue connection " + connection);

    * Teardown the DLQ Connection
   protected void teardownDLQConnection()
         if (connection != null)
            log.debug("Closing the " + connection);
      catch (Throwable t)
         log.debug("Error closing the connection " + connection, t);
    * Do we handle the message?
    * @param msg the message to handle
    * @return true when we handle it
   protected abstract boolean handleDelivery(Message msg);
    * Warn that a message is being handled by the DLQ
    * @param msg
    * @param count the number of redelivers
    * @param max the maximum number of redeliveries
   protected void warnDLQ(Message msg, int count, int max)
      log.warn("Message redelivered=" + count + " max=" + max + " sending it to the dlq " + msg);
    * Send the message to the dlq
    * @param msg message to send
   protected void sendToDLQ(Message msg)
      int deliveryMode = getDeliveryMode(msg);
      int priority = getPriority(msg);
      long timeToLive = getTimeToLive(msg);
      // If we get a negative time to live that means the message has expired
      if (timeToLive < 0)
         if (log.isTraceEnabled())
            log.trace("Not sending the message to the DLQ, it has expired " + msg);
      Message copy = makeWritable(msg);
      if (copy != null)
         doSend(copy, deliveryMode, priority, timeToLive);
    * Get the delivery mode for the DLQ message
    * @param msg the message
    * @return the delivery mode
   protected int getDeliveryMode(Message msg)
         return msg.getJMSDeliveryMode();
      catch (Throwable t)
         return Message.DEFAULT_DELIVERY_MODE;
    * Get the priority for the DLQ message
    * @param msg the message
    * @return the priority
   protected int getPriority(Message msg)
         return msg.getJMSPriority();
      catch (Throwable t)
         return Message.DEFAULT_PRIORITY;
    * Get the time to live for the DLQ message
    * @param msg the message
    * @return the time to live
   protected long getTimeToLive(Message msg)
         long expires = msg.getJMSExpiration();
         if (expires == Message.DEFAULT_TIME_TO_LIVE)
            return Message.DEFAULT_TIME_TO_LIVE;
         return expires - System.currentTimeMillis();
      catch (Throwable t)
         return Message.DEFAULT_TIME_TO_LIVE;
    * Make a writable copy of the message
    * @param msg the message
    * @return the copied message
   protected Message makeWritable(Message msg)
      boolean trace = log.isTraceEnabled();
         HashMap tmp = new HashMap();

         // Save properties
         for (Enumeration en = msg.getPropertyNames(); en.hasMoreElements();)
            String key = (String) en.nextElement();
            tmp.put(key, msg.getObjectProperty(key));
         // Make them writable

         for (Iterator i = tmp.keySet().iterator(); i.hasNext();)
            String key = (String);
               msg.setObjectProperty(key, tmp.get(key));
            catch (JMSException ignored)
               if (trace)
                  log.trace("Could not copy message property " + key, ignored);

         msg.setStringProperty(JBOSS_ORIG_MESSAGEID, msg.getJMSMessageID());
         Destination destination = msg.getJMSDestination();
         if (destination != null)
            msg.setStringProperty(JBOSS_ORIG_DESTINATION, destination.toString());

         return msg;
      catch (Throwable t)
         log.error("Unable to make writable " + msg, t);
         return null;
    * Do the message send
    * @param msg the message
   protected void doSend(Message msg, int deliveryMode, int priority, long timeToLive)
      QueueSession session = null;
         session = connection.createQueueSession(false, QueueSession.AUTO_ACKNOWLEDGE);
         QueueSender sender = session.createSender(dlq);
         sender.send(msg, deliveryMode, priority, timeToLive);
      catch (Throwable t)
         handleSendError(msg, t);
         if (session != null)
            catch (Throwable t)
               log.trace("Ignored ", t);
    * Handle a failure to send the message to the dlq
    * @param msg the message
    * @param t the error
   protected void handleSendError(Message msg, Throwable t)
      log.error("DLQ " + dlq + " error sending message " + msg, t);