FileDocCategorySizeDatePackage
TopicConnector.javaAPI DocApache Axis 1.415241Sat Apr 22 18:57:28 BST 2006org.apache.axis.transport.jms

TopicConnector.java

/*
 * Copyright 2001, 2002,2004 The Apache Software Foundation.
 * 
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 * 
 *      http://www.apache.org/licenses/LICENSE-2.0
 * 
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package org.apache.axis.transport.jms;

import org.apache.axis.components.jms.JMSVendorAdapter;

import javax.jms.Connection;
import javax.jms.ConnectionFactory;
import javax.jms.Destination;
import javax.jms.JMSException;
import javax.jms.Message;
import javax.jms.MessageConsumer;
import javax.jms.MessageListener;
import javax.jms.Session;
import javax.jms.TemporaryTopic;
import javax.jms.Topic;
import javax.jms.TopicConnection;
import javax.jms.TopicConnectionFactory;
import javax.jms.TopicPublisher;
import javax.jms.TopicSession;
import javax.jms.TopicSubscriber;
import java.util.HashMap;

/**
 * TopicConnector is a concrete JMSConnector subclass that specifically handles
 *   connections to topics (pub-sub domain).
 *
 * @author Jaime Meritt  (jmeritt@sonicsoftware.com)
 * @author Richard Chung (rchung@sonicsoftware.com)
 * @author Dave Chappell (chappell@sonicsoftware.com)
 */
public class TopicConnector extends JMSConnector
{
    public TopicConnector(TopicConnectionFactory factory,
                          int numRetries,
                          int numSessions,
                          long connectRetryInterval,
                          long interactRetryInterval,
                          long timeoutTime,
                          boolean allowReceive,
                          String clientID,
                          String username,
                          String password,
                          JMSVendorAdapter adapter,
                          JMSURLHelper jmsurl)
        throws JMSException
    {
        super(factory, numRetries, numSessions, connectRetryInterval,
              interactRetryInterval, timeoutTime, allowReceive,
              clientID, username, password, adapter, jmsurl);
    }

    protected Connection internalConnect(ConnectionFactory connectionFactory,
                                         String username, String password)
        throws JMSException
    {
        TopicConnectionFactory tcf = (TopicConnectionFactory)connectionFactory;
        if(username == null)
            return tcf.createTopicConnection();

        return tcf.createTopicConnection(username, password);
    }


    protected SyncConnection createSyncConnection(ConnectionFactory factory,
                                                  Connection connection,
                                                  int numSessions,
                                                  String threadName,
                                                  String clientID,
                                                  String username,
                                                  String password)
        throws JMSException
    {
        return new TopicSyncConnection((TopicConnectionFactory)factory,
                                       (TopicConnection)connection, numSessions,
                                       threadName, clientID, username, password);
    }

    protected AsyncConnection createAsyncConnection(ConnectionFactory factory,
                                                    Connection connection,
                                                    String threadName,
                                                    String clientID,
                                                    String username,
                                                    String password)
        throws JMSException
    {
        return new TopicAsyncConnection((TopicConnectionFactory)factory,
                                        (TopicConnection)connection, threadName,
                                        clientID, username, password);
    }

    public JMSEndpoint createEndpoint(String destination)
    {
        return new TopicEndpoint(destination);
    }

    /**
     * Create an endpoint for a queue destination.
     *
     * @param destination
     * @return
     * @throws JMSException
     */
    public JMSEndpoint createEndpoint(Destination destination)
        throws JMSException
    {
        if(!(destination instanceof Topic))
            throw new IllegalArgumentException("The input be a topic for this connector");
        return new TopicDestinationEndpoint((Topic)destination);
    }

    private TopicSession createTopicSession(TopicConnection connection, int ackMode)
        throws JMSException
    {
        return connection.createTopicSession(false,
                                             ackMode);
    }

    private Topic createTopic(TopicSession session, String subject)
        throws Exception
    {
        return m_adapter.getTopic(session, subject);
    }

    private TopicSubscriber createSubscriber(TopicSession session,
                                             TopicSubscription subscription)
        throws Exception
    {
        if(subscription.isDurable())
            return createDurableSubscriber(session,
                        (Topic)subscription.m_endpoint.getDestination(session),
                        subscription.m_subscriptionName,
                        subscription.m_messageSelector,
                        subscription.m_noLocal);
        else
            return createSubscriber(session,
                        (Topic)subscription.m_endpoint.getDestination(session),
                        subscription.m_messageSelector,
                        subscription.m_noLocal);
    }

    private TopicSubscriber createDurableSubscriber(TopicSession session,
                                                    Topic topic,
                                                    String subscriptionName,
                                                    String messageSelector,
                                                    boolean noLocal)
        throws JMSException
    {
        return session.createDurableSubscriber(topic, subscriptionName,
                                               messageSelector, noLocal);
    }

    private TopicSubscriber createSubscriber(TopicSession session,
                                             Topic topic,
                                             String messageSelector,
                                             boolean noLocal)
        throws JMSException
    {
        return session.createSubscriber(topic, messageSelector, noLocal);
    }




    private final class TopicAsyncConnection extends AsyncConnection
    {

        TopicAsyncConnection(TopicConnectionFactory connectionFactory,
                             TopicConnection connection,
                             String threadName,
                             String clientID,
                             String username,
                             String password)

            throws JMSException
        {
            super(connectionFactory, connection, threadName,
                  clientID, username, password);
        }

