FileDocCategorySizeDatePackage
StatefulTreeCache.javaAPI DocJBoss 4.2.121683Fri Jul 13 20:53:48 BST 2007org.jboss.ejb3.cache.tree

StatefulTreeCache.java

/*
 * JBoss, Home of Professional Open Source.
 * Copyright 2006, Red Hat Middleware LLC, and individual contributors
 * as indicated by the @author tags. See the copyright.txt file in the
 * distribution for a full listing of individual contributors.
 *
 * This is free software; you can redistribute it and/or modify it
 * under the terms of the GNU Lesser General Public License as
 * published by the Free Software Foundation; either version 2.1 of
 * the License, or (at your option) any later version.
 *
 * This software is distributed in the hope that it will be useful,
 * but WITHOUT ANY WARRANTY; without even the implied warranty of
 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
 * Lesser General Public License for more details.
 *
 * You should have received a copy of the GNU Lesser General Public
 * License along with this software; if not, write to the Free
 * Software Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA
 * 02110-1301 USA, or see the FSF site: http://www.fsf.org.
 */
package org.jboss.ejb3.cache.tree;

import java.lang.ref.WeakReference;
import java.util.concurrent.ConcurrentHashMap;
import java.util.Iterator;
import java.util.Map;

import java.util.Set;

import javax.ejb.EJBException;
import javax.ejb.NoSuchEJBException;
import javax.management.MBeanServer;
import javax.management.ObjectName;

import org.jboss.cache.xml.XmlHelper;
import org.jboss.cache.eviction.RegionManager;
import org.jboss.cache.eviction.Region;
import org.jboss.cache.marshall.RegionNotFoundException;
import org.jboss.cache.CacheException;
import org.jboss.cache.AbstractTreeCacheListener;
import org.jboss.cache.DataNode;
import org.jboss.cache.TreeCache;
import org.jboss.cache.TreeCacheMBean;
import org.jboss.ejb3.Container;
import org.jboss.ejb3.EJBContainer;
import org.jboss.ejb3.Pool;
import org.jboss.ejb3.cache.ClusteredStatefulCache;
import org.jboss.ejb3.stateful.NestedStatefulBeanContext;
import org.jboss.ejb3.stateful.ProxiedStatefulBeanContext;
import org.jboss.ejb3.stateful.StatefulBeanContext;
import org.jboss.mx.util.MBeanProxyExt;
import org.jboss.mx.util.MBeanServerLocator;
import org.jboss.util.id.GUID;
import org.jboss.logging.Logger;
import org.jboss.annotation.ejb.cache.tree.CacheConfig;
import org.w3c.dom.Element;

import org.jboss.cache.Fqn;
import org.jboss.cache.config.Option;

/**
 * Clustered SFSB cache that uses JBoss Cache to cache and replicate
 * bean contexts.
 *
 * @author <a href="mailto:bill@jboss.org">Bill Burke</a>
 * @author Brian Stansberry
 * 
 * @version $Revision: 62818 $
 */
public class StatefulTreeCache implements ClusteredStatefulCache
{
   private static final int FQN_SIZE = 3; // depth of fqn that we store the session in.
   private static final int DEFAULT_BUCKET_COUNT = 100;

   private static final String[] DEFAULT_HASH_BUCKETS = new String[DEFAULT_BUCKET_COUNT];
   
   private static Option LOCAL_ONLY_OPTION = new Option();
   private static Option GRAVITATE_OPTION = new Option();
   static
   {
      LOCAL_ONLY_OPTION.setCacheModeLocal(true);
      GRAVITATE_OPTION.setForceDataGravitation(true);
      
      for (int i = 0; i < DEFAULT_HASH_BUCKETS.length; i++)
      {
         DEFAULT_HASH_BUCKETS[i] = String.valueOf(i);
      }
   }
   
