File | Doc | Category | Size | Date | Package
InternalManagedConnectionPool.java | API Doc | JBoss 4.2.1 | 23691 | Fri Jul 13 21:01:18 BST 2007 | org.jboss.resource.connectionmanager

InternalManagedConnectionPool

public class InternalManagedConnectionPool extends Object
The internal pool implementation
author
David Jencks
author
Adrian Brock
author
Weston Price
version
$Revision: 61055 $

Fields Summary
private final javax.resource.spi.ManagedConnectionFactory
mcf
The managed connection factory
private final ConnectionListenerFactory
clf
The connection listener factory
private final Subject
defaultSubject
The default subject
private final javax.resource.spi.ConnectionRequestInfo
defaultCri
The default connection request information
private final PoolParams
poolParams
The pooling parameters
private int
maxSize
Copy of the maximum size from the pooling parameters. Dynamic changes to this value are not compatible with the semaphore, which cannot be dynamically changed.
private ArrayList
cls
The available connection event listeners
private final EDU.oswego.cs.dl.util.concurrent.FIFOSemaphore
permits
The permits used to control who can checkout a connection
private final Logger
log
The log
private final boolean
trace
Whether trace is enabled
private final Counter
connectionCounter
Stats
private final HashSet
checkedOut
The checked out connections
private boolean
started
Whether the pool has been started
private EDU.oswego.cs.dl.util.concurrent.SynchronizedBoolean
shutdown
Whether the pool has been shutdown
private volatile int
maxUsedConnections
the max connections ever checked out
Constructors Summary
protected InternalManagedConnectionPool(javax.resource.spi.ManagedConnectionFactory mcf, ConnectionListenerFactory clf, Subject subject, javax.resource.spi.ConnectionRequestInfo cri, PoolParams poolParams, Logger log)
Create a new internal pool

param
mcf the managed connection factory
param
subject the subject
param
cri the connection request information
param
poolParams the pooling parameters
param
log the log


                                     
       
                
   
      this.mcf = mcf;
      this.clf = clf;
      defaultSubject = subject;
      defaultCri = cri;
      this.poolParams = poolParams;
      this.maxSize = this.poolParams.maxSize;
      
      this.log = log;
      this.trace = log.isTraceEnabled();
      cls = new ArrayList(this.maxSize);
      permits = new FIFOSemaphore(this.maxSize);

      if(poolParams.prefill)
      {
         PoolFiller.fillPool(this);
         
      }
      
   
Methods Summary
private ConnectionListenercreateConnectionEventListener(javax.security.auth.Subject subject, javax.resource.spi.ConnectionRequestInfo cri)
Create a connection event listener

param
subject the subject
param
cri the connection request information
return
the new listener
throws
ResourceException for any error

      ManagedConnection mc = mcf.createManagedConnection(subject, cri);
      connectionCounter.inc();
      try
      {
         return clf.createConnectionListener(mc, this);
      }
      catch (ResourceException re)
      {
         connectionCounter.dec();
         mc.destroy();
         throw re;
      }
   
private voiddoDestroy(ConnectionListener cl)
Destroy a connection

param
cl the connection to destroy

      if (cl.getState() != ConnectionListener.DESTROYED)
      {
         // Update the statistics and mark the listener before touching the managed connection
         connectionCounter.dec();
         cl.setState(ConnectionListener.DESTROYED);
         try
         {
            cl.getManagedConnection().destroy();
         }
         catch (Throwable t)
         {
            // A failure to destroy is only logged; the listener stays marked DESTROYED
            log.debug("Exception destroying ManagedConnection " + cl, t);
         }
      }
      else
      {
         // Nothing to do, this listener was destroyed earlier
         log.trace("ManagedConnection is already destroyed " + cl);
      }
   
public voidfillToMin()

      while (true)
      {
         // Get a permit - avoids a race when the pool is nearly full
         // Also avoids unnessary fill checking when all connections are checked out
         try
         {
            if (permits.attempt(poolParams.blockingTimeout))
            {
               try
               {
                  if (shutdown.get())
                     return;

                  // We already have enough connections
                  if (getMinSize() - connectionCounter.getGuaranteedCount() <= 0)
                     return;

                  // Create a connection to fill the pool
                  try
                  {
                     ConnectionListener cl = createConnectionEventListener(defaultSubject, defaultCri);
                     synchronized (cls)
                     {
                        if (trace)
                           log.trace("Filling pool cl=" + cl);
                        cls.add(cl);
                     }
                  }
                  catch (ResourceException re)
                  {
                     log.warn("Unable to fill pool ", re);
                     return;
                  }
               }
               finally
               {
                  permits.release();
               }
            }
         }
         catch (InterruptedException ignored)
         {
            log.trace("Interrupted while requesting permit in fillToMin");
         }
      }
   
