FileDocCategorySizeDatePackage
QueuedTxLock.javaAPI DocJBoss 4.2.112484Fri Jul 13 21:02:26 BST 2007org.jboss.aspects.txlock

QueuedTxLock.java

/*
 * JBoss, Home of Professional Open Source.
 * Copyright 2006, Red Hat Middleware LLC, and individual contributors
 * as indicated by the @author tags. See the copyright.txt file in the
 * distribution for a full listing of individual contributors.
 *
 * This is free software; you can redistribute it and/or modify it
 * under the terms of the GNU Lesser General Public License as
 * published by the Free Software Foundation; either version 2.1 of
 * the License, or (at your option) any later version.
 *
 * This software is distributed in the hope that it will be useful,
 * but WITHOUT ANY WARRANTY; without even the implied warranty of
 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
 * Lesser General Public License for more details.
 *
 * You should have received a copy of the GNU Lesser General Public
 * License along with this software; if not, write to the Free
 * Software Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA
 * 02110-1301 USA, or see the FSF site: http://www.fsf.org.
 */
package org.jboss.aspects.txlock;

import org.jboss.logging.Logger;
import org.jboss.util.deadlock.DeadlockDetector;
import org.jboss.util.deadlock.Resource;

import javax.transaction.Status;
import javax.transaction.Synchronization;
import javax.transaction.Transaction;
import java.util.HashMap;
import java.util.LinkedList;

/**
 * This class is holds threads awaiting the transactional lock to be free
 * in a fair FIFO transactional queue.  Non-transactional threads
 * are also put in this wait queue as well. This class pops the next waiting transaction from the queue
 * and notifies only those threads waiting associated with that transaction.
 *
 * As of 04/10/2002, you can now specify in jboss.xml method attributes that define
 * methods as read-only.  read-only methods(and read-only beans) will release transactional
 * locks at the end of the invocation.  This decreases likelyhood of deadlock and increases
 * performance.
 *
 * FIXME marcf: we should get solid numbers on this locking, bench in multi-thread environments
 * We need someone with serious SUN hardware to run this lock into the ground
 *
 * @author <a href="marc.fleury@jboss.org">Marc Fleury</a>
 * @author <a href="bill@burkecentral.com">Bill Burke</a>
 *
 * @version $Revision: 57186 $
 */
public class QueuedTxLock implements Resource
{
   public static final String TXLOCK = "TxLock";
   public static final String TIMEOUT = "timeout";

   private HashMap txLocks = new HashMap();
   private LinkedList txWaitQueue = new LinkedList();

   /** Transaction holding lock on bean */
   private Transaction tx = null;
   private boolean synched = false;
   private boolean isSynchronized = false;
   private Logger log = Logger.getLogger(this.getClass());

   public Object getResourceHolder()
   {
      return tx;
   }

   public void sync() throws InterruptedException
   {
      synchronized (this)
      {
         while (synched)
         {
            this.wait();
         }
         synched = true;
      }
   }

   public void releaseSync()
   {
      synchronized (this)
      {
         synched = false;
         this.notify();
      }
   }

   /**
    * The setTransaction associates a transaction with the lock.
    * The current transaction is associated by the schedule call.
    */
   public void setTransaction(Transaction tx)
   {
      this.tx = tx;
   }

   public Transaction getTransaction()
   {
      return tx;
   }


   private class TxLock
   {

      public Transaction waitingTx;
      public String threadName;
      public boolean isQueued;

      public TxLock(Transaction trans)
      {
         this.threadName = Thread.currentThread().toString();
         this.waitingTx = trans;
         this.isQueued = true;
      }

      public boolean equals(Object obj)
      {
         if (obj == this) return true;

         TxLock lock = (TxLock) obj;

         return lock.waitingTx.equals(this.waitingTx);
      }

      public int hashCode()
      {
         return waitingTx.hashCode();
      }

      public String toString()
      {
         StringBuffer buffer = new StringBuffer(100);
         buffer.append("TXLOCK waitingTx=").append(waitingTx);
         buffer.append(" thread=").append(threadName);
         buffer.append(" queued=").append(isQueued);
         return buffer.toString();
      }
   }

   protected TxLock getTxLock(Transaction miTx)
   {
      TxLock lock = null;
      TxLock key = new TxLock(miTx);
      lock = (TxLock) txLocks.get(key);
      if (lock == null)
      {
         txLocks.put(key, key);
         txWaitQueue.addLast(key);
         lock = key;
      }
      return lock;
   }

   protected boolean isTxExpired(Transaction miTx) throws Exception
   {
      if (miTx != null && miTx.getStatus() == Status.STATUS_MARKED_ROLLBACK)
      {
         return true;
      }
      return false;
   }


   public boolean lockNoWait(Transaction transaction) throws Exception
   {
      this.sync();
      if (log.isTraceEnabled())
         log.trace("lockNoWait tx=" + transaction + " " + toString());
      try
      {
         // And are we trying to enter with another transaction?
         if (getTransaction() != null && !getTransaction().equals(transaction))
         {
            return false;
         }
         setTransaction(transaction);
         return true;
      }
      finally
      {
         this.releaseSync();
      }
   }

