/*
* Copyright 2001, 2002,2004 The Apache Software Foundation.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.axis.transport.jms;
import org.apache.axis.components.jms.JMSVendorAdapter;
import javax.jms.BytesMessage;
import javax.jms.ConnectionFactory;
import javax.jms.Destination;
import javax.jms.ExceptionListener;
import javax.jms.JMSException;
import javax.jms.Message;
import javax.jms.MessageConsumer;
import javax.jms.MessageProducer;
import javax.jms.Session;
import java.io.ByteArrayOutputStream;
import java.util.HashMap;
import java.util.Iterator;
import java.util.LinkedList;
import java.util.Map;
// No vendor dependent exception classes
//import progress.message.client.EUserAlreadyConnected;
//import progress.message.jclient.ErrorCodes;
/**
* JMSConnector is an abstract class that encapsulates the work of connecting
* to JMS destinations. Its subclasses are TopicConnector and QueueConnector
* which further specialize connections to the pub-sub and the ptp domains.
* It also implements the capability to retry connections in the event of
* failures.
*
* @author Jaime Meritt (jmeritt@sonicsoftware.com)
* @author Richard Chung (rchung@sonicsoftware.com)
* @author Dave Chappell (chappell@sonicsoftware.com)
* @author Ray Chun (rchun@sonicsoftware.com)
*/
public abstract class JMSConnector
{
protected int m_numRetries;
protected long m_connectRetryInterval;
protected long m_interactRetryInterval;
protected long m_timeoutTime;
protected long m_poolTimeout;
protected AsyncConnection m_receiveConnection;
protected SyncConnection m_sendConnection;
protected int m_numSessions;
protected boolean m_allowReceive;
protected JMSVendorAdapter m_adapter;
protected JMSURLHelper m_jmsurl;
public JMSConnector(ConnectionFactory connectionFactory,
int numRetries,
int numSessions,
long connectRetryInterval,
long interactRetryInterval,
long timeoutTime,
boolean allowReceive,
String clientID,
String username,
String password,
JMSVendorAdapter adapter,
JMSURLHelper jmsurl)
throws JMSException
{
m_numRetries = numRetries;
m_connectRetryInterval = connectRetryInterval;
m_interactRetryInterval = interactRetryInterval;
m_timeoutTime = timeoutTime;
m_poolTimeout = timeoutTime/(long)numRetries;
m_numSessions = numSessions;
m_allowReceive = allowReceive;
m_adapter = adapter;
m_jmsurl = jmsurl;
// try to connect initially so we can fail fast
// in the case of irrecoverable errors.
// If we fail in a recoverable fashion we will retry
javax.jms.Connection sendConnection = createConnectionWithRetry(
connectionFactory,
username,
password);
m_sendConnection = createSyncConnection(connectionFactory, sendConnection,
m_numSessions, "SendThread",
clientID,
username,
password);
m_sendConnection.start();
if(m_allowReceive)
{
javax.jms.Connection receiveConnection = createConnectionWithRetry(
connectionFactory,
username,
password);
m_receiveConnection = createAsyncConnection(connectionFactory,
receiveConnection,
"ReceiveThread",
clientID,
username,
password);
m_receiveConnection.start();
}
}
public int getNumRetries()
{
return m_numRetries;
}
public int numSessions()
{
return m_numSessions;
}
public ConnectionFactory getConnectionFactory()
{
// there is always a send connection
return getSendConnection().getConnectionFactory();
}
public String getClientID()
{
return getSendConnection().getClientID();
}
public String getUsername()
{
return getSendConnection().getUsername();
}
public String getPassword()
{
return getSendConnection().getPassword();
}
public JMSVendorAdapter getVendorAdapter()
{
return m_adapter;
}
public JMSURLHelper getJMSURL()
{
return m_jmsurl;
}
protected javax.jms.Connection createConnectionWithRetry(
ConnectionFactory connectionFactory,
String username,
String password)
throws JMSException
{
javax.jms.Connection connection = null;
for(int numTries = 1; connection == null; numTries++)
{
try
{
connection = internalConnect(connectionFactory, username, password);
}
catch(JMSException jmse)
{
if(!m_adapter.isRecoverable(jmse, JMSVendorAdapter.CONNECT_ACTION) || numTries == m_numRetries)
throw jmse;
else
try{Thread.sleep(m_connectRetryInterval);}catch(InterruptedException ie){};
}
}
return connection;
}
public void stop()
{
JMSConnectorManager.getInstance().removeConnectorFromPool(this);
m_sendConnection.stopConnection();
if(m_allowReceive)
m_receiveConnection.stopConnection();
}
public void start()
{
m_sendConnection.startConnection();
if(m_allowReceive)
m_receiveConnection.startConnection();
JMSConnectorManager.getInstance().addConnectorToPool(this);
}
public void shutdown()
{
m_sendConnection.shutdown();
if(m_allowReceive)
m_receiveConnection.shutdown();
}
public abstract JMSEndpoint createEndpoint(String destinationName)
throws JMSException;
public abstract JMSEndpoint createEndpoint(Destination destination)
throws JMSException;
protected abstract javax.jms.Connection internalConnect(
ConnectionFactory connectionFactory,
String username,
String password)
throws JMSException;
private abstract class Connection extends Thread implements ExceptionListener
{
private ConnectionFactory m_connectionFactory;
protected javax.jms.Connection m_connection;
protected boolean m_isActive;
private boolean m_needsToConnect;
private boolean m_startConnection;
private String m_clientID;
private String m_username;
private String m_password;
private Object m_jmsLock;
private Object m_lifecycleLock;
protected Connection(ConnectionFactory connectionFactory,
javax.jms.Connection connection,
String threadName,
String clientID,
String username,
String password)
throws JMSException
{
super(threadName);
m_connectionFactory = connectionFactory;
m_clientID = clientID;
m_username = username;
m_password = password;
m_jmsLock = new Object();
m_lifecycleLock = new Object();
if (connection != null)
{
m_needsToConnect = false;
m_connection = connection;
m_connection.setExceptionListener(this);
if(m_clientID != null)
m_connection.setClientID(m_clientID);
}
else
{
m_needsToConnect = true;
}
m_isActive = true;
}
public ConnectionFactory getConnectionFactory()
{
return m_connectionFactory;
}
public String getClientID()
{
return m_clientID;
}
public String getUsername()
{
return m_username;
}
public String getPassword()
{
return m_password;
}
/**
* @todo handle non-recoverable errors
*/
public void run()
{
// loop until a connection is made and when a connection is made (re)establish
// any subscriptions
while (m_isActive)
{
if (m_needsToConnect)
{
m_connection = null;
try
{
m_connection = internalConnect(m_connectionFactory,
m_username, m_password);
m_connection.setExceptionListener(this);
if(m_clientID != null)
m_connection.setClientID(m_clientID);
}
catch(JMSException e)
{
// simply backoff for a while and then retry
try { Thread.sleep(m_connectRetryInterval); } catch(InterruptedException ie) { }
continue;
}
}
else
m_needsToConnect = true; // When we'll get to the "if (needsToConnect)" statement the next time it will be because
// we lost the connection
// we now have a valid connection so establish some context
try
{
internalOnConnect();
}
catch(Exception e)
{
// insert code to handle non recoverable errors
// simply retry
continue;
}
synchronized(m_jmsLock)
{
try { m_jmsLock.wait(); } catch(InterruptedException ie) { } // until notified due to some change in status
}
}
// no longer staying connected, so see what we can cleanup
internalOnShutdown();
}
void startConnection()
{
synchronized(m_lifecycleLock)
{
if(m_startConnection)
return;
m_startConnection = true;
try {m_connection.start();}catch(Throwable e) { } // ignore
}
}
void stopConnection()
{
synchronized(m_lifecycleLock)
{
if(!m_startConnection)
return;
m_startConnection = false;
try {m_connection.stop();}catch(Throwable e) { } // ignore
}
}
void shutdown()
{
m_isActive = false;
synchronized(m_jmsLock)
{
m_jmsLock.notifyAll();
}
}
public void onException(JMSException exception)
{
if(m_adapter.isRecoverable(exception,
JMSVendorAdapter.ON_EXCEPTION_ACTION))
return;
onException();
synchronized(m_jmsLock)
{
m_jmsLock.notifyAll();
}
}
private final void internalOnConnect()
throws Exception
{
onConnect();
synchronized(m_lifecycleLock)
{
if(m_startConnection)
{
try {m_connection.start();}catch(Throwable e) { } // ignore
}
}
}
private final void internalOnShutdown()
{
stopConnection();
onShutdown();
try { m_connection.close(); } catch(Throwable e) { } // ignore
}
protected abstract void onConnect()throws Exception;
protected abstract void onShutdown();
protected abstract void onException();
}
protected abstract SyncConnection createSyncConnection(ConnectionFactory factory,
javax.jms.Connection connection,
int numSessions,
String threadName,
String clientID,
String username,
String password)
throws JMSException;
SyncConnection getSendConnection()
{
return m_sendConnection;
}
protected abstract class SyncConnection extends Connection
{
LinkedList m_senders;
int m_numSessions;
Object m_senderLock;
SyncConnection(ConnectionFactory connectionFactory,
javax.jms.Connection connection,
int numSessions,
String threadName,
String clientID,
String username,
String password)
throws JMSException
{
super(connectionFactory, connection, threadName,
clientID, username, password);
m_senders = new LinkedList();
m_numSessions = numSessions;
m_senderLock = new Object();
}
protected abstract SendSession createSendSession(javax.jms.Connection connection)
throws JMSException;
protected void onConnect()
throws JMSException
{
synchronized(m_senderLock)
{
for(int i = 0; i < m_numSessions; i++)
{
m_senders.add(createSendSession(m_connection));
}
m_senderLock.notifyAll();
}
}
byte[] call(JMSEndpoint endpoint, byte[] message, long timeout, HashMap properties)
throws Exception
{
long timeoutTime = System.currentTimeMillis() + timeout;
while(true)
{
if(System.currentTimeMillis() > timeoutTime)
{
throw new InvokeTimeoutException("Unable to complete call in time allotted");
}
SendSession sendSession = null;
try
{
sendSession = getSessionFromPool(m_poolTimeout);
byte[] response = sendSession.call(endpoint,
message,
timeoutTime - System.currentTimeMillis(),
properties);
returnSessionToPool(sendSession);
if(response == null)
{
throw new InvokeTimeoutException("Unable to complete call in time allotted");
}
return response;
}
catch(JMSException jmse)
{
if(!m_adapter.isRecoverable(jmse, JMSVendorAdapter.SEND_ACTION))
{
//this we cannot recover from
//but it does not invalidate the session
returnSessionToPool(sendSession);
throw jmse;
}
//for now we will assume this is a reconnect related issue
//and let the sender be collected
//give the reconnect thread a chance to fill the pool
Thread.yield();
continue;
}
catch(NullPointerException npe)
{
Thread.yield();
continue;
}
}
}
/** @todo add in handling for security exceptions
* @todo add support for timeouts */
void send(JMSEndpoint endpoint, byte[] message, HashMap properties)
throws Exception
{
long timeoutTime = System.currentTimeMillis() + m_timeoutTime;
while(true)
{
if(System.currentTimeMillis() > timeoutTime)
{
throw new InvokeTimeoutException("Cannot complete send in time allotted");
}
SendSession sendSession = null;
try
{
sendSession = getSessionFromPool(m_poolTimeout);
sendSession.send(endpoint, message, properties);
returnSessionToPool(sendSession);
}
catch(JMSException jmse)
{
if(!m_adapter.isRecoverable(jmse, JMSVendorAdapter.SEND_ACTION))
{
//this we cannot recover from
//but it does not invalidate the session
returnSessionToPool(sendSession);
throw jmse;
}
//for now we will assume this is a reconnect related issue
//and let the sender be collected
//give the reconnect thread a chance to fill the pool
Thread.yield();
continue;
}
catch(NullPointerException npe)
{
//give the reconnect thread a chance to fill the pool
Thread.yield();
continue;
}
break;
}
}
protected void onException()
{
synchronized(m_senderLock)
{
m_senders.clear();
}
}
protected void onShutdown()
{
synchronized(m_senderLock)
{
Iterator senders = m_senders.iterator();
while(senders.hasNext())
{
SendSession session = (SendSession)senders.next();
session.cleanup();
}
m_senders.clear();
}
}
private SendSession getSessionFromPool(long timeout)
{
synchronized(m_senderLock)
{
while(m_senders.size() == 0)
{
try
{
m_senderLock.wait(timeout);
if(m_senders.size() == 0)
{
return null;
}
}
catch(InterruptedException ignore)
{
return null;
}
}
return (SendSession)m_senders.removeFirst();
}
}
private void returnSessionToPool(SendSession sendSession)
{
synchronized(m_senderLock)
{
m_senders.addLast(sendSession);
m_senderLock.notifyAll();
}
}
protected abstract class SendSession extends ConnectorSession
{
MessageProducer m_producer;
SendSession(Session session,
MessageProducer producer)
throws JMSException
{
super(session);
m_producer = producer;
}
protected abstract Destination createTemporaryDestination()
throws JMSException;
protected abstract void deleteTemporaryDestination(Destination destination)
throws JMSException;
protected abstract MessageConsumer createConsumer(Destination destination)
throws JMSException;
protected abstract void send(Destination destination,
Message message,
int deliveryMode,
int priority,
long timeToLive)
throws JMSException;
void send(JMSEndpoint endpoint, byte[] message, HashMap properties)
throws Exception
{
BytesMessage jmsMessage = m_session.createBytesMessage();
jmsMessage.writeBytes(message);
int deliveryMode = extractDeliveryMode(properties);
int priority = extractPriority(properties);
long timeToLive = extractTimeToLive(properties);
if(properties != null && !properties.isEmpty())
setProperties(properties, jmsMessage);
send(endpoint.getDestination(m_session), jmsMessage, deliveryMode,
priority, timeToLive);
}
void cleanup()
{
try{m_producer.close();}catch(Throwable t){}
try{m_session.close();}catch(Throwable t){}
}
byte[] call(JMSEndpoint endpoint, byte[] message, long timeout,
HashMap properties)
throws Exception
{
Destination reply = createTemporaryDestination();
MessageConsumer subscriber = createConsumer(reply);
BytesMessage jmsMessage = m_session.createBytesMessage();
jmsMessage.writeBytes(message);
jmsMessage.setJMSReplyTo(reply);
int deliveryMode = extractDeliveryMode(properties);
int priority = extractPriority(properties);
long timeToLive = extractTimeToLive(properties);
if(properties != null && !properties.isEmpty())
setProperties(properties, jmsMessage);
send(endpoint.getDestination(m_session), jmsMessage, deliveryMode,
priority, timeToLive);
BytesMessage response = null;
try {
response = (BytesMessage)subscriber.receive(timeout);
} catch (ClassCastException cce) {
throw new InvokeException
("Error: unexpected message type received - expected BytesMessage");
}
byte[] respBytes = null;
if(response != null)
{
byte[] buffer = new byte[8 * 1024];
ByteArrayOutputStream out = new ByteArrayOutputStream();
for(int bytesRead = response.readBytes(buffer);
bytesRead != -1; bytesRead = response.readBytes(buffer))
{
out.write(buffer, 0, bytesRead);
}
respBytes = out.toByteArray();
}
subscriber.close();
deleteTemporaryDestination(reply);
return respBytes;
}
private int extractPriority(HashMap properties)
{
return MapUtils.removeIntProperty(properties, JMSConstants.PRIORITY,
JMSConstants.DEFAULT_PRIORITY);
}
private int extractDeliveryMode(HashMap properties)
{
return MapUtils.removeIntProperty(properties, JMSConstants.DELIVERY_MODE,
JMSConstants.DEFAULT_DELIVERY_MODE);
}
private long extractTimeToLive(HashMap properties)
{
return MapUtils.removeLongProperty(properties, JMSConstants.TIME_TO_LIVE,
JMSConstants.DEFAULT_TIME_TO_LIVE);
}
private void setProperties(HashMap properties, Message message)
throws JMSException
{
Iterator propertyIter = properties.entrySet().iterator();
while(propertyIter.hasNext())
{
Map.Entry property = (Map.Entry)propertyIter.next();
setProperty((String)property.getKey(), property.getValue(),
message);
}
}
private void setProperty(String property, Object value, Message message)
throws JMSException
{
if(property == null)
return;
if(property.equals(JMSConstants.JMS_CORRELATION_ID))
message.setJMSCorrelationID((String)value);
else if(property.equals(JMSConstants.JMS_CORRELATION_ID_AS_BYTES))
message.setJMSCorrelationIDAsBytes((byte[])value);
else if(property.equals(JMSConstants.JMS_TYPE))
message.setJMSType((String)value);
else
message.setObjectProperty(property, value);
}
}
}
AsyncConnection getReceiveConnection()
{
return m_receiveConnection;
}
protected abstract AsyncConnection createAsyncConnection(ConnectionFactory factory,
javax.jms.Connection connection,
String threadName,
String clientID,
String username,
String password)
throws JMSException;
protected abstract class AsyncConnection extends Connection
{
HashMap m_subscriptions;
Object m_subscriptionLock;
protected AsyncConnection(ConnectionFactory connectionFactory,
javax.jms.Connection connection,
String threadName,
String clientID,
String username,
String password)
throws JMSException
{
super(connectionFactory, connection, threadName,
clientID, username, password);
m_subscriptions = new HashMap();
m_subscriptionLock = new Object();
}
protected abstract ListenerSession createListenerSession(
javax.jms.Connection connection,
Subscription subscription)
throws Exception;
protected void onShutdown()
{
synchronized(m_subscriptionLock)
{
Iterator subscriptions = m_subscriptions.keySet().iterator();
while(subscriptions.hasNext())
{
Subscription subscription = (Subscription)subscriptions.next();
ListenerSession session = (ListenerSession)
m_subscriptions.get(subscription);
if(session != null)
{
session.cleanup();
}
}
m_subscriptions.clear();
}
}
/**
* @todo add in security exception propagation
* @param subscription
*/
void subscribe(Subscription subscription)
throws Exception
{
long timeoutTime = System.currentTimeMillis() + m_timeoutTime;
synchronized(m_subscriptionLock)
{
if(m_subscriptions.containsKey(subscription))
return;
while(true)
{
if(System.currentTimeMillis() > timeoutTime)
{
throw new InvokeTimeoutException("Cannot subscribe listener");
}
try
{
ListenerSession session = createListenerSession(m_connection,
subscription);
m_subscriptions.put(subscription, session);
break;
}
catch(JMSException jmse)
{
if(!m_adapter.isRecoverable(jmse, JMSVendorAdapter.SUBSCRIBE_ACTION))
{
throw jmse;
}
try{m_subscriptionLock.wait(m_interactRetryInterval);}
catch(InterruptedException ignore){}
//give reconnect a chance
Thread.yield();
continue;
}
catch(NullPointerException jmse)
{
//we ARE reconnecting
try{m_subscriptionLock.wait(m_interactRetryInterval);}
catch(InterruptedException ignore){}
//give reconnect a chance
Thread.yield();
continue;
}
}
}
}
void unsubscribe(Subscription subscription)
{
long timeoutTime = System.currentTimeMillis() + m_timeoutTime;
synchronized(m_subscriptionLock)
{
if(!m_subscriptions.containsKey(subscription))
return;
while(true)
{
if(System.currentTimeMillis() > timeoutTime)
{
throw new InvokeTimeoutException("Cannot unsubscribe listener");
}
//give reconnect a chance
Thread.yield();
try
{
ListenerSession session = (ListenerSession)
m_subscriptions.get(subscription);
session.cleanup();
m_subscriptions.remove(subscription);
break;
}
catch(NullPointerException jmse)
{
//we are reconnecting
try{m_subscriptionLock.wait(m_interactRetryInterval);}
catch(InterruptedException ignore){}
continue;
}
}
}
}
protected void onConnect()
throws Exception
{
synchronized(m_subscriptionLock)
{
Iterator subscriptions = m_subscriptions.keySet().iterator();
while(subscriptions.hasNext())
{
Subscription subscription = (Subscription)subscriptions.next();
if(m_subscriptions.get(subscription) == null)
{
m_subscriptions.put(subscription,
createListenerSession(m_connection, subscription));
}
}
m_subscriptionLock.notifyAll();
}
}
protected void onException()
{
synchronized(m_subscriptionLock)
{
Iterator subscriptions = m_subscriptions.keySet().iterator();
while(subscriptions.hasNext())
{
Subscription subscription = (Subscription)subscriptions.next();
m_subscriptions.put(subscription, null);
}
}
}
protected class ListenerSession extends ConnectorSession
{
protected MessageConsumer m_consumer;
protected Subscription m_subscription;
ListenerSession(Session session,
MessageConsumer consumer,
Subscription subscription)
throws Exception
{
super(session);
m_subscription = subscription;
m_consumer = consumer;
Destination destination = subscription.m_endpoint.getDestination(m_session);
m_consumer.setMessageListener(subscription.m_listener);
}
void cleanup()
{
try{m_consumer.close();}catch(Exception ignore){}
try{m_session.close();}catch(Exception ignore){}
}
}
}
private abstract class ConnectorSession
{
Session m_session;
ConnectorSession(Session session)
throws JMSException
{
m_session = session;
}
}
} |