/*
 * File:     QueueConnector.java
 * Doc:      API Doc
 * Category: Apache Axis 1.4
 * Size:     10253
 * Date:     Sat Apr 22 18:57:26 BST 2006
 * Package:  org.apache.axis.transport.jms
 */

/*
 * Copyright 2001, 2002,2004 The Apache Software Foundation.
 * 
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 * 
 *      http://www.apache.org/licenses/LICENSE-2.0
 * 
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package org.apache.axis.transport.jms;

import org.apache.axis.components.jms.JMSVendorAdapter;

import javax.jms.Connection;
import javax.jms.ConnectionFactory;
import javax.jms.Destination;
import javax.jms.JMSException;
import javax.jms.Message;
import javax.jms.MessageConsumer;
import javax.jms.Queue;
import javax.jms.QueueConnection;
import javax.jms.QueueConnectionFactory;
import javax.jms.QueueReceiver;
import javax.jms.QueueSender;
import javax.jms.QueueSession;
import javax.jms.Session;
import javax.jms.TemporaryQueue;

/**
 * QueueConnector is a concrete JMSConnector subclass that specifically handles
 *   connections to queues (the JMS point-to-point, or PTP, domain).
 *
 * @author Jaime Meritt  (jmeritt@sonicsoftware.com)
 * @author Richard Chung (rchung@sonicsoftware.com)
 * @author Dave Chappell (chappell@sonicsoftware.com)
 */
public class QueueConnector extends JMSConnector
{

    /**
     * Builds a connector for queue destinations.
     *
     * This constructor adds no logic of its own: every argument is passed,
     * unchanged and in the same order, to the JMSConnector constructor.
     *
     * @throws JMSException if the JMSConnector constructor throws it
     */
    public QueueConnector(ConnectionFactory factory,
                          int numRetries,
                          int numSessions,
                          long connectRetryInterval,
                          long interactRetryInterval,
                          long timeoutTime,
                          boolean allowReceive,
                          String clientID,
                          String username,
                          String password,
                          JMSVendorAdapter adapter,
                          JMSURLHelper jmsurl)
        throws JMSException
    {
        super(factory,
              numRetries,
              numSessions,
              connectRetryInterval,
              interactRetryInterval,
              timeoutTime,
              allowReceive,
              clientID,
              username,
              password,
              adapter,
              jmsurl);
    }

    /**
     * Create an endpoint for a queue identified by name.
     *
     * @param destination the queue name stored in the new endpoint
     * @return a new QueueEndpoint owned by this connector
     */
    public JMSEndpoint createEndpoint(String destination)
    {
        JMSEndpoint endpoint = new QueueEndpoint(destination);
        return endpoint;
    }

    /**
     * Create an endpoint that wraps an existing queue destination.
     *
     * @param destination the destination to wrap; it must be a javax.jms.Queue
     * @return a new QueueDestinationEndpoint holding the given queue
     * @throws IllegalArgumentException if destination is not a Queue
     *         (a null destination is rejected the same way)
     * @throws JMSException if the queue name cannot be obtained from the
     *         queue while the endpoint is being constructed
     */
    public JMSEndpoint createEndpoint(Destination destination)
        throws JMSException
    {
        if(destination instanceof Queue)
        {
            Queue queue = (Queue)destination;
            return new QueueDestinationEndpoint(queue);
        }

        throw new IllegalArgumentException("The input must be a queue for this connector");
    }

    /**
     * Open a queue connection from the supplied factory.
     *
     * A null username selects the factory's no-argument
     * createQueueConnection(); otherwise the username/password overload
     * is used.
     *
     * @param connectionFactory the factory to connect through; it is cast to
     *        QueueConnectionFactory, so any other type fails with a
     *        ClassCastException
     * @param username the user to connect as, or null for the default identity
     * @param password the password passed along with a non-null username
     * @return the newly created QueueConnection
     * @throws JMSException if the factory fails to create the connection
     */
    protected Connection internalConnect(ConnectionFactory connectionFactory,
                                         String username,
                                         String password)
        throws JMSException
    {
        QueueConnectionFactory queueFactory = (QueueConnectionFactory)connectionFactory;
        boolean defaultIdentity = (username == null);

        return defaultIdentity
            ? queueFactory.createQueueConnection()
            : queueFactory.createQueueConnection(username, password);
    }


