FileDocCategorySizeDatePackage
JMSConnectorManager.javaAPI DocApache Axis 1.415519Sat Apr 22 18:57:28 BST 2006org.apache.axis.transport.jms

JMSConnectorManager.java

/*
 * Copyright 2001, 2002,2004 The Apache Software Foundation.
 * 
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 * 
 *      http://www.apache.org/licenses/LICENSE-2.0
 * 
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package org.apache.axis.transport.jms;

import org.apache.axis.AxisFault;
import org.apache.axis.components.jms.JMSVendorAdapter;
import org.apache.axis.components.logger.LogFactory;
import org.apache.axis.utils.Messages;
import org.apache.commons.logging.Log;

import java.util.HashMap;
import java.util.Iterator;

/**
 * JMSConnectorManager manages a pool of connectors and works with the
 * vendor adapters to support the reuse of JMS connections.
 *
 * @author Ray Chun (rchun@sonicsoftware.com)
 */
public class JMSConnectorManager
{
    protected static Log log =
            LogFactory.getLog(JMSConnectorManager.class.getName());

    private static JMSConnectorManager s_instance = new JMSConnectorManager();

    private static HashMap vendorConnectorPools = new HashMap();
    private int DEFAULT_WAIT_FOR_SHUTDOWN = 90000; // 1.5 minutes

    private JMSConnectorManager()
    {
    }

    public static JMSConnectorManager getInstance()
    {
        return s_instance;
    }

    /**
     * Returns the pool of JMSConnectors for a particular vendor
     */
    public ShareableObjectPool getVendorPool(String vendorId)
    {
        return (ShareableObjectPool)vendorConnectorPools.get(vendorId);
    }

    /**
     * Retrieves a JMSConnector that satisfies the provided connector criteria
     */
    public JMSConnector getConnector(HashMap connectorProperties,
                                     HashMap connectionFactoryProperties,
                                     String username,
                                     String password,
                                     JMSVendorAdapter vendorAdapter)
        throws AxisFault
    {
        JMSConnector connector = null;

        try
        {
            // check for a vendor-specific pool, and create if necessary
            ShareableObjectPool vendorConnectors = getVendorPool(vendorAdapter.getVendorId());
            if (vendorConnectors == null)
            {
                synchronized (vendorConnectorPools)
                {
                    vendorConnectors = getVendorPool(vendorAdapter.getVendorId());
                    if (vendorConnectors == null)
                    {
                        vendorConnectors = new ShareableObjectPool();
                        vendorConnectorPools.put(vendorAdapter.getVendorId(), vendorConnectors);
                    }
                }
            }

            // look for a matching JMSConnector among existing connectors
            synchronized (vendorConnectors)
            {
                try
                {

                    connector = JMSConnectorFactory.matchConnector(vendorConnectors.getElements(),
                                                                   connectorProperties,
                                                                   connectionFactoryProperties,
                                                                   username,
                                                                   password,
                                                                   vendorAdapter);
                }
                catch (Exception e) {} // ignore. a new connector will be created if no match is found

                if (connector == null)
                {
                        connector = JMSConnectorFactory.createClientConnector(connectorProperties,
                                                                              connectionFactoryProperties,
                                                                              username,
                                                                              password,
                                                                              vendorAdapter);
                        connector.start();
                }
            }
        }
        catch (Exception e)
        {
            log.error(Messages.getMessage("cannotConnectError"), e);

            if(e instanceof AxisFault)
                throw (AxisFault)e;
            throw new AxisFault("cannotConnect", e);
        }

        return connector;
    }

