Fields Summary |
---|
private static final Logger | log - The log |
public static final Method | ONMESSAGE - The onMessage method |
protected org.jboss.resource.adapter.jms.JmsResourceAdapter | ra - The resource adapter |
protected JmsActivationSpec | spec - The activation spec |
protected javax.resource.spi.endpoint.MessageEndpointFactory | endpointFactory - The message endpoint factory |
protected EDU.oswego.cs.dl.util.concurrent.SynchronizedBoolean | deliveryActive - Whether delivery is active |
protected org.jboss.jms.jndi.JMSProviderAdapter | adapter - The jms provider adapter |
protected javax.jms.Destination | destination - The destination |
protected javax.jms.Connection | connection - The connection |
protected JmsServerSessionPool | pool - The server session pool |
protected boolean | isDeliveryTransacted - Is the delivery transacted |
protected DLQHandler | dlqHandler - The DLQ handler |
protected TransactionManager | tm - The TransactionManager |
Methods Summary |
---|
public JmsActivationSpec | getActivationSpec()
// Returns the activation spec held in the spec field.
return this.spec;
|
public javax.jms.Connection | getConnection()
// Returns the current value of the connection field; teardownConnection()
// resets that field to null, so callers may see null here.
return this.connection;
|
public DLQHandler | getDLQHandler()
// Returns the current value of the dlqHandler field; it is only assigned by
// setupDLQ() when the spec enables the DLQ and is reset to null by teardownDLQ().
return this.dlqHandler;
|
public javax.jms.Destination | getDestination()
// Returns the destination that setupDestination() looked up.
return this.destination;
|
public javax.resource.spi.endpoint.MessageEndpointFactory | getMessageEndpointFactory()
// Returns the message endpoint factory held in the endpointFactory field.
return this.endpointFactory;
|
public org.jboss.jms.jndi.JMSProviderAdapter | getProviderAdapter()
// Returns the JMS provider adapter that setupJMSProviderAdapter() looked up.
return this.adapter;
|
public javax.transaction.TransactionManager | getTransactionManager()
// Returns the transaction manager cached in tm. When tm is still null it is
// first obtained from TransactionManagerLocator and stored for later calls.
if (tm != null)
    return tm;