public voidflush()

      ArrayList destroy = null;
      synchronized (cls)
      {
         if (trace)
            log.trace("Flushing pool checkedOut=" + checkedOut + " inPool=" + cls);

         // Mark checked out connections as requiring destruction
         for (Iterator i = checkedOut.iterator(); i.hasNext();)
         {
            ConnectionListener cl = (ConnectionListener) i.next();
            if (trace)
               log.trace("Flush marking checked out connection for destruction " + cl);
            cl.setState(ConnectionListener.DESTROY);
         }
         // Destroy connections in the pool
         while (cls.size() > 0)
         {
            ConnectionListener cl = (ConnectionListener) cls.remove(0);
            if (destroy == null)
               destroy = new ArrayList();
            destroy.add(cl);
         }
      }

      // We need to destroy some connections
      if (destroy != null)
      {
         for (int i = 0; i < destroy.size(); ++i)
         {
            ConnectionListener cl = (ConnectionListener) destroy.get(i);
            if (trace)
               log.trace("Destroying flushed connection " + cl);
            doDestroy(cl);
         }

         // We destroyed something, check the minimum.
         if (shutdown.get() == false && poolParams.minSize > 0)
            PoolFiller.fillPool(this);
      }
      
   
public longgetAvailableConnections()

      return permits.permits();
   
public ConnectionListenergetConnection(javax.security.auth.Subject subject, javax.resource.spi.ConnectionRequestInfo cri)
todo distinguish between connection dying while match called and bad match strategy. In latter case we should put it back in the pool.

      
      // Check out a connection listener: wait up to blockingTimeout for a permit,
      // then hand out a pooled listener that the factory matches, or create a new one.
      // On every failure path the permit is given back (or was never obtained).

      // Fall back to the pool defaults when the caller supplies no subject/cri
      subject = (subject == null) ? defaultSubject : subject;
      cri = (cri == null) ? defaultCri : cri;
      // Only used to report the wait time if we are interrupted
      long startWait = System.currentTimeMillis();
      try
      {
         if (permits.attempt(poolParams.blockingTimeout))
         {
            //We have a permit to get a connection. Is there one in the pool already?
            ConnectionListener cl = null;
            do
            {
               synchronized (cls)
               {
                  if (shutdown.get())
                  {
                     permits.release();
                     throw new ResourceException("The pool has been shutdown");
                  }

                  if (cls.size() > 0)
                  {
                     // Take the listener at the end of the idle list
                     cl = (ConnectionListener) cls.remove(cls.size() - 1);
                     checkedOut.add(cl);
                     // Permits currently held = maximum size minus free permits
                     int size = (int) (maxSize - permits.permits());

                     //Update the maxUsedConnections
                     if (size > maxUsedConnections)
                        maxUsedConnections = size;
                  }
               }
               if (cl != null)
               {
                  //Yes, we retrieved a ManagedConnection from the pool. Does it match?
                  try
                  {
                     Object matchedMC = mcf.matchManagedConnections(Collections.singleton(cl.getManagedConnection()),
                           subject, cri);
                     if (matchedMC != null)
                     {
                        if (trace)
                           log.trace("supplying ManagedConnection from pool: " + cl);
                        // Record on the listener that it holds a permit; returnConnection releases it
                        cl.grantPermit(true);
                        return cl;
                     }

                     //Match did not succeed but no exception was thrown.
                     //Either we have the matching strategy wrong or the
                     //connection died while being checked.  We need to
                     //distinguish these cases, but for now we always
                     //destroy the connection.
                     log.warn("Destroying connection that could not be successfully matched: " + cl);
                     synchronized (cls)
                     {
                        checkedOut.remove(cl);
                     }
                     doDestroy(cl);
                     cl = null;
                                        
                  }
                  catch (Throwable t)
                  {
                     log.warn("Throwable while trying to match ManagedConnection, destroying connection: " + cl, t);
                     synchronized (cls)
                     {
                        checkedOut.remove(cl);
                     }
                     doDestroy(cl);
                     cl = null;
                  
                    
                  }
                  //We made it here, something went wrong and we should validate if we should continue attempting to acquire a connection
                  if(poolParams.useFastFail)
                  {
                     log.trace("Fast failing for connection attempt. No more attempts will be made to acquire connection from pool and a new connection will be created immeadiately");
                     break;
                  }
               
               }
            }
            // NOTE(review): this size check reads cls without holding its lock
            while (cls.size() > 0);//end of do loop

            //OK, we couldnt find a working connection from the pool.  Make a new one.
            try
            {
               //No, the pool was empty, so we have to make a new one.
               cl = createConnectionEventListener(subject, cri);
               synchronized (cls)
               {
                  checkedOut.add(cl);
                  int size = (int) (maxSize - permits.permits());
                  if (size > maxUsedConnections)
                     maxUsedConnections = size;
               }

               //lack of synch on "started" probably ok, if 2 reads occur we will just
               //run fillPool twice, no harm done.
               if (started == false)
               {
                  started = true;
                  if (poolParams.minSize > 0)
                     PoolFiller.fillPool(this);
               }
               if (trace)
                  log.trace("supplying new ManagedConnection: " + cl);
               cl.grantPermit(true);
               return cl;
            }
            catch (Throwable t)
            {
               // cl is still null here when createConnectionEventListener itself threw.
               // NOTE(review): if the listener was created and a later statement threw,
               // it is removed from checkedOut but not destroyed here - confirm intended.
               log.warn("Throwable while attempting to get a new connection: " + cl, t);
               //return permit and rethrow
               synchronized (cls)
               {
                  checkedOut.remove(cl);
               }
               permits.release();
               JBossResourceException.rethrowAsResourceException(
                     "Unexpected throwable while trying to create a connection: " + cl, t);
               throw new UnreachableStatementException();
            }
         }
         else
         {
            // we timed out
            throw new ResourceException("No ManagedConnections available within configured blocking timeout ( "
                  + poolParams.blockingTimeout + " [ms] )");
         }

      }
      catch (InterruptedException ie)
      {
         // NOTE(review): the interrupt is converted to a ResourceException without
         // chaining ie or restoring the thread's interrupt status
         long end = System.currentTimeMillis() - startWait;
         throw new ResourceException("Interrupted while requesting permit! Waited " + end + " ms");
      }
   
