FileDocCategorySizeDatePackage
MessageInflowLocalProxy.javaAPI DocJBoss 4.2.116506Fri Jul 13 20:53:56 BST 2007org.jboss.ejb3.mdb.inflow

MessageInflowLocalProxy

public class MessageInflowLocalProxy extends Object implements InvocationHandler
version
$Revision: 60233 $
author
William DeCoste

Fields Summary
private static final Logger
log
public static final String
MESSAGE_ENDPOINT_FACTORY
The key for the factory
public static final String
MESSAGE_ENDPOINT_XARESOURCE
The key for the xa resource
private boolean
trace
Whether trace is enabled
private String
cachedProxyString
Cached version of our proxy string
protected EDU.oswego.cs.dl.util.concurrent.SynchronizedBoolean
released
Whether this proxy has been released
protected boolean
delivered
Whether we have delivered a message
protected Thread
inUseThread
The in use thread
protected ClassLoader
oldClassLoader
The old classloader of the thread
protected Transaction
transaction
Any transaction we started
protected Transaction
suspended
Any suspended transaction
private JBossMessageEndpointFactory
endpointFactory
The message endpoint factory
private XAResource
resource
private javax.resource.spi.endpoint.MessageEndpointFactory
messageEndpointFactory
org.jboss.ejb3.mdb.MessagingContainer
container
Constructors Summary
protected MessageInflowLocalProxy(org.jboss.ejb3.mdb.MessagingContainer container)


     
   
      this.container = container;
   
Methods Summary
protected voidafter(java.lang.Object proxy)
After delivery processing.

param
mi the invocation
throws
Throwable for any error

      // Called out of sequence
      if (oldClassLoader == null)
         throw new IllegalStateException("afterDelivery without a previous beforeDelivery for message endpoint " + getProxyString(proxy));

      // Finish this delivery committing if we can
      try
      {
         finish("afterDelivery", proxy, true);
      }
      catch (Throwable t)
      {
         throw new ResourceException(t);
      }
   
protected voidbefore(java.lang.Object proxy, org.jboss.ejb3.mdb.MessagingContainer container, java.lang.reflect.Method method, java.lang.Object[] args)
Before delivery processing.

param
mi the invocation
throws
Throwable for any error

      // Called out of sequence
      if (oldClassLoader != null)
         throw new IllegalStateException("Missing afterDelivery from the previous beforeDelivery for message endpoint " + getProxyString(proxy));

      if (trace)
         log.trace("MessageEndpoint " + getProxyString(proxy) + " released");

      // Set the classloader
      oldClassLoader = GetTCLAction.getContextClassLoader(inUseThread);
      SetTCLAction.setContextClassLoader(inUseThread, container.getClassloader());
      if (trace)
         log.trace("MessageEndpoint " + getProxyString(proxy) + " set context classloader to " + container.getClassloader());

      // start any transaction
      try
      {
         // Is the delivery transacted?
         MethodInfo methodInfo = container.getMethodInfo((Method)args[0]);
         boolean isTransacted = messageEndpointFactory.isDeliveryTransacted(methodInfo.getAdvisedMethod());

         startTransaction("beforeDelivery", proxy, container, method, args, isTransacted);
      }
      catch (Throwable t)
      {
         resetContextClassLoader(proxy);
         throw new ResourceException(t);
      }
   
protected java.lang.Objectdelivery(java.lang.Object proxy, org.jboss.ejb3.mdb.MessagingContainer container, java.lang.reflect.Method method, java.lang.Object[] args)
Delivery.