tm = TransactionManagerLocator.getInstance().locate();
return tm;
|
public javax.resource.spi.work.WorkManager | getWorkManager()
// Delegates to the resource adapter to obtain its work manager.
javax.resource.spi.work.WorkManager workManager = ra.getWorkManager();
return workManager;
|
public void | handleFailure(java.lang.Throwable failure)Handles any failure by trying to reconnect
// Logs the failure, then repeatedly tears the activation down, waits for the
// spec's reconnect interval and runs setup() again. The loop ends when a
// setup() succeeds, when delivery is no longer active, or when the waiting
// thread is interrupted.
//
// @param failure the problem that triggered the reconnect; it is only logged
log.warn("Failure in jms activation " + spec, failure);
while (deliveryActive.get())
{
    teardown();
    try
    {
        Thread.sleep(spec.getReconnectIntervalLong());
    }
    catch (InterruptedException e)
    {
        log.debug("Interrupted trying to reconnect " + spec, e);
        // Restore the interrupt status so code further up the stack can still observe it.
        Thread.currentThread().interrupt();
        break;
    }
    // stop() clears deliveryActive and tears down; if that happened while this
    // thread slept, a setup() now would create a connection and session pool
    // that nothing tears down again.
    if (!deliveryActive.get())
        break;
    log.info("Attempting to reconnect " + spec);
    try
    {
        setup();
        log.info("Reconnected with messaging provider.");
        break;
    }
    catch (Throwable t)
    {
        // Keep looping: the next iteration tears down whatever was partially set up.
        log.error("Unable to reconnect " + spec, t);
    }
}
|
public boolean | isDeliveryTransacted()
// Returns the isDeliveryTransacted flag.
return this.isDeliveryTransacted;
|
public void | onException(javax.jms.JMSException exception)
// Exception callback: this object registers itself on the connections it
// creates via setExceptionListener(this). Any reported exception is handed to
// handleFailure(), which attempts to reconnect.
this.handleFailure(exception);
|
protected void | setup()Setup the activation
// Sets up the activation in order: provider adapter, then (using the adapter's
// initial context) DLQ, destination and connection, and finally the session
// pool. The naming context is always closed before the session pool is set up.
log.debug("Setting up " + spec);
setupJMSProviderAdapter();
Context ctx = adapter.getInitialContext();
try
{
    // Logged inside the try so the context is still closed if getEnvironment() fails.
    log.debug("Using context " + ctx.getEnvironment() + " for " + spec);
    setupDLQ(ctx);
    setupDestination(ctx);
    setupConnection(ctx);
}
finally
{
    ctx.close();
}
setupSessionPool();
log.debug("Setup complete " + this);
|
protected void | setupConnection(javax.naming.Context ctx)Setup the Connection
// Reads user, password and client id from the spec and stores a topic or queue
// connection in the connection field, depending on spec.isTopic().
//
// @param ctx the naming context passed on to the topic/queue connection setup
log.debug("setup connection " + this);
final String user = spec.getUser();
final String pass = spec.getPassword();
final String clientID = spec.getClientId();
if (!spec.isTopic())
{
    connection = setupQueueConnection(ctx, user, pass, clientID);
}
else
{
    connection = setupTopicConnection(ctx, user, pass, clientID);
}
log.debug("established connection " + this);
|
protected void | setupDLQ(javax.naming.Context ctx)Setup the DLQ
if (spec.isUseDLQ())
{
Class clazz = Thread.currentThread().getContextClassLoader().loadClass(spec.getDLQHandler());
dlqHandler = (DLQHandler) clazz.newInstance();
dlqHandler.setup(this, ctx);
}
log.debug("Setup DLQ " + this);
|
protected void | setupDestination(javax.naming.Context ctx)Setup the Destination
// Looks up the destination named by the spec in the given context, expecting a
// Topic when spec.isTopic() is true and a Queue otherwise, and stores it in
// the destination field.
//
// @param ctx the naming context used for the lookup
Class expectedType = Queue.class;
if (spec.isTopic())
{
    expectedType = Topic.class;
}
String jndiName = spec.getDestination();
log.debug("Retrieving destination " + jndiName + " of type " + expectedType.getName());
Object found = Util.lookup(ctx, jndiName, expectedType);
destination = (Destination) found;
log.debug("Got destination " + destination + " from " + jndiName);
|
protected void | setupJMSProviderAdapter()Get the jms provider
String providerAdapterJNDI = spec.getProviderAdapterJNDI();
if (providerAdapterJNDI.startsWith("java:") == false)
providerAdapterJNDI = "java:" + providerAdapterJNDI;
log.debug("Retrieving the jms provider adapter " + providerAdapterJNDI + " for " + this);
adapter = (JMSProviderAdapter) Util.lookup(providerAdapterJNDI, JMSProviderAdapter.class);
log.debug("Using jms provider adapter " + adapter + " for " + this);
|
protected javax.jms.QueueConnection | setupQueueConnection(javax.naming.Context ctx, java.lang.String user, java.lang.String pass, java.lang.String clientID)Setup a Queue Connection
// Looks up the queue connection factory named by the provider adapter and
// creates a queue connection from it. An XA connection is created only when
// the factory is an XAQueueConnectionFactory and delivery is transacted.
// Credentials are passed only when user is non-null. The client id is set only
// when non-null, and this object is registered as the exception listener.
//
// @param ctx      the naming context used to look up the factory
// @param user     the user name, or null to connect without credentials
// @param pass     the password used together with user
// @param clientID the client id to set on the connection, or null for none
// @return the newly created queue connection
String factoryName = adapter.getQueueFactoryRef();
log.debug("Attempting to lookup queue connection factory " + factoryName);
QueueConnectionFactory qcf = (QueueConnectionFactory) Util.lookup(ctx, factoryName, QueueConnectionFactory.class);
log.debug("Got queue connection factory " + qcf + " from " + factoryName);
log.debug("Attempting to create queue connection with user " + user);
boolean useXA = qcf instanceof XAQueueConnectionFactory && isDeliveryTransacted;
boolean anonymous = (user == null);
QueueConnection result;
if (useXA)
{
    XAQueueConnectionFactory xaFactory = (XAQueueConnectionFactory) qcf;
    if (anonymous)
    {
        result = xaFactory.createXAQueueConnection();
    }
    else
    {
        result = xaFactory.createXAQueueConnection(user, pass);
    }
}
else if (anonymous)
{
    result = qcf.createQueueConnection();
}
else
{
    result = qcf.createQueueConnection(user, pass);
}
if (clientID != null)
{
    result.setClientID(clientID);
}
result.setExceptionListener(this);
log.debug("Using queue connection " + result);
return result;
|
protected void | setupSessionPool()Setup the server session pool
// Creates a JmsServerSessionPool for this activation, starts it, and only then
// starts the connection. Each step is bracketed by debug logging.
this.pool = new JmsServerSessionPool(this);
log.debug("Created session pool " + this.pool);

// Start the pool before the connection.
log.debug("Starting session pool " + this.pool);
this.pool.start();
log.debug("Started session pool " + this.pool);

