File | Doc | Category | Size | Date | Package
HASessionStateImpl.java | API Doc | JBoss 4.2.1 | 22536 | Fri Jul 13 20:52:38 BST 2007 | org.jboss.ha.hasessionstate.server

HASessionStateImpl.java

/*
 * JBoss, Home of Professional Open Source.
 * Copyright 2006, Red Hat Middleware LLC, and individual contributors
 * as indicated by the @author tags. See the copyright.txt file in the
 * distribution for a full listing of individual contributors.
 *
 * This is free software; you can redistribute it and/or modify it
 * under the terms of the GNU Lesser General Public License as
 * published by the Free Software Foundation; either version 2.1 of
 * the License, or (at your option) any later version.
 *
 * This software is distributed in the hope that it will be useful,
 * but WITHOUT ANY WARRANTY; without even the implied warranty of
 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
 * Lesser General Public License for more details.
 *
 * You should have received a copy of the GNU Lesser General Public
 * License along with this software; if not, write to the Free
 * Software Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA
 * 02110-1301 USA, or see the FSF site: http://www.fsf.org.
 */
package org.jboss.ha.hasessionstate.server;

import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.io.Serializable;
import java.util.ArrayList;
import java.util.Enumeration;
import java.util.HashMap;
import java.util.Hashtable;
import java.util.Iterator;
import java.util.Vector;
import java.util.zip.Deflater;
import java.util.zip.DeflaterOutputStream;
import java.util.zip.InflaterInputStream;

import javax.naming.Context;
import javax.naming.InitialContext;
import javax.naming.Name;
import javax.naming.NameNotFoundException;
import javax.naming.Reference;
import javax.naming.StringRefAddr;

import org.jboss.ha.framework.interfaces.HAPartition;
import org.jboss.ha.hasessionstate.interfaces.PackagedSession;
import org.jboss.logging.Logger;
import org.jboss.naming.NonSerializableFactory;
import org.jboss.system.server.ServerConfigUtil;

import EDU.oswego.cs.dl.util.concurrent.Mutex;

/**
 *   Default implementation of HASessionState
 *
 *   @see org.jboss.ha.hasessionstate.interfaces.HASessionState
 *   @author sacha.labourey@cogito-info.ch
 *   @author <a href="mailto:bill@burkecentral.com">Bill Burke</a>
 *   @version $Revision: 57188 $
 *
 * <p><b>Revisions:</b><br>
 * <p><b>2002/01/09: billb</b>
 * <ol>
 *   <li>ripped out sub partitioning stuff.  It really belongs as a subclass of HAPartition
 * </ol>
 * 
 */