   private ThreadLocal<Boolean> localActivity = new ThreadLocal<Boolean>();
   private Logger log = Logger.getLogger(StatefulTreeCache.class);
   private Pool pool;
   private WeakReference<ClassLoader> classloader;
   private TreeCache cache;
   private Fqn cacheNode;
   private ClusteredStatefulCacheListener listener;
   private RegionManager evictRegionManager;
   public static long MarkInUseWaitTime = 15000;
   protected String[] hashBuckets = DEFAULT_HASH_BUCKETS;
   protected int createCount = 0;
   protected int passivatedCount = 0;
   protected int removeCount = 0;
   protected long removalTimeout = 0; 
   protected RemovalTimeoutTask removalTask = null;
   protected boolean running = true;
   protected Map<Object, Long> beans = new ConcurrentHashMap<Object, Long>();
   protected EJBContainer ejbContainer;

   public StatefulBeanContext create()
   {
      StatefulBeanContext ctx = null;
      try
      {
         ctx = (StatefulBeanContext) pool.get();
         if (log.isTraceEnabled())
         {
            log.trace("Caching context " + ctx.getId() + " of type " + ctx.getClass());
         }
         putInCache(ctx);
         ctx.setInUse(true);
         ctx.lastUsed = System.currentTimeMillis();
         ++createCount;
         beans.put(ctx.getId(), ctx.lastUsed);
      }
      catch (EJBException e)
      {
         throw e;
      }
      catch (Exception e)
      {
         throw new EJBException(e);
      }
      return ctx;
   }

   public StatefulBeanContext create(Class[] initTypes, Object[] initValues)
   {
      StatefulBeanContext ctx = null;
      try
      {
         ctx = (StatefulBeanContext) pool.get(initTypes, initValues);
         if (log.isTraceEnabled())
         {
            log.trace("Caching context " + ctx.getId() + " of type " + ctx.getClass());
         }
         putInCache(ctx);
         ctx.setInUse(true);
         ctx.lastUsed = System.currentTimeMillis();
         ++createCount;
         beans.put(ctx.getId(), ctx.lastUsed);
      }
      catch (EJBException e)
      {
         throw e;
      }
      catch (Exception e)
      {
         throw new EJBException(e);
      }
      return ctx;
   }

   public StatefulBeanContext get(Object key) throws EJBException
   {
      return get(key, true);
   }
   
   public StatefulBeanContext get(Object key, boolean markInUse) throws EJBException
   {
      StatefulBeanContext entry = null;
      Fqn id = getFqn(key);
      Boolean active = localActivity.get();
      try
      {
         localActivity.set(Boolean.TRUE);
         Option opt = new Option();
         opt.setForceDataGravitation(true);
         entry = (StatefulBeanContext) cache.get(id, "bean", opt);
      }
      catch (CacheException e)
      {
         RuntimeException re = convertToRuntimeException(e);
         throw re;
      }
      finally
      {
         localActivity.set(active);
      }
      
      if (entry == null)
      {
         throw new NoSuchEJBException("Could not find stateful bean: " + key);
      }
      else if (markInUse && entry.isRemoved())
      {
         throw new NoSuchEJBException("Could not find stateful bean: " + key + 
                                      " (bean was marked as removed)");
      }
      
      entry.postReplicate();
      
      if (markInUse)
      {
         entry.setInUse(true);
         
         // Mark the Fqn telling the eviction thread not to passivate it yet.
         // Note the Fqn we use is relative to the region!
         evictRegionManager.markNodeCurrentlyInUse(id, MarkInUseWaitTime);
         entry.lastUsed = System.currentTimeMillis();
         beans.put(key, entry.lastUsed);
      }
      
      if(log.isTraceEnabled())
      {
         log.trace("get: retrieved bean with cache id " +id.toString());
      }
      
      return entry;
   }

   public void remove(Object key)
   {
      Fqn id = getFqn(key);
      try
      {
         if(log.isTraceEnabled())
         {
            log.trace("remove: cache id " +id.toString());
         }
         
         Option opt = new Option();
         opt.setForceDataGravitation(true);
         StatefulBeanContext ctx = (StatefulBeanContext) cache.get(id, "bean", opt); 
         
         if (ctx != null)
         {
            if (!ctx.isRemoved())
               pool.remove(ctx);
            
            if (ctx.getCanRemoveFromCache())
            {
               // Do a cluster-wide removal of the ctx
               cache.remove(id);
            }
            else 
            {
               // We can't remove the ctx as it contains live nested beans
               // But, we must replicate it so other nodes know the parent is removed!
               putInCache(ctx);
            }
            
            ++removeCount;
            beans.remove(key);
         }
      }
      catch (CacheException e)
      {
         RuntimeException re = convertToRuntimeException(e);
         throw re;
      }
   }