log.debug("Starting delivery " + this.connection);
this.connection.start();
log.debug("Started delivery " + this.connection);
|
protected javax.jms.TopicConnection | setupTopicConnection(javax.naming.Context ctx, java.lang.String user, java.lang.String pass, java.lang.String clientID)Setup a Topic Connection
// Looks up the topic connection factory named by the provider adapter and
// creates a topic connection from it. An XA connection is created only when
// the factory is an XATopicConnectionFactory and delivery is transacted.
// Credentials are passed only when user is non-null. The client id is set only
// when non-null, and this object is registered as the exception listener.
//
// @param ctx      the naming context used to look up the factory
// @param user     the user name, or null to connect without credentials
// @param pass     the password used together with user
// @param clientID the client id to set on the connection, or null for none
// @return the newly created topic connection
String factoryName = adapter.getTopicFactoryRef();
log.debug("Attempting to lookup topic connection factory " + factoryName);
TopicConnectionFactory tcf = (TopicConnectionFactory) Util.lookup(ctx, factoryName, TopicConnectionFactory.class);
log.debug("Got topic connection factory " + tcf + " from " + factoryName);
log.debug("Attempting to create topic connection with user " + user);
boolean useXA = tcf instanceof XATopicConnectionFactory && isDeliveryTransacted;
boolean anonymous = (user == null);
TopicConnection result;
if (useXA)
{
    XATopicConnectionFactory xaFactory = (XATopicConnectionFactory) tcf;
    if (anonymous)
    {
        result = xaFactory.createXATopicConnection();
    }
    else
    {
        result = xaFactory.createXATopicConnection(user, pass);
    }
}
else if (anonymous)
{
    result = tcf.createTopicConnection();
}
else
{
    result = tcf.createTopicConnection(user, pass);
}
if (clientID != null)
{
    result.setClientID(clientID);
}
result.setExceptionListener(this);
log.debug("Using topic connection " + result);
return result;
|
public void | start()Start the activation
deliveryActive = new SynchronizedBoolean(true);
ra.getWorkManager().scheduleWork(new SetupActivation());
|
public void | stop()Stop the activation
// Clears the delivery-active flag first, which ends the reconnect loop in
// handleFailure(), and then tears the activation down.
this.deliveryActive.set(false);
this.teardown();
|
protected void | teardown()Teardown the activation
// Releases the activation's resources in a fixed order: session pool,
// connection, destination, DLQ.
log.debug("Tearing down " + spec);

this.teardownSessionPool();
this.teardownConnection();
this.teardownDestination();
this.teardownDLQ();

log.debug("Tearing down complete " + this);
|
protected void | teardownConnection()Teardown the connection
// Best-effort close of the connection: any Throwable raised while closing is
// logged at debug level and not rethrown. The connection field is reset to
// null afterwards.
try
{
    if (this.connection != null)
    {
        log.debug("Closing the " + this.connection);
        this.connection.close();
    }
}
catch (Throwable t)
{
    log.debug("Error closing the connection " + this.connection, t);
}
this.connection = null;
|
protected void | teardownDLQ()Teardown the DLQ
// Best-effort teardown of the DLQ handler, if one is set: any Throwable it
// raises is logged at debug level and not rethrown. The dlqHandler field is
// reset to null afterwards.
log.debug("Removing DLQ " + this);
try
{
    if (this.dlqHandler != null)
    {
        this.dlqHandler.teardown();
    }
}
catch (Throwable t)
{
    log.debug("Error tearing down the DLQ " + this.dlqHandler, t);
}
this.dlqHandler = null;
|
protected void | teardownDestination()Teardown the destination
// Empty body: this step does nothing here. It is still invoked from teardown()
// between the connection and DLQ steps.
|
protected void | teardownSessionPool()Teardown the server session pool
// Best-effort shutdown in two independent steps: stop delivery on the
// connection (if any), then stop the session pool (if any). A Throwable in
// either step is logged at debug level and does not prevent the other step.

// Step 1: stop message delivery.
try
{
    if (this.connection != null)
    {
        log.debug("Stopping delivery " + this.connection);
        this.connection.stop();
    }
}
catch (Throwable t)
{
    log.debug("Error stopping delivery " + this.connection, t);
}

// Step 2: stop the pool.
try
{
    if (this.pool != null)
    {
        log.debug("Stopping the session pool " + this.pool);
        this.pool.stop();
    }
}
catch (Throwable t)
{
    log.debug("Error clearing the pool " + this.pool, t);
}
|
public java.lang.String | toString()
// Builds a one-line description of this activation in the form
// "<default toString>(spec=... mepf=... active=... [destination=...]
// [connection=...] [pool=...] [dlq=...] transacted=...)".
// The destination, connection, pool and dlq parts are included only when the
// corresponding field is non-null.
//
// @return the description string
StringBuffer buffer = new StringBuffer();
// The parentheses are appended as well-formed char literals.
buffer.append(Strings.defaultToString(this)).append('(');
buffer.append("spec=").append(Strings.defaultToString(spec));
buffer.append(" mepf=").append(Strings.defaultToString(endpointFactory));
buffer.append(" active=").append(deliveryActive.get());
if (destination != null)
    buffer.append(" destination=").append(destination);
if (connection != null)
    buffer.append(" connection=").append(connection);
if (pool != null)
    buffer.append(" pool=").append(Strings.defaultToString(pool));
if (dlqHandler != null)
    buffer.append(" dlq=").append(Strings.defaultToString(dlqHandler));
buffer.append(" transacted=").append(isDeliveryTransacted);
buffer.append(')');
return buffer.toString();
|