/*
* Copyright 2001, 2002,2004 The Apache Software Foundation.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.axis.transport.jms;
import org.apache.axis.AxisFault;
import org.apache.axis.components.jms.JMSVendorAdapter;
import org.apache.axis.components.logger.LogFactory;
import org.apache.axis.utils.Messages;
import org.apache.commons.logging.Log;
import java.util.HashMap;
import java.util.Iterator;
/**
* JMSConnectorManager manages a pool of connectors and works with the
* vendor adapters to support the reuse of JMS connections.
*
* @author Ray Chun (rchun@sonicsoftware.com)
*/
public class JMSConnectorManager
{
protected static Log log =
LogFactory.getLog(JMSConnectorManager.class.getName());
private static JMSConnectorManager s_instance = new JMSConnectorManager();
private static HashMap vendorConnectorPools = new HashMap();
private int DEFAULT_WAIT_FOR_SHUTDOWN = 90000; // 1.5 minutes
private JMSConnectorManager()
{
}
public static JMSConnectorManager getInstance()
{
return s_instance;
}
/**
* Returns the pool of JMSConnectors for a particular vendor
*/
public ShareableObjectPool getVendorPool(String vendorId)
{
return (ShareableObjectPool)vendorConnectorPools.get(vendorId);
}
/**
* Retrieves a JMSConnector that satisfies the provided connector criteria
*/
public JMSConnector getConnector(HashMap connectorProperties,
HashMap connectionFactoryProperties,
String username,
String password,
JMSVendorAdapter vendorAdapter)
throws AxisFault
{
JMSConnector connector = null;
try
{
// check for a vendor-specific pool, and create if necessary
ShareableObjectPool vendorConnectors = getVendorPool(vendorAdapter.getVendorId());
if (vendorConnectors == null)
{
synchronized (vendorConnectorPools)
{
vendorConnectors = getVendorPool(vendorAdapter.getVendorId());
if (vendorConnectors == null)
{
vendorConnectors = new ShareableObjectPool();
vendorConnectorPools.put(vendorAdapter.getVendorId(), vendorConnectors);
}
}
}
// look for a matching JMSConnector among existing connectors
synchronized (vendorConnectors)
{
try
{
connector = JMSConnectorFactory.matchConnector(vendorConnectors.getElements(),
connectorProperties,
connectionFactoryProperties,
username,
password,
vendorAdapter);
}
catch (Exception e) {} // ignore. a new connector will be created if no match is found
if (connector == null)
{
connector = JMSConnectorFactory.createClientConnector(connectorProperties,
connectionFactoryProperties,
username,
password,
vendorAdapter);
connector.start();
}
}
}
catch (Exception e)
{
log.error(Messages.getMessage("cannotConnectError"), e);
if(e instanceof AxisFault)
throw (AxisFault)e;
throw new AxisFault("cannotConnect", e);
}
return connector;
}
/**
* Closes JMSConnectors in all pools
*/
void closeAllConnectors()
{
if (log.isDebugEnabled()) {
log.debug("Enter: JMSConnectorManager::closeAllConnectors");
}
synchronized (vendorConnectorPools)
{
Iterator iter = vendorConnectorPools.values().iterator();
while (iter.hasNext())
{
// close all connectors in the vendor pool
ShareableObjectPool pool = (ShareableObjectPool)iter.next();
synchronized (pool)
{
java.util.Iterator connectors = pool.getElements().iterator();
while (connectors.hasNext())
{
JMSConnector conn = (JMSConnector)connectors.next();
try
{
// shutdown automatically decrements the ref count of a connector before closing it
// call reserve() to simulate the checkout
reserve(conn);
closeConnector(conn);
}
catch (Exception e) {} // ignore. the connector is already being deactivated
}
}
}
}
if (log.isDebugEnabled()) {
log.debug("Exit: JMSConnectorManager::closeAllConnectors");
}
}
/**
* Closes JMS connectors that match the specified endpoint address
*/
void closeMatchingJMSConnectors(HashMap connectorProps, HashMap cfProps,
String username, String password,
JMSVendorAdapter vendorAdapter)
{
if (log.isDebugEnabled()) {
log.debug("Enter: JMSConnectorManager::closeMatchingJMSConnectors");
}
try
{
String vendorId = vendorAdapter.getVendorId();
// get the vendor-specific pool of connectors
ShareableObjectPool vendorConnectors = null;
synchronized (vendorConnectorPools)
{
vendorConnectors = getVendorPool(vendorId);
}
// it's possible that there is no pool for that vendor
if (vendorConnectors == null)
return;
synchronized (vendorConnectors)
{
// close any matched connectors
JMSConnector connector = null;
while ((vendorConnectors.size() > 0) &&
(connector = JMSConnectorFactory.matchConnector(vendorConnectors.getElements(),
connectorProps,
cfProps,
username,
password,
vendorAdapter)) != null)
{
closeConnector(connector);
}
}
}
catch (Exception e)
{
log.warn(Messages.getMessage("failedJMSConnectorShutdown"), e);
}
if (log.isDebugEnabled()) {
log.debug("Exit: JMSConnectorManager::closeMatchingJMSConnectors");
}
}
private void closeConnector(JMSConnector conn)
{
conn.stop();
conn.shutdown();
}
/**
* Adds a JMSConnector to the appropriate vendor pool
*/
public void addConnectorToPool(JMSConnector conn)
{
if (log.isDebugEnabled()) {
log.debug("Enter: JMSConnectorManager::addConnectorToPool");
}
ShareableObjectPool vendorConnectors = null;
synchronized (vendorConnectorPools)
{
String vendorId = conn.getVendorAdapter().getVendorId();
vendorConnectors = getVendorPool(vendorId);
// it's possible the pool does not yet exist (if, for example, the connector
// is created before invoking the call/JMSTransport, as is the case with
// SimpleJMSListener)
if (vendorConnectors == null)
{
vendorConnectors = new ShareableObjectPool();
vendorConnectorPools.put(vendorId, vendorConnectors);
}
}
synchronized (vendorConnectors)
{
vendorConnectors.addObject(conn);
}
if (log.isDebugEnabled()) {
log.debug("Exit: JMSConnectorManager::addConnectorToPool");
}
}
/**
* Removes a JMSConnector from the appropriate vendor pool
*/
public void removeConnectorFromPool(JMSConnector conn)
{
if (log.isDebugEnabled()) {
log.debug("Enter: JMSConnectorManager::removeConnectorFromPool");
}
ShareableObjectPool vendorConnectors = null;
synchronized (vendorConnectorPools)
{
vendorConnectors = getVendorPool(conn.getVendorAdapter().getVendorId());
}
if (vendorConnectors == null)
return;
synchronized (vendorConnectors)
{
// first release, to decrement the ref count (it is automatically incremented when
// the connector is matched)
vendorConnectors.release(conn);
vendorConnectors.removeObject(conn);
}
if (log.isDebugEnabled()) {
log.debug("Exit: JMSConnectorManager::removeConnectorFromPool");
}
}
/**
* Performs a non-exclusive checkout of the JMSConnector
*/
public void reserve(JMSConnector connector) throws Exception
{
ShareableObjectPool pool = null;
synchronized (vendorConnectorPools)
{
pool = getVendorPool(connector.getVendorAdapter().getVendorId());
}
if (pool != null)
pool.reserve(connector);
}
/**
* Performs a non-exclusive checkin of the JMSConnector
*/
public void release(JMSConnector connector)
{
ShareableObjectPool pool = null;
synchronized (vendorConnectorPools)
{
pool = getVendorPool(connector.getVendorAdapter().getVendorId());
}
if (pool != null)
pool.release(connector);
}
/**
* A simple non-blocking pool impl for objects that can be shared.
* Only a ref count is necessary to prevent collisions at shutdown.
* Todo: max size, cleanup stale connections
*/
public class ShareableObjectPool
{
// maps object to ref count wrapper
private java.util.HashMap m_elements;
// holds objects which should no longer be leased (pending removal)
private java.util.HashMap m_expiring;
private int m_numElements = 0;
public ShareableObjectPool()
{
m_elements = new java.util.HashMap();
m_expiring = new java.util.HashMap();
}
/**
* Adds the object to the pool, if not already added
*/
public void addObject(Object obj)
{
ReferenceCountedObject ref = new ReferenceCountedObject(obj);
synchronized (m_elements)
{
if (!m_elements.containsKey(obj) && !m_expiring.containsKey(obj))
m_elements.put(obj, ref);
}
}
/**
* Removes the object from the pool. If the object is reserved,
* waits the specified time before forcibly removing
* Todo: check expirations with the next request instead of holding up the current request
*/
public void removeObject(Object obj, long waitTime)
{
ReferenceCountedObject ref = null;
synchronized (m_elements)
{
ref = (ReferenceCountedObject)m_elements.get(obj);
if (ref == null)
return;
m_elements.remove(obj);
if (ref.count() == 0)
return;
else
// mark the object for expiration
m_expiring.put(obj, ref);
}
// connector is now marked for expiration. wait for the ref count to drop to zero
long expiration = System.currentTimeMillis() + waitTime;
while (ref.count() > 0)
{
try
{
Thread.sleep(5000);
}
catch (InterruptedException e) {} // ignore
if (System.currentTimeMillis() > expiration)
break;
}
// also clear from the expiring list
m_expiring.remove(obj);
}
public void removeObject(Object obj)
{
removeObject(obj, DEFAULT_WAIT_FOR_SHUTDOWN);
}
/**
* Marks the connector as in use by incrementing the connector's reference count
*/
public void reserve(Object obj) throws Exception
{
synchronized (m_elements)
{
if (m_expiring.containsKey(obj))
throw new Exception("resourceUnavailable");
ReferenceCountedObject ref = (ReferenceCountedObject)m_elements.get(obj);
ref.increment();
}
}
/**
* Decrements the connector's reference count
*/
public void release(Object obj)
{
synchronized (m_elements)
{
ReferenceCountedObject ref = (ReferenceCountedObject)m_elements.get(obj);
ref.decrement();
}
}
public synchronized java.util.Set getElements()
{
return m_elements.keySet();
}
public synchronized int size()
{
return m_elements.size();
}
/**
* Wrapper to track the use count of an object
*/
public class ReferenceCountedObject
{
private Object m_object;
private int m_refCount;
public ReferenceCountedObject(Object obj)
{
m_object = obj;
m_refCount = 0;
}
public synchronized void increment()
{
m_refCount++;
}
public synchronized void decrement()
{
if (m_refCount > 0)
m_refCount--;
}
public synchronized int count()
{
return m_refCount;
}
}
}
} |