param
mi the invocation
return
the result of the delivery
throws
Throwable for any error

      // Have we already delivered a message?
      if (delivered)
         throw new IllegalStateException("Multiple message delivery between before and after delivery is not allowed for message endpoint " + getProxyString(proxy));

      if (trace)
         log.trace("MessageEndpoint " + getProxyString(proxy) + " delivering");
      
      // Mark delivery if beforeDelivery was invoked
      if (oldClassLoader != null)
         delivered = true;

      boolean commit = true;
      // Is the delivery transacted?
      MethodInfo methodInfo = container.getMethodInfo(method);

      try
      {
         // Check for starting a transaction
         if (oldClassLoader == null)
         {
            boolean isTransacted = messageEndpointFactory.isDeliveryTransacted(methodInfo.getAdvisedMethod());
            startTransaction("delivery", proxy, container, method, args, isTransacted);
         }
         return container.localInvoke(methodInfo, args);
      }
      catch (Throwable t)
      {
         if (trace)
            log.trace("MessageEndpoint " + getProxyString(proxy) + " delivery error", t);
         if (t instanceof Error || t instanceof RuntimeException)
         {
            if (transaction != null)
               transaction.setRollbackOnly();
            commit = false;
         }
         throw t;
      }
      finally
      {
         // No before/after delivery, end any transaction and release the lock
         if (oldClassLoader == null)
         {
            try
            {
               // Finish any transaction we started
               endTransaction(proxy, commit);
            }
            finally
            {
               releaseThreadLock(proxy);
            }
         }
      }
   
protected voidendTransaction(java.lang.Object proxy, boolean commit)
End the transaction

param
mi the invocation
param
commit whether to try to commit
throws
Throwable for any error

      TransactionManager tm = null;
      Transaction currentTx = null;
      try
      {
         // If we started the transaction, commit it
         if (transaction != null)
         {
            tm = TxUtil.getTransactionManager(); //getContainer(mi).getTransactionManager();
            currentTx = tm.getTransaction();
            
            // Suspend any bad transaction - there is bug somewhere, but we will try to tidy things up
            if (currentTx != null && currentTx.equals(transaction) == false)
            {
               log.warn("Current transaction " + currentTx + " is not the expected transaction.");
               tm.suspend();
               tm.resume(transaction);
            }
            else
            {
               // We have the correct transaction
               currentTx = null;
            }
            
            // Commit or rollback depending on the status
            if (commit == false || transaction.getStatus() == Status.STATUS_MARKED_ROLLBACK)
            {
               if (trace)
                  log.trace("MessageEndpoint " + getProxyString(proxy) + " rollback");
               tm.rollback();
            }
            else
            {
               if (trace)
                  log.trace("MessageEndpoint " + getProxyString(proxy) + " commit");
               tm.commit();
            }
         }

         // If we suspended the incoming transaction, resume it
         if (suspended != null)
         {
            try
            {
               tm = TxUtil.getTransactionManager(); //getContainer(mi).getTransactionManager();
               tm.resume(suspended);
            }
            finally
            {
               suspended = null;
            }
         }
      }
      finally
      {
         // Resume any suspended transaction
         if (currentTx != null)
         {
            try
            {
               tm.resume(currentTx);
            }
            catch (Throwable t)
            {
               log.warn("MessageEndpoint " + getProxyString(proxy) + " failed to resume old transaction " + currentTx);
               
            }
         }
      }
   
protected voidfinish(java.lang.String context, java.lang.Object proxy, boolean commit)
Finish the current delivery

param
context the lifecycle method
param
mi the invocation
param
commit whether to commit
throws
Throwable for any error

      try
      {
         endTransaction(proxy, commit);
      }
      finally
      {
         // Reset delivered flag
         delivered = false;
         // Change back to the original context classloader
         resetContextClassLoader(proxy);
         // We no longer hold the lock
         releaseThreadLock(proxy);
      }
   
protected org.jboss.ejb3.mdb.MessagingContainergetContainer(org.jboss.aop.joinpoint.Invocation mi)
Get the container

return
the container

      return getMessageEndpointFactory(mi).getContainer();
   
protected JBossMessageEndpointFactorygetMessageEndpointFactory(org.jboss.aop.joinpoint.Invocation invocation)
Get the message endpoint factory

return
the message endpoint factory

      if (endpointFactory == null)
      {
         MethodInvocation mi = (MethodInvocation)invocation;
         endpointFactory = (JBossMessageEndpointFactory) mi.getResponseAttachment(MESSAGE_ENDPOINT_FACTORY);
      }
      return endpointFactory;
   
protected java.lang.StringgetProxyString(java.lang.Object proxy)
Get our proxy's string value.

param
mi the invocation
return
the string

      if (cachedProxyString == null)
         cachedProxyString = container.getEjbName();
      return cachedProxyString;
   
