FileDocCategorySizeDatePackage
DistributedStateImpl.javaAPI DocJBoss 4.2.115621Fri Jul 13 20:52:38 BST 2007org.jboss.ha.framework.server

DistributedStateImpl.java

/*
 * JBoss, Home of Professional Open Source.
 * Copyright 2006, Red Hat Middleware LLC, and individual contributors
 * as indicated by the @author tags. See the copyright.txt file in the
 * distribution for a full listing of individual contributors.
 *
 * This is free software; you can redistribute it and/or modify it
 * under the terms of the GNU Lesser General Public License as
 * published by the Free Software Foundation; either version 2.1 of
 * the License, or (at your option) any later version.
 *
 * This software is distributed in the hope that it will be useful,
 * but WITHOUT ANY WARRANTY; without even the implied warranty of
 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
 * Lesser General Public License for more details.
 *
 * You should have received a copy of the GNU Lesser General Public
 * License along with this software; if not, write to the Free
 * Software Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA
 * 02110-1301 USA, or see the FSF site: http://www.fsf.org.
 */
package org.jboss.ha.framework.server;


import java.io.Serializable;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.Iterator;
import java.util.Map;
import javax.management.MBeanServer;
import javax.management.ObjectName;

import org.jboss.ha.framework.interfaces.HAPartition;
import org.jboss.ha.framework.interfaces.HAPartition.HAPartitionStateTransfer;
import org.jboss.logging.Logger;
import org.jboss.system.Registry;

/**
 *   This class manages distributed state across the cluster.
 *
 * @author  <a href="mailto:sacha.labourey@cogito-info.ch">Sacha Labourey</a>.
 * @author  <a href="mailto:bill@burkecentral.com">Bill Burke</a>.
 * @author  Scott.Stark@jboss.org
 * @version $Revision: 57188 $
 */