public intgetConnectionCount()

      return connectionCounter.getCount();
   
public intgetConnectionCreatedCount()

      return connectionCounter.getCreatedCount();
   
public intgetConnectionDestroyedCount()

      return connectionCounter.getDestroyedCount();
   
public intgetConnectionInUseCount()

      return checkedOut.size();
   
public intgetMaxConnectionsInUseCount()

      return maxUsedConnections;
   
private intgetMinSize()
Guard against configurations or dynamic changes that may increase the minimum beyond the maximum

      if (poolParams.minSize > maxSize)
         return maxSize;
      return poolParams.minSize;
   
protected voidinitialize()
Initialize the pool

      // An idle timeout of zero means the pool is not registered with the idle remover
      if (poolParams.idleTimeout != 0)
      {
         IdleRemover.registerPool(this, poolParams.idleTimeout);
      }

      // Background validation is opt-in
      if (!poolParams.backgroundValidation)
      {
         return;
      }

      log.debug("Registering for background validation at interval " + poolParams.backgroundInterval);
      ConnectionValidator.registerPool(this, poolParams.backgroundInterval);

   
private ConnectionListenerremoveForFrequencyCheck()

      // Remove and return the first idle listener whose last validation is at
      // least backgroundInterval milliseconds old, or null when there is none.
      // The only visible caller, validateConnections, holds the lock on cls.
      log.debug("Checking for connection within frequency");

      for (Iterator iter = cls.iterator(); iter.hasNext();)
      {
         ConnectionListener candidate = (ConnectionListener) iter.next();
         long lastCheck = candidate.getLastValidatedTime();

         if ((System.currentTimeMillis() - lastCheck) >= poolParams.backgroundInterval)
         {
            // Remove through the iterator: no second scan of the list and no
            // structural modification behind the iterator's back
            iter.remove();
            return candidate;
         }
      }

      // Every idle listener was validated within the interval
      return null;
   
public voidremoveTimedOut()

      // Listeners last used before this instant are considered timed out
      final long timeout = System.currentTimeMillis() - poolParams.idleTimeout;
      ArrayList destroy = null;

      // The lock is taken once per inspected listener and released in between
      for (;;)
      {
         synchronized (cls)
         {
            // Nothing left to destroy
            if (cls.isEmpty())
               break;

            // Only the head of the list is examined: listeners were inserted
            // chronologically, so once the head is fresh the rest are too
            final ConnectionListener head = (ConnectionListener) cls.get(0);
            if (!head.isTimedOut(timeout))
               break;

            // We need to destroy this one
            cls.remove(0);
            if (destroy == null)
               destroy = new ArrayList();
            destroy.add(head);
         }
      }

      // No connection had timed out
      if (destroy == null)
         return;

      for (Iterator expired = destroy.iterator(); expired.hasNext();)
      {
         ConnectionListener victim = (ConnectionListener) expired.next();
         if (trace)
            log.trace("Destroying timedout connection " + victim);
         doDestroy(victim);
      }

      // We destroyed something, check the minimum.
      if (!shutdown.get() && poolParams.minSize > 0)
         PoolFiller.fillPool(this);
   