   /**
    * doSchedule(Invocation)
    *
    * doSchedule implements a particular policy for scheduling the threads coming in.
    * There is always the spec required "serialization" but we can add custom scheduling in here
    *
    * Synchronizing on lock: a failure to get scheduled must result in a wait() call and a
    * release of the lock.  Schedulation must return with lock.
    *
    */
   public void schedule(Transaction miTx, org.jboss.aop.joinpoint.Invocation mi)
           throws Exception
   {
      boolean trace = log.isTraceEnabled();
      this.sync();
      try
      {
         if (trace) log.trace("Begin schedule");

         if (isTxExpired(miTx))
         {
            log.error("Saw rolled back tx=" + miTx);
            throw new RuntimeException("Transaction marked for rollback, possibly a timeout");
         }

         //Next test is independent of whether the context is locked or not, it is purely transactional
         // Is the instance involved with another transaction? if so we implement pessimistic locking
         waitForTx(mi, miTx, trace);
         if (!isSynchronized)
         {
            isSynchronized = true;
            miTx.registerSynchronization(new TxLockSynchronization());
         }
      }
      finally
      {
         this.releaseSync();
      }

   }

   /**
    * Wait until no other transaction is running with this lock.
    *
    * @return    Returns true if this thread was scheduled in txWaitQueue
    */
   protected void waitForTx(org.jboss.aop.joinpoint.Invocation mi, Transaction miTx, boolean trace) throws Exception
   {
      boolean wasScheduled = false;
      // Do we have a running transaction with the context?
      // We loop here until either until success or until transaction timeout
      // If we get out of the loop successfully, we can successfully
      // set the transaction on this puppy.
      TxLock txLock = null;
      while (getTransaction() != null &&
              // And are we trying to enter with another transaction?
              !getTransaction().equals(miTx))
      {
         // Check for a deadlock on every cycle
         try
         {
            DeadlockDetector.singleton.deadlockDetection(miTx, this);
         }
         catch (Exception e)
         {
            // We were queued, not any more
            if (txLock != null && txLock.isQueued)
            {
               txLocks.remove(txLock);
               txWaitQueue.remove(txLock);
            }
            throw e;
         }

         wasScheduled = true;
         // That's no good, only one transaction per context
         // Let's put the thread to sleep the transaction demarcation will wake them up
         if (trace) log.trace("Transactional contention on context miTx=" + miTx + " " + toString());

         if (txLock == null)
            txLock = getTxLock(miTx);

         if (trace) log.trace("Begin wait on " + txLock + " " + toString());

         // And lock the threads on the lock corresponding to the Tx in MI
         synchronized (txLock)
         {
            releaseSync();
            try
            {
               int txTimeout = 0;
               Integer timeout = (Integer) mi.getMetaData(TXLOCK, TIMEOUT);
               if (timeout != null) txTimeout = timeout.intValue();
               txLock.wait(txTimeout);
            }
            catch (InterruptedException ignored)
            {
            }
         } // end synchronized(txLock)

         this.sync();

         if (trace) log.trace("End wait on " + txLock + " " + toString());
         if (isTxExpired(miTx))
         {
            log.error(Thread.currentThread() + "Saw rolled back tx=" + miTx + " waiting for txLock");
            if (txLock.isQueued)
            {
               // Remove the TxLock from the queue because this thread is exiting.
               // Don't worry about notifying other threads that share the same transaction.
               // They will timeout and throw the below RuntimeException
               txLocks.remove(txLock);
               txWaitQueue.remove(txLock);
            }
            else if (getTransaction() != null && getTransaction().equals(miTx))
            {
               // We're not qu
               nextTransaction(trace);
            }
            if (miTx != null)
            {
               DeadlockDetector.singleton.removeWaiting(miTx);
            }
            throw new RuntimeException("Transaction marked for rollback, possibly a timeout");
         }
      } // end while(tx!=miTx)

      // If we get here, this means that we have the txlock
      if (!wasScheduled) setTransaction(miTx);
      return;
   }

   /*
    * nextTransaction()
    *
    * nextTransaction will
    * - set the current tx to null
    * - schedule the next transaction by notifying all threads waiting on the transaction
    * - setting the thread with the new transaction so there is no race with incoming calls
    */
   protected void nextTransaction(boolean trace)
   {
      if (!synched)
      {
         throw new IllegalStateException("do not call nextTransaction while not synched!");
      }

      setTransaction(null);
      // is there a waiting list?
      TxLock thelock = null;
      if (!txWaitQueue.isEmpty())
      {
         thelock = (TxLock) txWaitQueue.removeFirst();
         txLocks.remove(thelock);
         thelock.isQueued = false;
         // The new transaction is the next one, important to set it up to avoid race with
         // new incoming calls
         if (thelock.waitingTx != null)
            DeadlockDetector.singleton.removeWaiting(thelock.waitingTx);
         setTransaction(thelock.waitingTx);
         synchronized (thelock)
         {
            // notify All threads waiting on this transaction.
            // They will enter the methodLock wait loop.
            thelock.notifyAll();
         }
      }
      if (trace)
         log.trace("nextTransaction: " + thelock + " " + toString());
   }

   public void endTransaction()
   {
      boolean trace = log.isTraceEnabled();
      if (trace)
         log.trace("endTransaction: " + toString());
      nextTransaction(trace);
   }

   public void endInvocation(Transaction thetx)
   {
      if (log.isTraceEnabled())
         log.trace("endInvocation: miTx=" + thetx + " " + toString());
   }

   public String toString()
   {
      StringBuffer buffer = new StringBuffer(100);
      buffer.append(" hash=").append(hashCode());
      buffer.append(" tx=").append(getTransaction());
      buffer.append(" synched=").append(synched);
      buffer.append(" queue=").append(txWaitQueue);
      return buffer.toString();
   }

   private final class TxLockSynchronization implements Synchronization
   {
      public void beforeCompletion()
      {
      }

      public void afterCompletion(int status)
      {
         try
         {
            sync();
         }
         catch (InterruptedException ignored)
         {
         }
         isSynchronized = false;
         endTransaction();
         releaseSync();
      }
   }
}