    /**
     * Create the queue-specific synchronous connection wrapper.
     *
     * The factory and connection are cast to their queue-domain types
     * (a ClassCastException results if they are of any other type) and
     * passed, with the remaining arguments, to a new QueueSyncConnection.
     *
     * @return a new QueueSyncConnection
     * @throws JMSException if the QueueSyncConnection constructor throws it
     */
    protected SyncConnection createSyncConnection(ConnectionFactory factory,
                                                  Connection connection,
                                                  int numSessions,
                                                  String threadName,
                                                  String clientID,
                                                  String username,
                                                  String password)
        throws JMSException
    {
        QueueConnectionFactory queueFactory = (QueueConnectionFactory)factory;
        QueueConnection queueConnection = (QueueConnection)connection;

        return new QueueSyncConnection(queueFactory, queueConnection, numSessions,
                                       threadName, clientID, username, password);
    }

    /**
     * Open a non-transacted queue session on the given connection.
     *
     * @param connection the connection to create the session on
     * @param ackMode the acknowledge mode passed to createQueueSession
     * @return the new session
     * @throws JMSException if the connection fails to create the session
     */
    private QueueSession createQueueSession(QueueConnection connection, int ackMode)
        throws JMSException
    {
        final boolean transacted = false;
        return connection.createQueueSession(transacted, ackMode);
    }

    /**
     * Obtain the Queue for a name by delegating to m_adapter.getQueue.
     *
     * @param session the session passed to the adapter
     * @param subject the queue name passed to the adapter
     * @return the queue returned by the adapter
     * @throws Exception whatever the adapter call throws
     */
    private Queue createQueue(QueueSession session, String subject)
        throws Exception
    {
        Queue queue = m_adapter.getQueue(session, subject);
        return queue;
    }

    /**
     * Create a receiver for a queue on the given session.
     *
     * @param session the session that creates the receiver
     * @param queue the queue to receive from
     * @param messageSelector the selector passed to createReceiver; callers
     *        in this class pass null when they have no selector
     * @return the new receiver
     * @throws JMSException if the session fails to create the receiver
     */
    private QueueReceiver createReceiver(QueueSession session,
                                         Queue queue,
                                         String messageSelector)
        throws JMSException
    {
        QueueReceiver receiver = session.createReceiver(queue, messageSelector);
        return receiver;
    }

    /**
     * SyncConnection specialization for queues: supplies queue-domain send
     * sessions to the SyncConnection machinery.
     */
    private final class QueueSyncConnection extends SyncConnection
    {
        /**
         * Passes all arguments unchanged to the SyncConnection constructor.
         *
         * @throws JMSException if the SyncConnection constructor throws it
         */
        QueueSyncConnection(QueueConnectionFactory connectionFactory,
                            QueueConnection connection,
                            int numSessions,
                            String threadName,
                            String clientID,
                            String username,
                            String password)
            throws JMSException
        {
            super(connectionFactory, connection, numSessions, threadName,
                  clientID, username, password);
        }