   public void finished(StatefulBeanContext ctx)
   {
      synchronized (ctx)
      {
         ctx.setInUse(false);
         ctx.lastUsed = System.currentTimeMillis();
         beans.put(ctx.getId(), ctx.lastUsed);
         // OK, it is free to passivate now.
         evictRegionManager.unmarkNodeCurrentlyInUse(getFqn(ctx.getId()));
      }
   }

   public void replicate(StatefulBeanContext ctx)
   {
      // StatefulReplicationInterceptor should only pass us the ultimate
      // parent context for a tree of nested beans, which should always be
      // a standard StatefulBeanContext
      if (ctx instanceof NestedStatefulBeanContext)
      {
         throw new IllegalArgumentException("Received unexpected replicate call for nested context " + ctx.getId());
      }
      
      try
      {
         putInCache(ctx);
      }
      catch (CacheException e)
      {
         RuntimeException re = convertToRuntimeException(e);
         throw re;
      }
   }

   public void initialize(Container container) throws Exception
   {
      this.ejbContainer = (EJBContainer) container;
      
      log = Logger.getLogger(getClass().getName() + "." + this.ejbContainer.getEjbName());
      
      this.pool = this.ejbContainer.getPool();
      ClassLoader cl = this.ejbContainer.getClassloader();
      this.classloader = new WeakReference<ClassLoader>(cl);
      
      CacheConfig config = (CacheConfig) this.ejbContainer.resolveAnnotation(CacheConfig.class);
      MBeanServer server = MBeanServerLocator.locateJBoss();
      ObjectName cacheON = new ObjectName(config.name());
      TreeCacheMBean mbean = (TreeCacheMBean) MBeanProxyExt.create(TreeCacheMBean.class, cacheON, server);
      cache = (TreeCache) mbean.getInstance();
      
      cacheNode = new Fqn(new Object[] { this.ejbContainer.getDeploymentQualifiedName() });

      // Try to create an eviction region per ejb
      evictRegionManager = cache.getEvictionRegionManager();
      Element element = getElementConfig(cacheNode.toString(), config.idleTimeoutSeconds(),
              config.maxSize());
      Region region = evictRegionManager.createRegion(cacheNode, element);
      
      cache.registerClassLoader(cacheNode.toString(), cl);
      cache.activateRegion(cacheNode.toString());
      
      log.debug("initialize(): create eviction region: " +region + " for ejb: " +this.ejbContainer.getEjbName());
   
      removalTimeout = config.removalTimeoutSeconds();
      if (removalTimeout > 0)
         removalTask = new RemovalTimeoutTask("SFSB Removal Thread - " + this.ejbContainer.getObjectName().getCanonicalName());
   }

   protected Element getElementConfig(String regionName, long timeToLiveSeconds, int maxNodes) throws Exception {
      String xml = "<region name=\"" +regionName +"\" policyClass=\"org.jboss.ejb3.cache.tree.AbortableLRUPolicy\">\n" +
               "<attribute name=\"maxNodes\">" +maxNodes +"</attribute>\n" +
               "<attribute name=\"timeToLiveSeconds\">"+ timeToLiveSeconds +"</attribute>\n" +
               "</region>";
      return XmlHelper.stringToElement(xml);
   }

   public void start()
   {
      // register to listen for cache events
      
      // TODO this approach may not be scalable when there are many beans 
      // since then we will need to go thru N listeners to figure out which 
      // one this event belongs to. Consider having a singleton listener
      listener = new ClusteredStatefulCacheListener();
      cache.addTreeCacheListener(listener);
      
      if (removalTask != null)
         removalTask.start();
      
      running = true;
   }