public class DistributedStateImpl
   implements DistributedStateImplMBean, HAPartitionStateTransfer
{
   // Constants -----------------------------------------------------

   protected final static String SERVICE_NAME = "DistributedState";

   protected final static Class[] set_types=new Class[]{String.class, Serializable.class, Serializable.class};
   protected final static Class[] remove_types=new Class[]{String.class, Serializable.class};

   // Attributes ----------------------------------------------------

   /**
    * HashMap<String, HashMap>. Keys= category, value = HashMap<Object, Object>
    */
   protected HashMap categories = new HashMap ();


   protected HashMap keyListeners = new HashMap ();
   protected HAPartition partition;
   protected Logger log;
   protected MBeanServer mbeanServer = null;
   protected String name = null;

   // Static --------------------------------------------------------c

   // Constructors --------------------------------------------------

   public DistributedStateImpl () {} // for JMX checks

   public DistributedStateImpl (HAPartition partition, MBeanServer server)
   {
      this.partition = partition;
      this.mbeanServer = server;
      this.log = Logger.getLogger (this.getClass ());
   }

   // Public --------------------------------------------------------

   public void init () throws Exception
   {
      // When we subscribe to state transfer events, GetState will be called to initialize
      // this service.
      partition.subscribeToStateTransferEvents (SERVICE_NAME, this);
      partition.registerRPCHandler (SERVICE_NAME, this);

      // subscribed this "sub-service" of HAPartition with JMX
      // TODO: In the future (when state transfer issues will be completed),
      // we will need to redesign the way HAPartitions and its sub-protocols are
      // registered with JMX. They will most probably be independant JMX services.
      //
      this.name = "jboss:service=" + SERVICE_NAME +
                    ",partitionName=" + this.partition.getPartitionName();
      ObjectName jmxName = new ObjectName(this.name);
      mbeanServer.registerMBean(this, jmxName);
      Registry.bind (this.name, this);
   }

   public void start () throws Exception
   {
   }

   public void stop () throws Exception
   {
       // NR 200505:[JBCLUSTER-38] will be done at destroy instead
//      Registry.unbind (this.name);
//      ObjectName jmxName = new ObjectName(this.name);
//      mbeanServer.unregisterMBean (jmxName);
   }

    // NR 200505 : [JBCLUSTER-38] unbind DS MBean
   public void destroy () throws Exception
   {
      Registry.unbind (this.name);
      ObjectName jmxName = new ObjectName(this.name);
      mbeanServer.unregisterMBean (jmxName);
      
      partition.unsubscribeFromStateTransferEvents (SERVICE_NAME, this);
      partition.unregisterRPCHandler(SERVICE_NAME, this);
   }

   public String listContent () throws Exception
   {
      StringBuffer result = new StringBuffer ();
      Collection cats = this.getAllCategories ();
      Iterator catsIter = cats.iterator ();
      while (catsIter.hasNext ())
      {
         String category = (String)catsIter.next ();
         Iterator keysIter = this.getAllKeys(category).iterator ();

         result.append ("-----------------------------------------------\n");
         result.append ("Category : ").append (category).append ("\n\n");
         result.append ("KEY\t:\tVALUE\n");

         while (keysIter.hasNext ())
         {
            Serializable key = (Serializable) keysIter.next ();
            String value = this.get (category, key).toString ();
            result.append ("'").append(key);
            result.append ("'\t:\t'");
            result.append (value);
            result.append("'\n");
         }
         result.append ("\n");
      }
      return result.toString ();
   }

   public String listXmlContent () throws Exception
   {
      StringBuffer result = new StringBuffer ();
      Collection cats = this.getAllCategories ();
      Iterator catsIter = cats.iterator ();

      result.append ("<DistributedState>\n");

      while (catsIter.hasNext ())
      {
         String category = (String)catsIter.next ();
         Iterator keysIter = this.getAllKeys(category).iterator ();

         result.append ("\t<Category>\n");
         result.append ("\t\t<CategoryName>").append (category).append ("</CategoryName>\n");

         while (keysIter.hasNext ())
         {
            Serializable key = (Serializable) keysIter.next ();
            String value = this.get (category, key).toString ();
            result.append ("\t\t<Entry>\n");
            result.append ("\t\t\t<Key>").append (key).append ("</Key>\n");
            result.append ("\t\t\t<Value>").append (value).append ("</Value>\n");
            result.append ("\t\t</Entry>\n");
         }
         result.append ("\t</Category>\n");
      }
      result.append ("</DistributedState>\n");

      return result.toString ();
   }

   // DistributedState implementation ----------------------------------------------

   public void set (String category, Serializable key, Serializable value)
      throws Exception
   {
      set (category, key, value, true);
   }
   public void set (String category, Serializable key, Serializable value,
      boolean asynchronousCall) throws Exception
   {
      Object[] args = {category, key, value};
      if (asynchronousCall)
         partition.callAsynchMethodOnCluster (SERVICE_NAME, "_set", args, set_types, true);
      else
         partition.callMethodOnCluster (SERVICE_NAME, "_set", args, set_types, true);
      this._setInternal (category, key, value);
      notifyKeyListeners (category, key, value, true);
   }

   public Serializable remove (String category, Serializable key)
      throws Exception
   {
      return remove (category, key, true);
   }

   public Serializable remove (String category, Serializable key,
      boolean asynchronousCall) throws Exception
   {
      Object[] args = {category, key};
      if (asynchronousCall)
         partition.callAsynchMethodOnCluster (SERVICE_NAME, "_remove", args, remove_types, true);
      else
         partition.callMethodOnCluster (SERVICE_NAME, "_remove", args, remove_types, true);
      Serializable removed = this._removeInternal (category, key);
      notifyKeyListenersOfRemove (category, key, removed , true);
      return removed ;
   }

   public Serializable get (String category, Serializable key)
   {
      synchronized(this.categories)
      {
         HashMap cat = (HashMap)categories.get (category);
         if (cat == null) return null;

         return (Serializable)cat.get (key);
      }
   }

   public Collection getAllCategories ()
   {
      synchronized(this.categories)
      {
         return Collections.unmodifiableCollection(categories.keySet ());
      }
   }

   public Collection getAllKeys (String category)
   {
      synchronized(this.categories)
      {
         HashMap cat = (HashMap)categories.get (category);
         if (cat == null) return null;

         return Collections.unmodifiableCollection(cat.keySet ());
      }
   }

   public Collection getAllValues (String category)
   {
      synchronized(this.categories)
      {
         HashMap cat = (HashMap)categories.get (category);
         if (cat == null) return null;

         return Collections.unmodifiableCollection(cat.values ());
      }
   }


   public void registerDSListenerEx (String category, DSListenerEx subscriber)
   {
      registerListener(category, subscriber);
   }
   public void unregisterDSListenerEx (String category, DSListenerEx subscriber)
   {
      unregisterListener(category, subscriber);
   }
   public void registerDSListener (String category, DSListener subscriber)
   {
      registerListener(category, subscriber);
   }
   public void unregisterDSListener (String category, DSListener subscriber)
   {
      unregisterListener(category, subscriber);
   }

   // HAPartition RPC method invocations implementation ----------------------------------------------

   public void _set (String category, String key, Serializable value)
      throws Exception
   {
      this._setInternal (category, key, value);
      notifyKeyListeners (category, key, value, false);
   }
   public void _set (String category, Serializable key, Serializable value)
      throws Exception
   {
      this._setInternal (category, key, value);
      notifyKeyListeners (category, key, value, false);
   }

   public void _setInternal (String category, Serializable key, Serializable value)
      throws Exception
   {
      synchronized(this.categories)
      {
         HashMap cat = (HashMap)categories.get (category);
         if (cat == null)
         {
            cat = new HashMap ();
            categories.put (category, cat);
         }
         cat.put (key, value);
      }
   }

   public void _remove (String category, String key) throws Exception
   {
      Serializable removed = this._removeInternal (category, key);
      notifyKeyListenersOfRemove (category, key, removed, false);
   }
   public void _remove (String category, Serializable key) throws Exception
   {
      Serializable removed = this._removeInternal (category, key);
      notifyKeyListenersOfRemove (category, key, removed, false);
   }

   public Serializable _removeInternal (String category, Serializable key)
      throws Exception
   {
      synchronized(this.categories)
      {
         HashMap cat = (HashMap)categories.get (category);
         if (cat == null) return null;
         Object removed = cat.remove (key);
         if (removed != null)
         {
            if (cat.size () == 0)
            {
               categories.remove (category);
            }
         }
         return (Serializable)removed;
      }
   }

   // HAPartitionStateTransfer implementation ----------------------------------------------

   public Serializable getCurrentState ()
   {
      HashMap retval=new HashMap();
      Map.Entry entry;
      String catName;
      HashMap value, newVal;

      synchronized(this.categories)
      {

         for(Iterator it=this.categories.entrySet().iterator(); it.hasNext();)
         {
            entry=(Map.Entry)it.next(); // key: category name, value: HashMap
            catName=(String)entry.getKey();
            value=(HashMap)entry.getValue();
            newVal=value != null? (HashMap)value.clone() : null;
            retval.put(catName, newVal);
         }

         return retval;
      }
   }

   public void setCurrentState (Serializable newState)
   {
      synchronized (this.categories)
      {
         categories.clear ();
         categories.putAll ((HashMap)newState);
         if (keyListeners.size () > 0)
         {
            cleanupKeyListeners ();
         }
      }
   }

   // Package protected ---------------------------------------------

   // Protected -----------------------------------------------------

   protected void registerListener (String category, Object subscriber)
   {
      synchronized(this.keyListeners)
      {
         ArrayList listeners = (ArrayList)keyListeners.get (category);
         if (listeners == null)
         {
            listeners = new ArrayList ();
            keyListeners.put (category, listeners);
         }
         listeners.add (subscriber);
      }
   }

   protected void unregisterListener (String category, Object subscriber)
   {
      synchronized(this.keyListeners)
      {
         ArrayList listeners = (ArrayList)keyListeners.get (category);
         if (listeners == null) return;

         listeners.remove (subscriber);
         if (listeners.size () == 0)
         {
            keyListeners.remove (category);
         }
      }
   }

   protected void notifyKeyListeners (String category, Serializable key,
      Serializable value, boolean locallyModified)
   {
      synchronized(this.keyListeners)
      {
         ArrayList listeners = (ArrayList)keyListeners.get (category);
         if (listeners == null)
            return;
         String strKey = key.toString();

         for (int i = 0; i < listeners.size (); i++)
         {
            Object listener = listeners.get (i);
            if( listener instanceof DSListener )
            {
               DSListener dslistener = (DSListener) listener;
               dslistener.valueHasChanged (category, strKey, value, locallyModified);
            }
            else
            {
               DSListenerEx dslistener = (DSListenerEx) listener;
               dslistener.valueHasChanged (category, key, value, locallyModified);
            }
         }
      }
   }

   protected void notifyKeyListenersOfRemove (String category, Serializable key,
      Serializable oldContent, boolean locallyModified)
   {
      synchronized(this.keyListeners)
      {
         ArrayList listeners = (ArrayList)keyListeners.get (category);
         if (listeners == null)
            return;
         String strKey = key.toString();

         for (int i = 0; i < listeners.size (); i++)
         {
            Object listener = listeners.get (i);
            if( listener instanceof DSListener )
            {
               DSListener dslistener = (DSListener) listener;
               dslistener.keyHasBeenRemoved (category, strKey, oldContent, locallyModified);
            }
            else
            {
               DSListenerEx dslistener = (DSListenerEx) listener;
               dslistener.keyHasBeenRemoved (category, key, oldContent, locallyModified);
            }
         }
      }
   }

   protected void cleanupKeyListeners ()
   {
      // NOT IMPLEMENTED YET
   }

   // Private -------------------------------------------------------

   // Inner classes -------------------------------------------------

}