public class HASessionStateImpl
   implements org.jboss.ha.hasessionstate.interfaces.HASessionState,
	      HAPartition.HAPartitionStateTransfer
{
   
   // Name of this session-state service: used as the logger suffix in init()
   // and as the JNDI name under which start() binds this object.
   protected String _sessionStateName;
   // Created in init(); null until then.
   protected Logger log;
   // Partition used for cluster RPC and state-transfer registration.
   // Either passed to the constructor or looked up from JNDI in init().
   protected HAPartition hapGeneral;
   // "SessionState-'<name>'": the key under which this object is registered
   // as an RPC handler and which is passed to callMethodOnCluster.
   protected String sessionStateIdentifier;
   // Node name obtained from the partition in start(); null until then.
   protected String myNodeName;
   
   // Threshold used by purgeState() when deciding which sessions to drop.
   // The default assigned by the constructor is expressed in milliseconds.
   protected long beanCleaningDelay;
   // Partition name and the JNDI name derived from it
   // (JNDI_FOLDER_NAME_FOR_HAPARTITION + haPartitionName).
   protected String haPartitionName;
   protected String haPartitionJndiName;
   
   // Despite its name this holds the default partition *name*; the constructor
   // prefixes it with JNDI_FOLDER_NAME_FOR_HAPARTITION to build the JNDI name.
   protected final String DEFAULT_PARTITION_JNDI_NAME = ServerConfigUtil.getDefaultPartitionName();
   // Not referenced by the code of this class.
   protected final String JNDI_FOLDER_NAME_FOR_HASESSIONSTATE = org.jboss.metadata.ClusterConfigMetaData.JNDI_PREFIX_FOR_SESSION_STATE;
   protected final String JNDI_FOLDER_NAME_FOR_HAPARTITION = "/HAPartition/";
   // Fallback for beanCleaningDelay when the constructor receives a value <= 0.
   protected final long MAX_DELAY_BEFORE_CLEANING_UNRECLAIMED_STATE = 30L * 60L * 1000L; // 30 minutes... should be set externally or use cache settings
   // Key used when (un)subscribing to the partition's state-transfer events.
   protected static final String HA_SESSION_STATE_STATE_TRANSFER = "HASessionStateTransfer";
   
   // appName -> HashMap (session key -> Mutex). Every access in this class
   // happens inside synchronized (this.locks).
   protected HashMap locks = new HashMap ();
      
   /**
    * Creates an instance without assigning a session state name, a partition
    * or a cleaning delay; this constructor leaves all of those fields at
    * their declared defaults.
    */
   public HASessionStateImpl ()
   {
      // intentionally empty
   }
   
   /**
    * Creates a session state bound to an already available partition.
    * Delegates to the name-based constructor using the partition's own name
    * and then keeps the partition reference, so init() does not need to look
    * it up in JNDI.
    *
    * @param sessionStateName  name of this session state, may be null
    * @param partition         partition to use; must not be null, as its
    *                          name is read immediately
    * @param beanCleaningDelay cleaning delay; values <= 0 select the default
    */
   public HASessionStateImpl (String sessionStateName, HAPartition partition, long beanCleaningDelay)
   {
      this (sessionStateName, partition.getPartitionName (), beanCleaningDelay);
      hapGeneral = partition;
   }
   
   /**
    * Creates a session state identified by name only; the partition itself is
    * resolved later (see init()).
    *
    * @param sessionStateName    name of this session state; null selects
    *                            ClusterConfigMetaData.DEFAULT_SESSION_STATE_NAME
    * @param mainHAPartitionName partition name; null selects the default
    *                            partition name
    * @param beanCleaningDelay   cleaning delay; values <= 0 select
    *                            MAX_DELAY_BEFORE_CLEANING_UNRECLAIMED_STATE
    */
   public HASessionStateImpl (String sessionStateName, String mainHAPartitionName, long beanCleaningDelay)
   {
      // Session state name and the RPC identifier derived from it
      this._sessionStateName = (sessionStateName != null)
         ? sessionStateName
         : org.jboss.metadata.ClusterConfigMetaData.DEFAULT_SESSION_STATE_NAME;
      this.sessionStateIdentifier = "SessionState-'" + this._sessionStateName + "'";

      // Partition name and the JNDI location it is expected under
      this.haPartitionName = (mainHAPartitionName != null)
         ? mainHAPartitionName
         : DEFAULT_PARTITION_JNDI_NAME;
      this.haPartitionJndiName = JNDI_FOLDER_NAME_FOR_HAPARTITION + this.haPartitionName;

      // Non-positive delays fall back to the built-in default
      this.beanCleaningDelay = (beanCleaningDelay > 0)
         ? beanCleaningDelay
         : MAX_DELAY_BEFORE_CLEANING_UNRECLAIMED_STATE;
   }
   
   /**
    * Creates the logger, resolves the partition if none was supplied to the
    * constructor, and registers this object with the partition as RPC handler
    * and state-transfer subscriber.
    *
    * @throws IllegalStateException if no partition is available after the
    *         JNDI lookup
    * @throws Exception if the JNDI lookup or the registration fails
    */
   public void init () throws Exception
   {
      this.log = Logger.getLogger(HASessionStateImpl.class.getName() + "." + this._sessionStateName);

      // BES 20060416 -- if people used an old config, we may not
      // have been passed the partition, so have to find in JNDI
      // JNDI work s/b done in start(), but we have no choice, as
      // we must register for state transfer in init
      if (this.hapGeneral == null)
      {
         Context ctx = new InitialContext ();      
         this.hapGeneral = (HAPartition)ctx.lookup (haPartitionJndiName);
      }
      
      if (this.hapGeneral == null)
      {
         // Fail with a meaningful message instead of logging and then
         // running into a NullPointerException on the registration below.
         String message = "Unable to get default HAPartition under name '" + haPartitionJndiName + "'.";
         log.error (message);
         throw new IllegalStateException (message);
      }
      
      this.hapGeneral.registerRPCHandler (this.sessionStateIdentifier, this);
      this.hapGeneral.subscribeToStateTransferEvents (HA_SESSION_STATE_STATE_TRANSFER, this);
   }
   
   /**
    * Records the local node name as reported by the partition and binds this
    * object into JNDI under the session state name.
    *
    * @throws Exception if the JNDI context cannot be created or the bind fails
    */
   public void start () throws Exception
   {
      myNodeName = hapGeneral.getNodeName ();
      log.debug ("HASessionState node name : " + myNodeName );

      // BES 4/7/06 clean up lifecycle: binding happens here rather than in
      // init() because it can't be done until startService (JNDI dependency)
      Context namingContext = new InitialContext ();
      bind (_sessionStateName, this, HASessionStateImpl.class, namingContext);
   }
   
   /**
    * Binds a non-serializable object into JNDI through NonSerializableFactory.
    * The object is first registered with the factory under jndiName; then the
    * name is walked component by component, looking up each intermediate
    * context (creating it when it is missing), and finally a Reference that
    * points back to the factory is bound under the last component.
    *
    * @param jndiName  full name to bind under
    * @param who       object to make available
    * @param classType class whose name is recorded in the Reference
    * @param ctx       context the name is resolved against
    * @throws Exception on any naming failure
    */
   protected void bind (String jndiName, Object who, Class classType, Context ctx) throws Exception
   {
      // The service isn't serializable, so the helper class keeps the instance
      NonSerializableFactory.bind (jndiName, who);

      Name remaining = ctx.getNameParser ("").parse (jndiName);
      Context current = ctx;
      for (; remaining.size () > 1; remaining = remaining.getSuffix (1))
      {
         String component = remaining.get (0);
         try
         {
            current = (Context)current.lookup (component);
         }
         catch (NameNotFoundException e)
         {
            log.debug ("creating Subcontext" + component);
            current = current.createSubcontext (component);
         }
      }

      // NonSerializableFactory resolves references through the "nns" address type
      String factoryClassName = NonSerializableFactory.class.getName ();
      Reference factoryReference =
         new Reference (classType.getName (), new StringRefAddr ("nns", jndiName), factoryClassName, null);
      current.bind (remaining.get (0), factoryReference);
   }
   
   /**
    * Purges expired sessions and removes the JNDI binding created by start()
    * so that the service can be bound again after a restart.
    *
    * Unbinding is best effort: a failure is reported at debug level (when a
    * logger exists) and never propagated. The JNDI unbind and the
    * NonSerializableFactory unbind are attempted independently so that a
    * failure of the first does not leave the second registration behind.
    *
    * @throws Exception declared for the lifecycle contract; only failures of
    *         purgeState() can propagate
    */
   public void stop () throws Exception
   {      
      purgeState();
       
      // Unbind so we can rebind if restarted
      try
      {
         Context ctx = new InitialContext ();         
         ctx.unbind (this._sessionStateName);
      }
      catch (Exception e)
      {
         if (log != null)
            log.debug ("Unable to unbind '" + this._sessionStateName + "' from JNDI", e);
      }

      try
      {
         NonSerializableFactory.unbind (this._sessionStateName);
      }
      catch (Exception e)
      {
         if (log != null)
            log.debug ("Unable to unbind '" + this._sessionStateName + "' from NonSerializableFactory", e);
      }
   }
   
   /**
    * Undoes the registrations made in init(): removes this object as RPC
    * handler and as state-transfer subscriber of the partition.
    *
    * @throws Exception if the partition rejects either call
    */
   public void destroy() throws Exception
   {
      HAPartition partition = this.hapGeneral;

      // Drop the references the partition holds to this instance
      partition.unregisterRPCHandler (sessionStateIdentifier, this);
      partition.unsubscribeFromStateTransferEvents (HA_SESSION_STATE_STATE_TRANSFER, this);
   }
   
   /**
    * @return the node name recorded by start(), or null if start() has not
    *         run yet
    */
   public String getNodeName ()
   {
      return myNodeName;
   }
   
   // Used for Session state transfer
   //
   /**
    * Produces the state handed to the partition for state transfer: expired
    * sessions are purged first, then the whole application/session table is
    * serialized and compressed.
    *
    * @return the compressed state as a byte array, or null if building it
    *         failed (the failure is logged)
    */
   public Serializable getCurrentState ()
   {
      log.debug ("Building and returning state of HASessionState");

      if (appSessions == null)
      {
         appSessions = new Hashtable ();
      }

      synchronized (lockAppSession)
      {
         purgeState ();

         try
         {
            return deflate (appSessions);
         }
         catch (Exception e)
         {
            log.error("operation failed", e);
            return null;
         }
      }
   }
   
   /**
    * Replaces the local application/session table with a transferred state.
    *
    * The incoming state is decoded before the current table is touched, so a
    * decoding failure (logged as an error) leaves the existing state intact
    * rather than wiping it. A null state, for which inflate() returns null,
    * is treated as an empty table so that appSessions never becomes null.
    *
    * @param newState expected to be the byte array produced by
    *                 getCurrentState(); may be null
    */
   public void setCurrentState (Serializable newState)
   {
      log.debug ("Receiving state of HASessionState");
      
      Hashtable received;
      try
      {
         received = (Hashtable)inflate ((byte[])newState);
      }
      catch (Exception e)
      {
         log.error("operation failed", e);
         return;
      }

      if (received == null)
         received = new Hashtable ();
      
      synchronized (this.lockAppSession)
      {
         Hashtable previous = this.appSessions;
         this.appSessions = received;
         if (previous != null)
            previous.clear (); // hope to facilitate the job of the GC
      }
   }
   
   /**
    * Walks every application's session table and removes each session for
    * which (current time - ps.unmodifiedExistenceInVM()) exceeds
    * beanCleaningDelay. The whole pass runs while holding lockAppSession.
    */
   public void purgeState ()
   {
      synchronized (lockAppSession)
      {
         Enumeration appNames = appSessions.keys ();
         while (appNames.hasMoreElements ())
         {
            // one pass per application
            Hashtable sessionsOfApp = (Hashtable)appSessions.get (appNames.nextElement ());
            long now = System.currentTimeMillis ();

            Iterator candidates = sessionsOfApp.values ().iterator ();
            while (candidates.hasNext ())
            {
               PackagedSession candidate = (PackagedSession)candidates.next ();
               long elapsed = now - candidate.unmodifiedExistenceInVM ();
               if (elapsed > beanCleaningDelay)
               {
                  candidates.remove ();
               }
            }
         }
      }
   }
   
   /**
    * Serializes an object and compresses the result with the best
    * compression level.
    *
    * @param object the object to serialize
    * @return the compressed serialized form
    * @throws IOException if serialization or compression fails
    */
   protected byte[] deflate (Object object) throws IOException
   {
      ByteArrayOutputStream baos = new ByteArrayOutputStream ();
      Deflater def = new Deflater (Deflater.BEST_COMPRESSION);
      try
      {
         DeflaterOutputStream dos = new DeflaterOutputStream (baos, def);
         ObjectOutputStream out = new ObjectOutputStream (dos);
         out.writeObject (object);
         // Closing the object stream flushes it and closes the deflater
         // stream underneath, which finishes the compressed data.
         out.close ();
      }
      finally
      {
         // A Deflater handed to DeflaterOutputStream by the caller is not
         // released when the stream is closed; end it explicitly so its
         // native resources are freed on success and on failure alike.
         def.end ();
      }
      
      return baos.toByteArray ();
   }
   
   /**
    * Decompresses and deserializes content produced by deflate().
    *
    * NOTE(review): this performs native Java deserialization of whatever
    * bytes it is given; confirm that callers only pass data received from
    * trusted cluster members.
    *
    * @param compressedContent compressed serialized object, may be null
    * @return the deserialized object, or null if compressedContent is null
    * @throws IOException if decompression or deserialization fails; the
    *         original failure is attached as the cause
    */
   protected Object inflate (byte[] compressedContent) throws IOException
   {
      if (compressedContent==null)
         return null;
      
      InflaterInputStream inflaterStream =
         new InflaterInputStream (new ByteArrayInputStream (compressedContent));
      try
      {
         ObjectInputStream in = new ObjectInputStream (inflaterStream);
         return in.readObject ();
      }
      catch (Exception e)
      {
         // Keep the historical message but preserve the root cause too
         IOException wrapped = new IOException (e.toString ());
         wrapped.initCause (e);
         throw wrapped;
      }
      finally
      {
         // Always release the inflater, also when reading failed
         try
         {
            inflaterStream.close ();
         }
         catch (IOException ignored)
         {
            // nothing useful can be done about a failure to close an
            // in-memory stream
         }
      }
   }
   
   // appName -> Hashtable (session key -> PackagedSession). This is the
   // object serialized by getCurrentState() and replaced by setCurrentState().
   protected Hashtable appSessions = new Hashtable ();
   // Monitor held while the outer appSessions table is read, purged or replaced.
   protected Object lockAppSession = new Object ();
   
   /**
    * Returns the session table of an application, creating and registering
    * an empty one on first use.
    *
    * @param appName application name used as key in appSessions
    * @return the (never null) session table for that application
    */
   protected Hashtable getHashtableForApp (String appName)
   {
      if (appSessions == null)
      {
         appSessions = new Hashtable (); // should never happen though...
      }

      synchronized (lockAppSession)
      {
         Hashtable existing = (Hashtable)appSessions.get (appName);
         if (existing != null)
         {
            return existing;
         }

         Hashtable created = new Hashtable ();
         appSessions.put (appName, created);
         return created;
      }
   }
   
   /**
    * Creates a new, stateless session entry for the given key; the created
    * object itself is not exposed to the caller.
    *
    * @param appName application the session belongs to
    * @param keyId   session key
    */
   public void createSession (String appName, Object keyId)
   {
      _createSession (appName, keyId);
   }
   
   /**
    * Creates a session entry without state, owned by this node, and stores it
    * in the application's session table (replacing any entry already stored
    * under the same key).
    *
    * @param appName application the session belongs to
    * @param keyId   session key; is cast to Serializable
    * @return the newly created session
    */
   public PackagedSessionImpl _createSession (String appName, Object keyId)
   {
      Hashtable sessionsOfApp = getHashtableForApp (appName);

      Serializable serializableKey = (Serializable)keyId;
      PackagedSessionImpl created = new PackagedSessionImpl (serializableKey, null, myNodeName);

      sessionsOfApp.put (keyId, created);
      return created;
   }
   
   /**
    * Stores new state for a session, creating the session when it does not
    * exist yet, and replicates the session to the cluster through the
    * "_setState" RPC unless PackagedSession.setState() reports the state as
    * identical. Replication failures are logged, not propagated.
    *
    * The per-session lock is taken without waiting: if it is already held the
    * call fails with a RemoteException. If the thread is interrupted while
    * trying to take the lock, the interruption is logged, the thread's
    * interrupt status is restored and the method returns without storing
    * anything.
    *
    * @param appName application the session belongs to
    * @param keyId   session key
    * @param state   new serialized state
    * @throws java.rmi.RemoteException if the session is locked by another call
    */
   public void setState (String appName, Object keyId, byte[] state)
      throws java.rmi.RemoteException
   {
      Hashtable app = this.getHashtableForApp (appName);
      PackagedSession ps = (PackagedSession)app.get (keyId);
      
      if (ps == null)
      {
         ps = _createSession (appName, keyId);
      }
      
      Mutex mtx = getLock (appName, keyId);
      try
      { 
         if (!mtx.attempt (0))
            throw new java.rmi.RemoteException ("Concurent calls on session object.");
      } 
      catch (InterruptedException ie)
      {
         log.info (ie);
         // do not swallow the interruption: let code further up the stack see it
         Thread.currentThread ().interrupt ();
         return;
      }
      
      try
      {
         boolean isStateIdentical = ps.setState(state);
         if (!isStateIdentical)
         {
            Object[] args =
               {appName, ps};
            try
            {
               this.hapGeneral.callMethodOnCluster (this.sessionStateIdentifier,
                                                    "_setState",
                                                    args,
                                                    new Class[]{String.class, PackagedSession.class}, true);
            }
            catch (Exception e)
            {
               log.error("operation failed", e);
            }
         }
      }
      finally
      {
         mtx.release ();
      }
   }
   
   /*
   public void _setStates (String appName, Hashtable packagedSessions)
   {
      synchronized (this.lockAppSession)
      {
         Hashtable app = this.getHashtableForApp (appName);
         
         if (app == null)
         {
            app = new Hashtable (packagedSessions.size ());
            this.appSessions.put (appName, app);
         }
         app.putAll (packagedSessions);
      }
   }*/
   
   /**
    * Applies a session received from the cluster (invoked through the
    * "_setState" RPC issued by setState()).
    *
    * If no session is stored under the key, the received one is stored as is.
    * Otherwise the stored session is updated from the received one under the
    * per-session lock; when this node is the owner of the stored session the
    * listeners are notified first that it was modified externally.
    *
    * If the thread is interrupted while waiting for the lock, the
    * interruption is logged, the interrupt status is restored and the update
    * is skipped.
    *
    * @param appName application the session belongs to
    * @param session session as sent by the remote node
    */
   public void _setState (String appName, PackagedSession session)
   {
      Hashtable app = this.getHashtableForApp (appName);
      PackagedSession ps;
      
      // Look up and insert inside one synchronized block: Hashtable's own
      // methods lock on the table, so the check and the put cannot be
      // interleaved with another operation on the same table.
      synchronized (app)
      {
         ps = (PackagedSession)app.get (session.getKey ());
         if (ps == null)
         {
            app.put (session.getKey (), session);
            return;
         }
      }
      
      Mutex mtx = getLock (appName, session.getKey ());
      try
      {
         mtx.acquire ();
      }
      catch (InterruptedException ie)
      {
         log.info (ie);
         // do not swallow the interruption: let code further up the stack see it
         Thread.currentThread ().interrupt ();
         return;
      }
      
      try
      {
         if (ps.getOwner ().equals (this.myNodeName))
         {
            // a modification has occured externally while we were the owner
            //
            ownedObjectExternallyModified (appName, session.getKey (), ps, session);
         }
         ps.update (session);
      }
      finally
      {
         mtx.release ();
      }
   }
   
   /**
    * Looks up a session without changing its ownership.
    *
    * @param appName application the session belongs to
    * @param keyId   session key
    * @return the stored session, or null if none is stored under that key
    */
   public PackagedSession getState (String appName, Object keyId)
   {
      Hashtable sessionsOfApp = getHashtableForApp (appName);
      Object stored = sessionsOfApp.get (keyId);
      return (PackagedSession)stored;
   }
   
   /**
    * Returns a session after making this node its owner; simply delegates to
    * localTakeOwnership().
    *
    * @param appName application the session belongs to
    * @param keyId   session key
    * @return whatever localTakeOwnership() returns for the same arguments
    * @throws java.rmi.RemoteException as thrown by localTakeOwnership()
    */
   public PackagedSession getStateWithOwnership (String appName, Object keyId) throws java.rmi.RemoteException
   {
      PackagedSession owned = localTakeOwnership (appName, keyId);
      return owned;
   }
   
   /**
    * Makes this node the owner of a session.
    *
    * If the session is already owned by this node it is returned directly.
    * Otherwise "_setOwnership" is invoked on the cluster; if any answer is
    * Boolean.FALSE the attempt fails with a RemoteException, else the owner
    * is set to this node. A failure of the cluster call itself is only
    * logged and ownership is then still taken.
    *
    * The per-session lock is taken without waiting. If the thread is
    * interrupted while trying to take it, the interruption is logged, the
    * interrupt status is restored and null is returned.
    *
    * @param appName application the session belongs to
    * @param keyId   session key
    * @return the session, or null if it is unknown on this node or the
    *         thread was interrupted
    * @throws java.rmi.RemoteException if the session is locked by another
    *         call or another node refused the ownership change
    */
   public PackagedSession localTakeOwnership (String appName, Object keyId) throws java.rmi.RemoteException
   {
      Hashtable app = this.getHashtableForApp (appName);
      PackagedSession ps = (PackagedSession)app.get (keyId);
      
      // if the session is not yet available, we simply return null. The persistence manager
      // will have to take an action accordingly
      //
      if (ps == null)
         return null;
      
      Mutex mtx = getLock (appName, keyId);
      
      try
      { 
         if (!mtx.attempt (0))
            throw new java.rmi.RemoteException ("Concurent calls on session object.");
      } 
      catch (InterruptedException ie)
      {
         log.info (ie);
         // do not swallow the interruption: let code further up the stack see it
         Thread.currentThread ().interrupt ();
         return null;
      }
      
      try
      {
         if (ps.getOwner ().equals (this.myNodeName))
            return ps;

         Object[] args =
            {appName, keyId, this.myNodeName, new Long (ps.getVersion ())};
         ArrayList answers = null;
         try
         {
            answers = this.hapGeneral.callMethodOnCluster (this.sessionStateIdentifier,
                                                           "_setOwnership",
                                                           args,
                                                           new Class[]{String.class, Object.class,
                                                                       String.class, Long.class},
                                                           true);
         }
         catch (Exception e)
         {
            log.error("operation failed", e);
         }
         
         if (answers != null && answers.contains (Boolean.FALSE))
            throw new java.rmi.RemoteException ("Concurent calls on session object.");

         ps.setOwner (this.myNodeName);
         return ps;
      }
      finally
      {
         mtx.release ();
      }
   }
   
   /**
    * Answers a remote node's request to become owner of a session (invoked
    * through the "_setOwnership" RPC issued by localTakeOwnership()).
    *
    * @param appName       application the session belongs to
    * @param keyId         session key
    * @param newOwner      node asking for ownership
    * @param remoteVersion session version known to the asking node
    * @return Boolean.TRUE when this node has no objection: the session is
    *         unknown here, is not owned by this node, or is owned here with a
    *         version not newer than remoteVersion (ownership is then handed
    *         over and listeners are notified). Boolean.FALSE when the local
    *         version is newer, the per-session lock is currently held, or
    *         the thread was interrupted while trying to take it (the
    *         interrupt status is restored in that case).
    */
   public Boolean _setOwnership (String appName, Object keyId, String newOwner, Long remoteVersion)
   {
      Hashtable app = this.getHashtableForApp (appName);
      PackagedSession ps = (PackagedSession)app.get (keyId);

      // This node holds no copy of the session, so it cannot be its owner and
      // has nothing to object to. Returning here also avoids dereferencing a
      // null session and creating a lock entry for a session that is not stored.
      if (ps == null)
         return Boolean.TRUE;

      Boolean answer = Boolean.TRUE;
      Mutex mtx = getLock (appName, keyId);
      
      try
      { 
         if (!mtx.attempt (0))
            return Boolean.FALSE;
      } 
      catch (InterruptedException ie)
      {
         log.info (ie);
         // do not swallow the interruption: let code further up the stack see it
         Thread.currentThread ().interrupt ();
         return Boolean.FALSE;
      }

      try
      {
         if (!ps.getOwner ().equals (this.myNodeName))
         {
            // this is not our business... we don't care
            // we do not update the owner of ps as another host may refuse the _setOwnership call
            // anyway, the update will be sent to us later if state is modified
            //
            //ps.setOwner (newOwner);
            answer = Boolean.TRUE;
         }
         else if (ps.getVersion () > remoteVersion.longValue ())
         {
            // we are concerned and our version is more recent than the one of the remote host!
            // it means that we have concurrent calls on the same state that has not yet been updated
            // this means we will need to raise a java.rmi.RemoteException
            //
            answer = Boolean.FALSE;
         }
         else
         {
            // the remote host has the same version as us (or more recent? possible?)
            // we need to update the ownership. We can do this because we know that no other
            // node can refuse the _setOwnership call
            ps.setOwner (newOwner);
            ownedObjectExternallyModified (appName, keyId, ps, ps);
            answer = Boolean.TRUE;
         }
      }
      finally
      {
         mtx.release ();
      }
      return answer;
   }
   
   /**
    * Makes this node the owner of a session, discarding the session object
    * that localTakeOwnership() returns.
    *
    * @param appName application the session belongs to
    * @param keyId   session key
    * @throws java.rmi.RemoteException as thrown by localTakeOwnership()
    */
   public void takeOwnership (String appName, Object keyId) throws java.rmi.RemoteException
   {
      localTakeOwnership (appName, keyId);
   }
   
   /**
    * Removes a session locally and, if one was actually stored, drops its
    * lock and asks the cluster to remove it as well through the
    * "_removeSession" RPC. A failing cluster call is logged, not propagated.
    *
    * @param appName application the session belongs to
    * @param keyId   session key
    */
   public void removeSession (String appName, Object keyId)
   {
      Hashtable sessionsOfApp = getHashtableForApp (appName);
      if (sessionsOfApp == null)
      {
         return;
      }

      PackagedSession removed = (PackagedSession)sessionsOfApp.remove (keyId);
      if (removed == null)
      {
         // nothing was stored under that key: nothing to replicate
         return;
      }

      removeLock (appName, keyId);

      Object[] rpcArguments = { appName, keyId };
      Class[] rpcSignature = new Class[]{String.class, Object.class};
      try
      {
         hapGeneral.callMethodOnCluster (sessionStateIdentifier, "_removeSession", rpcArguments, rpcSignature, true);
      }
      catch (Exception e)
      {
         log.error("operation failed", e);
      }
   }
   
   /**
    * Removes a session on request of another node (invoked through the
    * "_removeSession" RPC issued by removeSession()). When the removed
    * session was owned by this node, listeners are notified. The lock entry
    * for the key is dropped in every case.
    *
    * @param appName application the session belongs to
    * @param keyId   session key
    */
   public void _removeSession (String appName, Object keyId)
   {
      PackagedSession removed = (PackagedSession)getHashtableForApp (appName).remove (keyId);

      boolean ownedHere = (removed != null) && removed.getOwner ().equals (myNodeName);
      if (ownedHere)
      {
         ownedObjectExternallyModified (appName, keyId, removed, removed);
      }

      removeLock (appName, keyId);
   }
   
   // appName -> Vector of HASessionStateListener, filled by subscribe() and
   // read by ownedObjectExternallyModified().
   protected Hashtable listeners = new Hashtable ();
   
   /**
    * Registers a listener for an application. A listener that is already
    * registered for that application is not added a second time.
    *
    * @param appName  application to listen to
    * @param listener listener to register
    */
   public synchronized void subscribe (String appName, HASessionStateListener listener)
   {
      Vector subscribers = (Vector)listeners.get (appName);
      if (subscribers == null)
      {
         // first listener for this application
         subscribers = new Vector ();
         listeners.put (appName, subscribers);
      }

      if (subscribers.contains (listener))
      {
         return;
      }
      subscribers.add (listener);
   }
   
   /**
    * Removes a listener previously registered for an application; does
    * nothing when the application or the listener is unknown.
    *
    * @param appName  application the listener was registered for
    * @param listener listener to remove
    */
   public synchronized void unsubscribe (String appName, HASessionStateListener listener)
   {
      Vector subscribers = (Vector)listeners.get (appName);
      if (subscribers == null)
      {
         return;
      }

      // Vector.remove(Object) is a no-op when the element is absent
      subscribers.remove (listener);
   }
   
   /**
    * Tells every listener registered for the application that a session was
    * modified externally, passing newSession to each of them. Anything thrown
    * while notifying one listener is logged at debug level and does not stop
    * the remaining notifications.
    *
    * @param appName    application whose listeners are notified
    * @param key        session key (not used by this implementation)
    * @param oldSession previous session (not used by this implementation)
    * @param newSession session handed to the listeners
    */
   public void ownedObjectExternallyModified (String appName, Object key, PackagedSession oldSession, PackagedSession newSession)
   {
      Vector subscribers = (Vector)listeners.get (appName);
      if (subscribers == null)
      {
         return;
      }

      for (int idx = 0; idx < subscribers.size (); idx++)
      {
         try
         {
            HASessionStateListener target = (HASessionStateListener)subscribers.elementAt (idx);
            target.sessionExternallyModified (newSession);
         }
         catch (Throwable t)
         {
            log.debug (t);
         }
      }
   }
   
   /**
    * @return the partition this session state works with; null if none was
    *         passed to the constructor and init() has not resolved one yet
    */
   public HAPartition getCurrentHAPartition ()
   {
      return hapGeneral;
   }
   
   
   /**
    * Tells whether a lock object has already been created for a session.
    *
    * @param appName application the session belongs to
    * @param key     session key
    * @return true if a lock is registered for that application and key
    */
   protected boolean lockExists (String appName, Object key)
   {
      synchronized (locks)
      {
         HashMap locksOfApp = (HashMap)locks.get (appName);
         return locksOfApp != null && locksOfApp.get (key) != null;
      }
   }

   /**
    * Returns the lock object of a session, creating the per-application map
    * and the lock itself on first use.
    *
    * @param appName application the session belongs to
    * @param key     session key
    * @return the (never null) Mutex registered for that application and key
    */
   protected Mutex getLock (String appName, Object key)
   {
      synchronized (locks)
      {
         HashMap locksOfApp = (HashMap)locks.get (appName);
         if (locksOfApp == null)
         {
            locksOfApp = new HashMap ();
            locks.put (appName, locksOfApp);
         }

         Mutex existing = (Mutex)locksOfApp.get (key);
         if (existing != null)
         {
            return existing;
         }

         Mutex created = new Mutex ();
         locksOfApp.put (key, created);
         return created;
      }
   }

   /**
    * Forgets the lock object of a session; does nothing when no lock map
    * exists for the application. The per-application map itself is kept.
    *
    * @param appName application the session belongs to
    * @param key     session key
    */
   protected void removeLock (String appName, Object key)
   {
      synchronized (locks)
      {
         HashMap locksOfApp = (HashMap)locks.get (appName);
         if (locksOfApp != null)
         {
            locksOfApp.remove (key);
         }
      }
   }
   
}