public java.lang.Objectinvoke(java.lang.Object proxy, java.lang.reflect.Method method, java.lang.Object[] args)

   
      // Are we still useable?
      if (released.get())
         throw new IllegalStateException("This message endpoint + " + getProxyString(proxy) + " has been released");

      // Concurrent invocation?
      Thread currentThread = Thread.currentThread();
      if (inUseThread != null && inUseThread.equals(currentThread) == false)
         throw new IllegalStateException("This message endpoint + " + getProxyString(proxy) + " is already in use by another thread " + inUseThread);
      inUseThread = currentThread;
      
      if (trace)
         log.trace("MessageEndpoint " + getProxyString(proxy) + " in use by " + method + " " + inUseThread);
      
      // Which operation?
      if (method.getName().equals("release"))
      {
         release(proxy);
         return null;
      }
      else if (method.getName().equals("beforeDelivery"))
      {
         before(proxy, container, method, args);
         return null;
      }
      else if (method.getName().equals("afterDelivery"))
      {
         after(proxy);
         return null;
      }
      else
         return delivery(proxy, container, method, args);
   
protected voidrelease(java.lang.Object proxy)
Release this message endpoint.

param
mi the invocation
throws
Throwable for any error

      // We are now released
      released.set(true);

      if (trace)
         log.trace("MessageEndpoint " + getProxyString(proxy) + " released");
      
      // Tidyup any outstanding delivery
      if (oldClassLoader != null)
      {
         try
         {
            finish("release", proxy, false);
         }
         catch (Throwable t)
         {
            log.warn("Error in release ", t);
         }
      }
   
protected voidreleaseThreadLock(java.lang.Object proxy)
Release the thread lock

param
mi the invocation

      if (trace)
         log.trace("MessageEndpoint " + getProxyString(proxy) + " no longer in use by " + inUseThread);
      inUseThread = null;
   
protected voidresetContextClassLoader(java.lang.Object proxy)
Reset the context classloader

param
mi the invocation

      if (trace)
         log.trace("MessageEndpoint " + getProxyString(proxy) + " reset classloader " + oldClassLoader);
      SetTCLAction.setContextClassLoader(inUseThread, oldClassLoader);
      oldClassLoader = null;
   
public voidsetMessageEndpointFactory(javax.resource.spi.endpoint.MessageEndpointFactory messageEndpointFactory)

      this.messageEndpointFactory = messageEndpointFactory;
   
public voidsetXaResource(javax.transaction.xa.XAResource resource)

      this.resource = resource;
   
protected voidstartTransaction(java.lang.String context, java.lang.Object proxy, org.jboss.ejb3.mdb.MessagingContainer container, java.lang.reflect.Method m, java.lang.Object[] args, boolean isTransacted)
Start a transaction

param
context the lifecycle method
param
mi the invocation
param
container the container
throws
Throwable for any error

 
      Method method;
      
      // Normal delivery      
      if ("delivery".equals(context))
         method = m;
      // Before delivery
      else
         method = (Method)args[0];

      if (trace)
         log.trace("MessageEndpoint " + getProxyString(proxy) + " " + context + " method=" + method + " xaResource=" + resource + " transacted=" + isTransacted);

      // Get the transaction status
      TransactionManager tm = TxUtil.getTransactionManager(); //container.getTransactionManager();
      suspended = tm.suspend();

      if (trace)
         log.trace("MessageEndpoint " + getProxyString(proxy) + " " + context + " currentTx=" + suspended);

      // Delivery is transacted
      if (isTransacted)
      {
         // No transaction means we start a new transaction and enlist the resource
         if (suspended == null)
         {
            tm.begin();
            transaction = tm.getTransaction();
            if (trace)
               log.trace("MessageEndpoint " + getProxyString(proxy) + " started transaction=" + transaction);
      
            // Enlist the XAResource in the transaction
            if (resource != null)
            {
               transaction.enlistResource(resource);
               if (trace)
                  log.trace("MessageEndpoint " + getProxyString(proxy) + " enlisted=" + resource);
            }
         }
         else
         {
            // If there is already a transaction we ignore the XAResource (by spec 12.5.9)
            try
            {
               tm.resume(suspended);
            }
            finally
            {
               suspended = null;
               if (trace)
                  log.trace("MessageEndpoint " + getProxyString(proxy) + " transaction=" + suspended + " already active, IGNORED=" + resource);
            }
         }
      }
   
public java.lang.StringtoString()

      return container.getEjbName().toString();