Fields Summary |
---|
private static final Logger | log |
public static final String | MESSAGE_ENDPOINT_FACTORYThe key for the factory |
public static final String | MESSAGE_ENDPOINT_XARESOURCEThe key for the xa resource |
private boolean | traceWhether trace is enabled |
private String | cachedProxyStringCached version of our proxy string |
protected EDU.oswego.cs.dl.util.concurrent.SynchronizedBoolean | releasedWhether this proxy has been released |
protected boolean | deliveredWhether we have delivered a message |
protected Thread | inUseThreadThe in use thread |
protected ClassLoader | oldClassLoaderThe old classloader of the thread |
protected Transaction | transactionAny transaction we started |
protected Transaction | suspendedAny suspended transaction |
private JBossMessageEndpointFactory | endpointFactoryThe message endpoint factory |
private XAResource | resource |
private javax.resource.spi.endpoint.MessageEndpointFactory | messageEndpointFactory |
org.jboss.ejb3.mdb.MessagingContainer | container |
Methods Summary |
---|
protected void | after(java.lang.Object proxy)After delivery processing.
// Called out of sequence
if (oldClassLoader == null)
throw new IllegalStateException("afterDelivery without a previous beforeDelivery for message endpoint " + getProxyString(proxy));
// Finish this delivery committing if we can
try
{
finish("afterDelivery", proxy, true);
}
catch (Throwable t)
{
throw new ResourceException(t);
}
|
protected void | before(java.lang.Object proxy, org.jboss.ejb3.mdb.MessagingContainer container, java.lang.reflect.Method method, java.lang.Object[] args)Before delivery processing.
// Called out of sequence
if (oldClassLoader != null)
throw new IllegalStateException("Missing afterDelivery from the previous beforeDelivery for message endpoint " + getProxyString(proxy));
if (trace)
log.trace("MessageEndpoint " + getProxyString(proxy) + " released");
// Set the classloader
oldClassLoader = GetTCLAction.getContextClassLoader(inUseThread);
SetTCLAction.setContextClassLoader(inUseThread, container.getClassloader());
if (trace)
log.trace("MessageEndpoint " + getProxyString(proxy) + " set context classloader to " + container.getClassloader());
// start any transaction
try
{
// Is the delivery transacted?
MethodInfo methodInfo = container.getMethodInfo((Method)args[0]);
boolean isTransacted = messageEndpointFactory.isDeliveryTransacted(methodInfo.getAdvisedMethod());
startTransaction("beforeDelivery", proxy, container, method, args, isTransacted);
}
catch (Throwable t)
{
resetContextClassLoader(proxy);
throw new ResourceException(t);
}
|
protected java.lang.Object | delivery(java.lang.Object proxy, org.jboss.ejb3.mdb.MessagingContainer container, java.lang.reflect.Method method, java.lang.Object[] args)Delivery.
// Have we already delivered a message?
if (delivered)
throw new IllegalStateException("Multiple message delivery between before and after delivery is not allowed for message endpoint " + getProxyString(proxy));
if (trace)
log.trace("MessageEndpoint " + getProxyString(proxy) + " delivering");
// Mark delivery if beforeDelivery was invoked
if (oldClassLoader != null)
delivered = true;
boolean commit = true;
// Is the delivery transacted?
MethodInfo methodInfo = container.getMethodInfo(method);
try
{
// Check for starting a transaction
if (oldClassLoader == null)
{
boolean isTransacted = messageEndpointFactory.isDeliveryTransacted(methodInfo.getAdvisedMethod());
startTransaction("delivery", proxy, container, method, args, isTransacted);
}
return container.localInvoke(methodInfo, args);
}
catch (Throwable t)
{
if (trace)
log.trace("MessageEndpoint " + getProxyString(proxy) + " delivery error", t);
if (t instanceof Error || t instanceof RuntimeException)
{
if (transaction != null)
transaction.setRollbackOnly();
commit = false;
}
throw t;
}
finally
{
// No before/after delivery, end any transaction and release the lock
if (oldClassLoader == null)
{
try
{
// Finish any transaction we started
endTransaction(proxy, commit);
}
finally
{
releaseThreadLock(proxy);
}
}
}
|
protected void | endTransaction(java.lang.Object proxy, boolean commit)End the transaction
TransactionManager tm = null;
Transaction currentTx = null;
try
{
// If we started the transaction, commit it
if (transaction != null)
{
tm = TxUtil.getTransactionManager(); //getContainer(mi).getTransactionManager();
currentTx = tm.getTransaction();
// Suspend any bad transaction - there is bug somewhere, but we will try to tidy things up
if (currentTx != null && currentTx.equals(transaction) == false)
{
log.warn("Current transaction " + currentTx + " is not the expected transaction.");
tm.suspend();
tm.resume(transaction);
}
else
{
// We have the correct transaction
currentTx = null;
}
// Commit or rollback depending on the status
if (commit == false || transaction.getStatus() == Status.STATUS_MARKED_ROLLBACK)
{
if (trace)
log.trace("MessageEndpoint " + getProxyString(proxy) + " rollback");
tm.rollback();
}
else
{
if (trace)
log.trace("MessageEndpoint " + getProxyString(proxy) + " commit");
tm.commit();
}
}
// If we suspended the incoming transaction, resume it
if (suspended != null)
{
try
{
tm = TxUtil.getTransactionManager(); //getContainer(mi).getTransactionManager();
tm.resume(suspended);
}
finally
{
suspended = null;
}
}
}
finally
{
// Resume any suspended transaction
if (currentTx != null)
{
try
{
tm.resume(currentTx);
}
catch (Throwable t)
{
log.warn("MessageEndpoint " + getProxyString(proxy) + " failed to resume old transaction " + currentTx);
}
}
}
|
protected void | finish(java.lang.String context, java.lang.Object proxy, boolean commit)Finish the current delivery
try
{
endTransaction(proxy, commit);
}
finally
{
// Reset delivered flag
delivered = false;
// Change back to the original context classloader
resetContextClassLoader(proxy);
// We no longer hold the lock
releaseThreadLock(proxy);
}
|
protected org.jboss.ejb3.mdb.MessagingContainer | getContainer(org.jboss.aop.joinpoint.Invocation mi)Get the container
return getMessageEndpointFactory(mi).getContainer();
|
protected JBossMessageEndpointFactory | getMessageEndpointFactory(org.jboss.aop.joinpoint.Invocation invocation)Get the message endpoint factory
if (endpointFactory == null)
{
MethodInvocation mi = (MethodInvocation)invocation;
endpointFactory = (JBossMessageEndpointFactory) mi.getResponseAttachment(MESSAGE_ENDPOINT_FACTORY);
}
return endpointFactory;
|
protected java.lang.String | getProxyString(java.lang.Object proxy)Get our proxy's string value.
if (cachedProxyString == null)
cachedProxyString = container.getEjbName();
return cachedProxyString;
|
public java.lang.Object | invoke(java.lang.Object proxy, java.lang.reflect.Method method, java.lang.Object[] args)
// Are we still useable?
if (released.get())
throw new IllegalStateException("This message endpoint + " + getProxyString(proxy) + " has been released");
// Concurrent invocation?
Thread currentThread = Thread.currentThread();
if (inUseThread != null && inUseThread.equals(currentThread) == false)
throw new IllegalStateException("This message endpoint + " + getProxyString(proxy) + " is already in use by another thread " + inUseThread);
inUseThread = currentThread;
if (trace)
log.trace("MessageEndpoint " + getProxyString(proxy) + " in use by " + method + " " + inUseThread);
// Which operation?
if (method.getName().equals("release"))
{
release(proxy);
return null;
}
else if (method.getName().equals("beforeDelivery"))
{
before(proxy, container, method, args);
return null;
}
else if (method.getName().equals("afterDelivery"))
{
after(proxy);
return null;
}
else
return delivery(proxy, container, method, args);
|
protected void | release(java.lang.Object proxy)Release this message endpoint.
// We are now released
released.set(true);
if (trace)
log.trace("MessageEndpoint " + getProxyString(proxy) + " released");
// Tidyup any outstanding delivery
if (oldClassLoader != null)
{
try
{
finish("release", proxy, false);
}
catch (Throwable t)
{
log.warn("Error in release ", t);
}
}
|
protected void | releaseThreadLock(java.lang.Object proxy)Release the thread lock
if (trace)
log.trace("MessageEndpoint " + getProxyString(proxy) + " no longer in use by " + inUseThread);
inUseThread = null;
|
protected void | resetContextClassLoader(java.lang.Object proxy)Reset the context classloader
if (trace)
log.trace("MessageEndpoint " + getProxyString(proxy) + " reset classloader " + oldClassLoader);
SetTCLAction.setContextClassLoader(inUseThread, oldClassLoader);
oldClassLoader = null;
|
public void | setMessageEndpointFactory(javax.resource.spi.endpoint.MessageEndpointFactory messageEndpointFactory)
this.messageEndpointFactory = messageEndpointFactory;
|
public void | setXaResource(javax.transaction.xa.XAResource resource)
this.resource = resource;
|
protected void | startTransaction(java.lang.String context, java.lang.Object proxy, org.jboss.ejb3.mdb.MessagingContainer container, java.lang.reflect.Method m, java.lang.Object[] args, boolean isTransacted)Start a transaction
Method method;
// Normal delivery
if ("delivery".equals(context))
method = m;
// Before delivery
else
method = (Method)args[0];
if (trace)
log.trace("MessageEndpoint " + getProxyString(proxy) + " " + context + " method=" + method + " xaResource=" + resource + " transacted=" + isTransacted);
// Get the transaction status
TransactionManager tm = TxUtil.getTransactionManager(); //container.getTransactionManager();
suspended = tm.suspend();
if (trace)
log.trace("MessageEndpoint " + getProxyString(proxy) + " " + context + " currentTx=" + suspended);
// Delivery is transacted
if (isTransacted)
{
// No transaction means we start a new transaction and enlist the resource
if (suspended == null)
{
tm.begin();
transaction = tm.getTransaction();
if (trace)
log.trace("MessageEndpoint " + getProxyString(proxy) + " started transaction=" + transaction);
// Enlist the XAResource in the transaction
if (resource != null)
{
transaction.enlistResource(resource);
if (trace)
log.trace("MessageEndpoint " + getProxyString(proxy) + " enlisted=" + resource);
}
}
else
{
// If there is already a transaction we ignore the XAResource (by spec 12.5.9)
try
{
tm.resume(suspended);
}
finally
{
suspended = null;
if (trace)
log.trace("MessageEndpoint " + getProxyString(proxy) + " transaction=" + suspended + " already active, IGNORED=" + resource);
}
}
}
|
public java.lang.String | toString()
return container.getEjbName().toString();
|