   public void stop()
   {
      running = false;
      
      // Remove the listener
      cache.removeTreeCacheListener(listener);

      try {
         // Remove locally. We do this to clean up the persistent store,
         // which is not affected by the inactivateRegion call below.
         Option opt = new Option();
      	 opt.setCacheModeLocal(true);
         cache.remove(cacheNode, opt);
      } 
      catch (CacheException e) 
      {
         log.error("Stop(): can't remove bean from the underlying distributed cache");
      }      
      
      try
      {
         cache.inactivateRegion(cacheNode.toString());
      }
      catch (Exception e)
      {
         log.error("Caught exception inactivating region " + cacheNode, e);
      }
      try
      {
         cache.unregisterClassLoader(cacheNode.toString());
      }
      catch (RegionNotFoundException e)
      {
         log.error("Caught exception unregistering classloader from  region " + cacheNode, e);
      }

      // Remove the eviction region
      RegionManager rm = cache.getEvictionRegionManager();
      rm.removeRegion(cacheNode);

      log.debug("stop(): StatefulTreeCache stopped successfully for " +cacheNode);
   }
   
   public int getCacheSize()
   {
      int count = 0;
	   try 
	   {
          Set children = null;
          for (int i = 0; i < hashBuckets.length; i++)
          {
             children = cache.getChildrenNames(new Fqn(cacheNode, hashBuckets[i]));
             count += (children == null ? 0 : children.size());
          }
		  count = count - passivatedCount;
	   } 
       catch (CacheException e)
	   {
		   log.error("Caught exception calculating cache size", e);
           count = -1;
	   }
	   return count;
   }
   
   public int getTotalSize()
   {
      return beans.size();
   }
   
   public int getCreateCount()
   {
	   return createCount;
   }
   
   public int getPassivatedCount()
   {
	   return passivatedCount;
   }
   
   public int getRemoveCount()
   {
      return removeCount;
   }
   
   public int getAvailableCount()
   {
      return -1;
   }
   
   public int getMaxSize()
   {
      return -1;
   }
   
   public int getCurrentSize()
   {
      return getCacheSize();
   }
   
   private void putInCache(StatefulBeanContext ctx) throws CacheException
   {
      Boolean active = localActivity.get();
      try
      {
         localActivity.set(Boolean.TRUE);
         ctx.preReplicate();
         cache.put(getFqn(ctx.getId()), "bean", ctx);
         ctx.markedForReplication = false;
      }
      finally
      {
         localActivity.set(active);
      }
      
   }
   
   private Fqn getFqn(Object id)
   {
      String beanId = id.toString();
      int index;
      if (id instanceof GUID)
      {
         index = (id.hashCode()& 0x7FFFFFFF) % hashBuckets.length;
      }
      else
      {
         index = (beanId.hashCode()& 0x7FFFFFFF) % hashBuckets.length;
      }
      
      return new Fqn(cacheNode, hashBuckets[index], beanId);
   }
   
   /** 
    * Creates a RuntimeException, but doesn't pass CacheException as the cause
    * as it is a type that likely doesn't exist on a client.
    * Instead creates a RuntimeException with the original exception's
    * stack trace.
    */   
   private RuntimeException convertToRuntimeException(CacheException e)
   {
      RuntimeException re = new RuntimeException(e.getClass().getName() + " " + e.getMessage());
      re.setStackTrace(e.getStackTrace());
      return re;
   }

   /**
    * A TreeCacheListener that allows us to get notifications of passivations and
    * activations and thus notify the cached StatefulBeanContext.
    */
   public class ClusteredStatefulCacheListener extends AbstractTreeCacheListener
   {
      @Override
      public void nodeActivate(Fqn fqn, boolean pre) 
      {
         // Ignore everything but locally originating "post" events 
         // for nodes in our region
         if(pre) return;
         if(fqn.size() != FQN_SIZE) return;
         if(!fqn.isChildOrEquals(cacheNode)) return;

         // Don't activate a bean just so we can replace the object 
         // with a replicated one
         if (Boolean.TRUE != localActivity.get()) 
         {
            // But we do want to record that the bean's now in memory
            --passivatedCount;
            return; 
         }
         
         StatefulBeanContext bean = null;
         try {
            // TODO Can this cause deadlock in the cache level? Should be ok but need review.
            bean = (StatefulBeanContext) cache.get(fqn, "bean");    
         } 
         catch (CacheException e) 
         {
            log.error("nodeActivate(): can't retrieve bean instance from: " +fqn + " with exception: " +e);
            return;
         }
         
         if(bean == null)
         {
            throw new IllegalStateException("nodeActivate(): null bean instance.");
         }
         
         --passivatedCount;

         if(log.isTraceEnabled())
         {
            log.trace("nodeActivate(): sending postActivate event to bean at fqn: " +fqn);
         }
         
         ClassLoader oldCl = Thread.currentThread().getContextClassLoader();
         try
         {  
            ClassLoader cl = classloader.get();
            if (cl != null)
            {
               Thread.currentThread().setContextClassLoader(cl);
            }
            
            bean.activateAfterReplication();
         }
         finally
         {
            Thread.currentThread().setContextClassLoader(oldCl);
         }
      }