    /**
     * Closes JMSConnectors in all pools
     */
    void closeAllConnectors()
    {
        if (log.isDebugEnabled()) {
            log.debug("Enter: JMSConnectorManager::closeAllConnectors");
        }

        synchronized (vendorConnectorPools)
        {
            Iterator iter = vendorConnectorPools.values().iterator();
            while (iter.hasNext())
            {
                // close all connectors in the vendor pool
                ShareableObjectPool pool = (ShareableObjectPool)iter.next();
                synchronized (pool)
                {
                    java.util.Iterator connectors = pool.getElements().iterator();
                    while (connectors.hasNext())
                    {
                        JMSConnector conn = (JMSConnector)connectors.next();
                        try
                        {
                            // shutdown automatically decrements the ref count of a connector before closing it
                            // call reserve() to simulate the checkout
                            reserve(conn);
                            closeConnector(conn);
                        }
                        catch (Exception e) {} // ignore. the connector is already being deactivated
                    }
                }
            }
        }

        if (log.isDebugEnabled()) {
            log.debug("Exit: JMSConnectorManager::closeAllConnectors");
        }
    }

    /**
     * Closes JMS connectors that match the specified endpoint address
     */
    void closeMatchingJMSConnectors(HashMap connectorProps, HashMap cfProps,
                                    String username, String password,
                                    JMSVendorAdapter vendorAdapter)
    {
        if (log.isDebugEnabled()) {
            log.debug("Enter: JMSConnectorManager::closeMatchingJMSConnectors");
        }

        try
        {
            String vendorId = vendorAdapter.getVendorId();

            // get the vendor-specific pool of connectors
            ShareableObjectPool vendorConnectors = null;
            synchronized (vendorConnectorPools)
            {
                vendorConnectors = getVendorPool(vendorId);
            }

            // it's possible that there is no pool for that vendor
            if (vendorConnectors == null)
                return;

            synchronized (vendorConnectors)
            {
                // close any matched connectors
                JMSConnector connector = null;
                while ((vendorConnectors.size() > 0) &&
                       (connector = JMSConnectorFactory.matchConnector(vendorConnectors.getElements(),
                                                                       connectorProps,
                                                                       cfProps,
                                                                       username,
                                                                       password,
                                                                       vendorAdapter)) != null)
                {
                    closeConnector(connector);
                }
            }
        }
        catch (Exception e)
        {
            log.warn(Messages.getMessage("failedJMSConnectorShutdown"), e);
        }

        if (log.isDebugEnabled()) {
            log.debug("Exit: JMSConnectorManager::closeMatchingJMSConnectors");
        }
    }

    private void closeConnector(JMSConnector conn)
    {
        conn.stop();
        conn.shutdown();
    }

    /**
     * Adds a JMSConnector to the appropriate vendor pool
     */
    public void addConnectorToPool(JMSConnector conn)
    {
        if (log.isDebugEnabled()) {
            log.debug("Enter: JMSConnectorManager::addConnectorToPool");
        }

        ShareableObjectPool vendorConnectors = null;
        synchronized (vendorConnectorPools)
        {
            String vendorId = conn.getVendorAdapter().getVendorId();
            vendorConnectors = getVendorPool(vendorId);
            // it's possible the pool does not yet exist (if, for example, the connector
            // is created before invoking the call/JMSTransport, as is the case with
            // SimpleJMSListener)
            if (vendorConnectors == null)
            {
                vendorConnectors = new ShareableObjectPool();
                vendorConnectorPools.put(vendorId, vendorConnectors);
            }
        }

        synchronized (vendorConnectors)
        {
            vendorConnectors.addObject(conn);
        }

        if (log.isDebugEnabled()) {
            log.debug("Exit: JMSConnectorManager::addConnectorToPool");
        }
    }

    /**
     * Removes a JMSConnector from the appropriate vendor pool
     */
    public void removeConnectorFromPool(JMSConnector conn)
    {
        if (log.isDebugEnabled()) {
            log.debug("Enter: JMSConnectorManager::removeConnectorFromPool");
        }

        ShareableObjectPool vendorConnectors = null;
        synchronized (vendorConnectorPools)
        {
            vendorConnectors = getVendorPool(conn.getVendorAdapter().getVendorId());
        }
        if (vendorConnectors == null)
            return;

        synchronized (vendorConnectors)
        {
            // first release, to decrement the ref count (it is automatically incremented when
            // the connector is matched)
            vendorConnectors.release(conn);
            vendorConnectors.removeObject(conn);
        }

        if (log.isDebugEnabled()) {
            log.debug("Exit: JMSConnectorManager::removeConnectorFromPool");
        }
    }