        protected ListenerSession createListenerSession(javax.jms.Connection connection,
                                                        Subscription subscription)
            throws Exception
        {
            TopicSession session = createTopicSession((TopicConnection)connection,
                                                      subscription.m_ackMode);
            TopicSubscriber subscriber = createSubscriber(session,
                                                (TopicSubscription)subscription);
            return new TopicListenerSession(session, subscriber,
                                                (TopicSubscription)subscription);
        }

        private final class TopicListenerSession extends ListenerSession
        {

            TopicListenerSession(TopicSession session,
                                 TopicSubscriber subscriber,
                                 TopicSubscription subscription)
                throws Exception
            {
                super(session, subscriber, subscription);
            }

            void cleanup()
            {
                try{m_consumer.close();}catch(Exception ignore){}
                try
                {
                    TopicSubscription sub = (TopicSubscription)m_subscription;
                    if(sub.isDurable() && sub.m_unsubscribe)
                    {
                        ((TopicSession)m_session).unsubscribe(sub.m_subscriptionName);
                    }
                }
                catch(Exception ignore){}
                try{m_session.close();}catch(Exception ignore){}

            }
        }
    }

    private final class TopicSyncConnection extends SyncConnection
    {
        TopicSyncConnection(TopicConnectionFactory connectionFactory,
                            TopicConnection connection,
                            int numSessions,
                            String threadName,
                            String clientID,
                            String username,
                            String password)

            throws JMSException
        {
            super(connectionFactory, connection, numSessions, threadName,
                  clientID, username, password);
        }

        protected SendSession createSendSession(javax.jms.Connection connection)
            throws JMSException
        {
            TopicSession session = createTopicSession((TopicConnection)connection,
                                            JMSConstants.DEFAULT_ACKNOWLEDGE_MODE);
            TopicPublisher publisher = session.createPublisher(null);
            return new TopicSendSession(session, publisher);
        }

        private final class TopicSendSession extends SendSession
        {
            TopicSendSession(TopicSession session,
                             TopicPublisher publisher)
                throws JMSException
            {
                super(session, publisher);
            }


            protected MessageConsumer createConsumer(Destination destination)
                throws JMSException
            {
                return createSubscriber((TopicSession)m_session, (Topic)destination,
                                        null, JMSConstants.DEFAULT_NO_LOCAL);
            }

            protected void deleteTemporaryDestination(Destination destination)
                throws JMSException
            {
                ((TemporaryTopic)destination).delete();
            }


            protected Destination createTemporaryDestination()
                throws JMSException
            {
                return ((TopicSession)m_session).createTemporaryTopic();
            }

            protected void send(Destination destination, Message message,
                                int deliveryMode, int priority, long timeToLive)
                throws JMSException
            {
                ((TopicPublisher)m_producer).publish((Topic)destination, message,
                                                deliveryMode, priority, timeToLive);
            }

        }
    }



    private class TopicEndpoint
        extends JMSEndpoint
    {
        String m_topicName;

        TopicEndpoint(String topicName)
        {
            super(TopicConnector.this);
            m_topicName = topicName;
        }

        Destination getDestination(Session session)
            throws Exception
        {
            return createTopic((TopicSession)session, m_topicName);
        }

        protected Subscription createSubscription(MessageListener listener,
                                                  HashMap properties)
        {
            return new TopicSubscription(listener, this, properties);
        }

        public String toString()
        {
            StringBuffer buffer = new StringBuffer("TopicEndpoint:");
            buffer.append(m_topicName);
            return buffer.toString();
        }

        public boolean equals(Object object)
        {
            if(!super.equals(object))
                return false;

            if(!(object instanceof TopicEndpoint))
                return false;

            return m_topicName.equals(((TopicEndpoint)object).m_topicName);
        }
    }

    private final class TopicSubscription extends Subscription
    {
        String m_subscriptionName;
        boolean m_unsubscribe;
        boolean m_noLocal;

        TopicSubscription(MessageListener listener,
                          JMSEndpoint endpoint,
                          HashMap properties)
        {
            super(listener, endpoint, properties);
            m_subscriptionName = MapUtils.removeStringProperty(properties,
                                                JMSConstants.SUBSCRIPTION_NAME,
                                                null);
            m_unsubscribe = MapUtils.removeBooleanProperty(properties,
                                                JMSConstants.UNSUBSCRIBE,
                                                JMSConstants.DEFAULT_UNSUBSCRIBE);
            m_noLocal = MapUtils.removeBooleanProperty(properties,
                                                JMSConstants.NO_LOCAL,
                                                JMSConstants.DEFAULT_NO_LOCAL);
        }

        boolean isDurable()
        {
            return m_subscriptionName != null;
        }

        public boolean equals(Object obj)
        {
            if(!super.equals(obj))
                return false;
            if(!(obj instanceof TopicSubscription))
                return false;

            TopicSubscription other = (TopicSubscription)obj;
            if(other.m_unsubscribe != m_unsubscribe || other.m_noLocal != m_noLocal)
                return false;

            if(isDurable())
            {
                return other.isDurable() && other.m_subscriptionName.equals(m_subscriptionName);
            }
            else if(other.isDurable())
                return false;
            else
                return true;
        }

        public String toString()
        {
            StringBuffer buffer = new StringBuffer(super.toString());
            buffer.append(":").append(m_noLocal).append(":").append(m_unsubscribe);
            if(isDurable())
            {
                buffer.append(":");
                buffer.append(m_subscriptionName);
            }
            return buffer.toString();
        }

    }

    private final class TopicDestinationEndpoint
        extends TopicEndpoint
    {
        Topic m_topic;

        TopicDestinationEndpoint(Topic topic)
            throws JMSException
        {
            super(topic.getTopicName());
            m_topic = topic;
        }

        Destination getDestination(Session session)
        {
            return m_topic;
        }

    }


}