      @Override
      public void nodePassivate(Fqn fqn, boolean pre) 
      {
         // Ignore everything but "pre" events for nodes in our region
         if(!pre) return;
         if(fqn.size() != FQN_SIZE) return;
         if(!fqn.isChildOrEquals(cacheNode)) return;

         StatefulBeanContext bean = null;
         ClassLoader oldCl = Thread.currentThread().getContextClassLoader();  
         Boolean active = localActivity.get();       
         try 
         {
            localActivity.set(Boolean.TRUE);
            // EJBTHREE-746 Use peek to bypass interceptors and thus avoid generating another 
            // eviction event (which will cause another attempt to passivate)
            // Caller thread (eviction) already has a lock on the node
            // With JBC 2.0 there is a BypassInterceptors Option we can use
            DataNode node = cache.peek(fqn);
            if (node != null)
            {
               bean = (StatefulBeanContext) node.get("bean");
               if (bean != null)
               {
                  ClassLoader cl = classloader.get();
                  if (cl != null)
                  {
                     Thread.currentThread().setContextClassLoader(cl);
                  }
               
                  if (!bean.getCanPassivate())
                  {
                     // Abort the eviction
                     throw new ContextInUseException("Cannot passivate bean " + fqn + 
                           " -- it or one if its children is currently in use");
                  }
                  
                  if(log.isTraceEnabled())
                  {
                     log.trace("nodePassivate(): send prePassivate event to bean at fqn: " +fqn);
                  }
                  
                  bean.passivateAfterReplication();
                  ++passivatedCount;
               }
            }
         }
         catch (NoSuchEJBException e)
         {
            // TODO is this still necessary? Don't think we
            // should have orphaned proxies any more
            if (bean instanceof ProxiedStatefulBeanContext)
            {
               // This is probably an orphaned proxy; double check and remove it
               try
               {
                  bean.getContainedIn();
                  // If that didn't fail, it's not an orphan
                  throw e;
               }
               catch (NoSuchEJBException n)
               {
                  log.debug("nodePassivate(): removing orphaned proxy at " + fqn);
                  try
                  {
                     cache.remove(fqn);
                  }
                  catch (CacheException c)
                  {
                     log.error("nodePassivate(): could not remove orphaned proxy at " + fqn, c);
                     // Just fall through and let the eviction try
                  }
               }
            }
            else
            {
               throw e;
            }
         }
         finally
         {
            Thread.currentThread().setContextClassLoader(oldCl);
            localActivity.set(active);
         }
      }
   }
   
   private class RemovalTimeoutTask extends Thread
   {
      public RemovalTimeoutTask(String name)
      {
         super(name);
      }

      public void run()
      {
         while (running)
         {
            try
            {
               Thread.sleep(removalTimeout * 1000);
            }
            catch (InterruptedException e)
            {
               running = false;
               return;
            }
            try
            {
               long now = System.currentTimeMillis();
               
               Iterator<Map.Entry<Object, Long>> it = beans.entrySet().iterator();
               while (it.hasNext())
               {
                  
                  Map.Entry<Object, Long> entry = it.next();
                  long lastUsed = entry.getValue();
                  if (now - lastUsed >= removalTimeout * 1000)
                  {
                     remove(entry.getKey());
                  }
               }
            }
            catch (Exception ex)
            {
               log.error("problem removing SFSB thread", ex);
            }
         }
      }
   }
}