public voidreturnConnection(ConnectionListener cl, boolean kill)

      // Take a checked out listener back: clean up its managed connection, then
      // either put it back on the idle list or destroy it, releasing the permit it
      // holds. kill=true requests destruction; it is also forced below on cleanup
      // failure, on a DESTROY-marked listener and when the idle list is full.

      // Already destroyed (doDestroy sets this state): only the permit remains to be returned
      if (cl.getState() == ConnectionListener.DESTROYED)
      {
         log.trace("ManagedConnection is being returned after it was destroyed" + cl);
         if (cl.hasPermit())
         {
            // release semaphore
            cl.grantPermit(false);
            permits.release();
         }

         return;
      }

      if (trace)
         log.trace("putting ManagedConnection back into pool kill=" + kill + " cl=" + cl);
      try
      {
         cl.getManagedConnection().cleanup();
      }
      catch (ResourceException re)
      {
         // A connection that cannot be cleaned up is not reused
         log.warn("ResourceException cleaning up ManagedConnection: " + cl, re);
         kill = true;
      }

      // We need to destroy this one
      // (flush() sets DESTROY on listeners that were checked out when it ran)
      if (cl.getState() == ConnectionListener.DESTROY)
         kill = true;

      synchronized (cls)
      {
         checkedOut.remove(cl);

         // This is really an error
         // NOTE(review): compares against poolParams.maxSize rather than the copied maxSize field
         if (kill == false && cls.size() >= poolParams.maxSize)
         {
            log.warn("Destroying returned connection, maximum pool size exceeded " + cl);
            kill = true;
         }

         // If we are destroying, check the connection is not in the pool
         if (kill)
         {
            // Adrian Brock: A resource adapter can asynchronously notify us that
            // a connection error occurred.
            // This could happen while the connection is not checked out.
            // e.g. JMS can do this via an ExceptionListener on the connection.
            // I have twice had to reinstate this line of code, PLEASE DO NOT REMOVE IT!
            cls.remove(cl);
         }
         // return to the pool
         else
         {
            cl.used();
            cls.add(cl);
         }

         // The permit is released while still holding the lock on cls
         if (cl.hasPermit())
         {
            // release semaphore
            cl.grantPermit(false);
            permits.release();
         }
      }

      // The actual destruction happens outside the lock
      if (kill)
      {
         if (trace)
            log.trace("Destroying returned connection " + cl);
         doDestroy(cl);
      }

   
private voidreturnForFrequencyCheck(ConnectionListener cl)


      log.debug("Returning for connection within frequency");

      cl.setLastValidatedTime(System.currentTimeMillis());
      cls.add(cl);

   
public voidshutdown()

      // Shut the pool down. The flag is set first, so that the flush() below
      // does not ask the pool filler to refill the pool it has just emptied.
      shutdown.set(true);
      // Unregister from the idle remover and the connection validator
      IdleRemover.unregisterPool(this);
      ConnectionValidator.unRegisterPool(this);
      // Destroy idle connections and mark checked out ones for destruction
      flush();
   
public voidshutdownWithoutClear()
For testing

      // Unregister from each background service, then wait for its background thread
      IdleRemover.unregisterPool(this);
      IdleRemover.waitForBackgroundThread();
      ConnectionValidator.unRegisterPool(this);
      ConnectionValidator.waitForBackgroundThread();

      // Unlike shutdown(), nothing is flushed: the pool is topped up to its
      // minimum and only then marked as shut down
      fillToMin();
      shutdown.set(true);
   
public voidvalidateConnections()


      if (trace)
         log.trace("Attempting to  validate connections for pool " + this);

      if (permits.attempt(poolParams.blockingTimeout))
      {

         boolean destroyed = false;

         try
         {

            while (true)
            {

               ConnectionListener cl = null;

               synchronized (cls)
               {
                  if (cls.size() == 0)
                  {
                     break;
                  }

                  cl = removeForFrequencyCheck();

               }

               if (cl == null)
               {
                  break;
               }

               try
               {

                  Set candidateSet = Collections.singleton(cl.getManagedConnection());

                  if (mcf instanceof ValidatingManagedConnectionFactory)
                  {
                     ValidatingManagedConnectionFactory vcf = (ValidatingManagedConnectionFactory) mcf;
                     candidateSet = vcf.getInvalidConnections(candidateSet);

                     if (candidateSet != null && candidateSet.size() > 0)
                     {

                        if (cl.getState() != ConnectionListener.DESTROY)
                        {
                           doDestroy(cl);
                           destroyed = true;
                        }
                     }

                  }
                  else
                  {
                     log.warn("warning: background validation was specified with a non compliant ManagedConnectionFactory interface.");
                  }

               }
               finally
               {
                  if(!destroyed)
                  {
                     synchronized (cls)
                     {
                        returnForFrequencyCheck(cl);
                     }                     
                  }

               }

            }

         }
         finally
         {
            permits.release();

            if (destroyed && shutdown.get() == false && poolParams.minSize > 0)
            {
               PoolFiller.fillPool(this);
            }

         }

      }