        /**
         * Create a send session backed by a new QueueSession and a sender
         * that is not bound to any particular queue.
         *
         * If anything fails after the session has been created, the session
         * is closed before the failure propagates, so a failed call does not
         * leave an open session behind.
         *
         * @param connection the connection to create the session on; it is
         *        cast to QueueConnection
         * @return a new QueueSendSession wrapping the session and sender
         * @throws JMSException if the session or sender cannot be created
         */
        protected SendSession createSendSession(javax.jms.Connection connection)
            throws JMSException
        {
            QueueSession session = createQueueSession((QueueConnection)connection,
                                        JMSConstants.DEFAULT_ACKNOWLEDGE_MODE);
            boolean handedOff = false;
            try
            {
                // A null queue creates a sender with no fixed destination;
                // the destination is supplied on each send() call instead.
                QueueSender sender = session.createSender(null);
                SendSession sendSession = new QueueSendSession(session, sender);
                handedOff = true;
                return sendSession;
            }
            finally
            {
                if(!handedOff)
                {
                    try
                    {
                        session.close();
                    }
                    catch(JMSException ignored)
                    {
                        // Best-effort cleanup: the failure that brought us
                        // here is the one the caller needs to see.
                    }
                }
            }
        }

        /**
         * SendSession whose consumers, temporary destinations and sends are
         * all carried out through the queue-domain JMS interfaces.
         */
        private final class QueueSendSession extends SendSession
        {
            /**
             * Passes the session and sender to the SendSession constructor.
             *
             * @throws JMSException if the SendSession constructor throws it
             */
            QueueSendSession(QueueSession session,
                             QueueSender  sender)
                throws JMSException
            {
                super(session, sender);
            }

            /**
             * Create a receiver, with no message selector, on the given
             * destination.
             *
             * @param destination the destination to consume from; it is cast
             *        to Queue
             */
            protected MessageConsumer createConsumer(Destination destination)
                throws JMSException
            {
                return createReceiver((QueueSession)m_session, (Queue)destination, null);
            }


            /**
             * Create a temporary queue on this send session's JMS session.
             */
            protected Destination createTemporaryDestination()
                throws JMSException
            {
                return ((QueueSession)m_session).createTemporaryQueue();
            }

            /**
             * Delete a temporary queue.
             *
             * @param destination the destination to delete; it is cast to
             *        TemporaryQueue
             */
            protected void deleteTemporaryDestination(Destination destination)
                throws JMSException
            {
                ((TemporaryQueue)destination).delete();
            }

            /**
             * Send a message to the given queue through this session's
             * producer, using the supplied delivery mode, priority and
             * time-to-live.
             *
             * @param destination the target; it is cast to Queue
             */
            protected void send(Destination destination, Message message,
                                int deliveryMode, int priority, long timeToLive)
                throws JMSException
            {
                ((QueueSender)m_producer).send((Queue)destination, message,
                                                deliveryMode, priority, timeToLive);
            }

        }
    }

    /**
     * Endpoint identified by a queue name. The actual Queue is obtained
     * through createQueue each time getDestination is called.
     */
    private class QueueEndpoint
        extends JMSEndpoint
    {
        /** Name of the queue; may be null if the creator supplied null. */
        String m_queueName;

        /**
         * @param queueName the queue name this endpoint refers to
         */
        QueueEndpoint(String queueName)
        {
            super(QueueConnector.this);
            m_queueName = queueName;
        }

        /**
         * Obtain the queue for this endpoint's name using the given session.
         *
         * @param session the session to use; it is cast to QueueSession
         * @return the queue returned by createQueue
         * @throws Exception whatever createQueue throws
         */
        Destination getDestination(Session session)
            throws Exception
        {
            return createQueue((QueueSession)session, m_queueName);
        }

        /**
         * @return "QueueEndpoint:" followed by the queue name
         */
        public String toString()
        {
            return "QueueEndpoint:" + m_queueName;
        }

        /**
         * Two queue endpoints are equal when the superclass considers them
         * equal and their queue names match. A null queue name only matches
         * another null queue name, rather than raising a NullPointerException.
         */
        public boolean equals(Object object)
        {
            if(!super.equals(object))
                return false;

            if(!(object instanceof QueueEndpoint))
                return false;

            String otherName = ((QueueEndpoint)object).m_queueName;
            if(m_queueName == null)
                return otherName == null;

            return m_queueName.equals(otherName);
        }

        /**
         * Hash on the queue name, the field equals compares, so that equal
         * endpoints always hash alike.
         */
        public int hashCode()
        {
            return (m_queueName == null) ? 0 : m_queueName.hashCode();
        }
    }