    /**
     * Performs a non-exclusive checkout of the JMSConnector
     */
    public void reserve(JMSConnector connector) throws Exception
    {
        ShareableObjectPool pool = null;
        synchronized (vendorConnectorPools)
        {
            pool = getVendorPool(connector.getVendorAdapter().getVendorId());
        }
        if (pool != null)
            pool.reserve(connector);
    }

    /**
     * Performs a non-exclusive checkin of the JMSConnector
     */
    public void release(JMSConnector connector)
    {
        ShareableObjectPool pool = null;
        synchronized (vendorConnectorPools)
        {
            pool = getVendorPool(connector.getVendorAdapter().getVendorId());
        }
        if (pool != null)
            pool.release(connector);
    }

    /**
     * A simple non-blocking pool impl for objects that can be shared.
     * Only a ref count is necessary to prevent collisions at shutdown.
     * Todo: max size, cleanup stale connections
     */
    public class ShareableObjectPool
    {
        // maps object to ref count wrapper
        private java.util.HashMap m_elements;

        // holds objects which should no longer be leased (pending removal)
        private java.util.HashMap m_expiring;

        private int m_numElements = 0;

        public ShareableObjectPool()
        {
            m_elements = new java.util.HashMap();
            m_expiring = new java.util.HashMap();
        }

        /**
         * Adds the object to the pool, if not already added
         */
        public void addObject(Object obj)
        {
            ReferenceCountedObject ref = new ReferenceCountedObject(obj);
            synchronized (m_elements)
            {
                if (!m_elements.containsKey(obj) && !m_expiring.containsKey(obj))
                    m_elements.put(obj, ref);
            }
        }

        /**
         * Removes the object from the pool.  If the object is reserved,
         * waits the specified time before forcibly removing
         * Todo: check expirations with the next request instead of holding up the current request
         */
        public void removeObject(Object obj, long waitTime)
        {
            ReferenceCountedObject ref = null;
            synchronized (m_elements)
            {
                ref = (ReferenceCountedObject)m_elements.get(obj);
                if (ref == null)
                    return;

                m_elements.remove(obj);

                if (ref.count() == 0)
                    return;
                else
                    // mark the object for expiration
                    m_expiring.put(obj, ref);
            }

            // connector is now marked for expiration. wait for the ref count to drop to zero
            long expiration = System.currentTimeMillis() + waitTime;
            while (ref.count() > 0)
            {
                try
                {
                    Thread.sleep(5000);
                }
                catch (InterruptedException e) {} // ignore
                if (System.currentTimeMillis() > expiration)
                    break;
            }

            // also clear from the expiring list
            m_expiring.remove(obj);
        }

        public void removeObject(Object obj)
        {
            removeObject(obj, DEFAULT_WAIT_FOR_SHUTDOWN);
        }

        /**
         * Marks the connector as in use by incrementing the connector's reference count
         */
        public void reserve(Object obj) throws Exception
        {
            synchronized (m_elements)
            {
                if (m_expiring.containsKey(obj))
                    throw new Exception("resourceUnavailable");

                ReferenceCountedObject ref = (ReferenceCountedObject)m_elements.get(obj);
                ref.increment();
            }
        }

        /**
         * Decrements the connector's reference count
         */
        public void release(Object obj)
        {
            synchronized (m_elements)
            {
                ReferenceCountedObject ref = (ReferenceCountedObject)m_elements.get(obj);
                ref.decrement();
            }
        }

        public synchronized java.util.Set getElements()
        {
            return m_elements.keySet();
        }

        public synchronized int size()
        {
            return m_elements.size();
        }

        /**
         * Wrapper to track the use count of an object
         */
        public class ReferenceCountedObject
        {
            private Object m_object;
            private int m_refCount;

            public ReferenceCountedObject(Object obj)
            {
                m_object = obj;
                m_refCount = 0;
            }

            public synchronized void increment()
            {
                m_refCount++;
            }

            public synchronized void decrement()
            {
                if (m_refCount > 0)
                    m_refCount--;
            }

            public synchronized int count()
            {
                return m_refCount;
            }
        }
    }
}