    /**
     * Queue endpoint that holds an existing Queue object, so getDestination
     * returns it directly without using the session.
     */
    private final class QueueDestinationEndpoint
        extends QueueEndpoint
    {
        /** The queue handed in at construction. */
        Queue m_queue;

        /**
         * @param queue the queue to wrap; its name becomes the endpoint name
         * @throws JMSException if the queue name cannot be obtained
         */
        QueueDestinationEndpoint(Queue queue)
            throws JMSException
        {
            super(queue.getQueueName());
            this.m_queue = queue;
        }

        /**
         * @param session unused by this implementation
         * @return the queue supplied to the constructor
         */
        Destination getDestination(Session session)
        {
            Destination wrapped = this.m_queue;
            return wrapped;
        }

    }

    /**
     * Create the queue-specific asynchronous connection wrapper.
     *
     * The factory and connection are cast to their queue-domain types
     * (a ClassCastException results if they are of any other type) and
     * passed, with the remaining arguments, to a new QueueAsyncConnection.
     *
     * @return a new QueueAsyncConnection
     * @throws JMSException if the QueueAsyncConnection constructor throws it
     */
    protected AsyncConnection createAsyncConnection(ConnectionFactory factory,
                                                    Connection connection,
                                                    String threadName,
                                                    String clientID,
                                                    String username,
                                                    String password)
        throws JMSException
    {
        QueueConnectionFactory queueFactory = (QueueConnectionFactory)factory;
        QueueConnection queueConnection = (QueueConnection)connection;

        return new QueueAsyncConnection(queueFactory, queueConnection, threadName,
                                        clientID, username, password);
    }

    /**
     * AsyncConnection specialization for queues: builds listener sessions
     * from a QueueSession and a QueueReceiver.
     */
    private final class QueueAsyncConnection extends AsyncConnection
    {

        /**
         * Passes all arguments unchanged to the AsyncConnection constructor.
         *
         * @throws JMSException if the AsyncConnection constructor throws it
         */
        QueueAsyncConnection(QueueConnectionFactory connectionFactory,
                             QueueConnection connection,
                             String threadName,
                             String clientID,
                             String username,
                             String password)
            throws JMSException
        {
            super(connectionFactory, connection, threadName, clientID, username, password);
        }

        /**
         * Create a listener session for a subscription: a new QueueSession
         * using the subscription's acknowledge mode, plus a receiver on the
         * subscription endpoint's queue filtered by its message selector.
         *
         * If resolving the destination, creating the receiver or building
         * the ListenerSession fails, the session is closed before the
         * failure propagates, so a failed call does not leave an open
         * session behind.
         *
         * @param connection the connection to create the session on; it is
         *        cast to QueueConnection
         * @param subscription supplies the acknowledge mode, endpoint and
         *        message selector
         * @return a new ListenerSession wrapping the session and receiver
         * @throws Exception if the session, destination or receiver cannot
         *         be obtained
         */
        protected ListenerSession createListenerSession(javax.jms.Connection connection,
                                                        Subscription subscription)
            throws Exception
        {
            QueueSession session = createQueueSession((QueueConnection)connection,
                                                      subscription.m_ackMode);
            boolean handedOff = false;
            try
            {
                Queue queue = (Queue)subscription.m_endpoint.getDestination(session);
                QueueReceiver receiver = createReceiver(session, queue,
                                                        subscription.m_messageSelector);
                ListenerSession listenerSession =
                    new ListenerSession(session, receiver, subscription);
                handedOff = true;
                return listenerSession;
            }
            finally
            {
                if(!handedOff)
                {
                    try
                    {
                        session.close();
                    }
                    catch(JMSException ignored)
                    {
                        // Best-effort cleanup: the failure that brought us
                        // here is the one the caller needs to see.
                    }
                }
            }
        }

    }

}