FileDocCategorySizeDatePackage
DistributedReplicantManagerImpl.javaAPI DocJBoss 4.2.135844Fri Jul 13 20:52:38 BST 2007org.jboss.ha.framework.server

DistributedReplicantManagerImpl

public class DistributedReplicantManagerImpl extends Object implements AsynchEventHandler.AsynchEventProcessor, DistributedReplicantManagerImplMBean, HAPartition.HAMembershipExtendedListener, HAPartition.HAPartitionStateTransfer
This class manages replicated objects.
author
Bill Burke.
author
Sacha Labourey.
author
Scott.stark@jboss.org
version
$Revision: 61770 $

Fields Summary
protected static final String
SERVICE_NAME
protected static int
threadID
protected EDU.oswego.cs.dl.util.concurrent.ConcurrentReaderHashMap
localReplicants
protected EDU.oswego.cs.dl.util.concurrent.ConcurrentReaderHashMap
replicants
protected EDU.oswego.cs.dl.util.concurrent.ConcurrentReaderHashMap
keyListeners
protected HashMap
intraviewIdCache
protected org.jboss.ha.framework.interfaces.HAPartition
partition
protected AsynchEventHandler
asynchHandler
The handler used to send replicant change notifications asynchronously
protected Logger
log
protected MBeanServer
mbeanserver
protected ObjectName
jmxName
protected String
nodeName
protected EDU.oswego.cs.dl.util.concurrent.Latch
partitionNameKnown
protected boolean
trace
protected Class[]
add_types
protected Class[]
remove_types
Constructors Summary
public DistributedReplicantManagerImpl(org.jboss.ha.framework.interfaces.HAPartition partition, MBeanServer server)
This class manages replicated objects through the given partition

param
partition {@link HAPartition} through which replicated objects will be exchanged


   // Static --------------------------------------------------------
   
   // Constructors --------------------------------------------------       
   
                              
       
   
      this.partition = partition;
      this.mbeanserver = server;
      this.log = Logger.getLogger(DistributedReplicantManagerImpl.class.getName() + "." + partition.getPartitionName());
      this.trace = log.isTraceEnabled();
   
Methods Summary
public void_add(java.lang.String key, java.lang.String nodeName, java.io.Serializable replicant)
Cluster callback called when a new replicant is added on another node

param
key Replicant key
param
nodeName Node that adds the current replicant
param
replicant Serialized representation of the replicant

      // Trace the callback; the message is closed with ")" so the logged call reads as balanced.
      if( trace )
         log.trace("_add(" + key + ", " + nodeName + ")");
      
      try
      {
         // Record the replicant published by the given node under the key.
         addReplicant(key, nodeName, replicant);
         // Notify listeners asynchronously
         KeyChangeEvent kce = new KeyChangeEvent();
         kce.key = key;
         kce.replicants = lookupReplicants(key);
         asynchHandler.queueEvent(kce);
      }
      catch (Exception ex)
      {
         // Failures are logged here rather than propagated to the caller.
         log.error("_add failed", ex);
      }
   
public void_remove(java.lang.String key, java.lang.String nodeName)
Cluster callback called when a replicant is removed by another node

param
key Name of the replicant key
param
nodeName Node that wants to remove its replicant for the given key

      try
      {
         if (removeReplicant (key, nodeName)) {
            // Notify listeners asynchronously
            KeyChangeEvent kce = new KeyChangeEvent();
            kce.key = key;
            kce.replicants = lookupReplicants(key);
            asynchHandler.queueEvent(kce);
         }
      }
      catch (Exception ex)
      {
         log.error("_remove failed", ex);
      }
   
public voidadd(java.lang.String key, java.io.Serializable replicant)

      if( trace )
         log.trace("add, key="+key+", value="+replicant);
      // Wait on the partitionNameKnown latch, which start() releases once nodeName is set.
      partitionNameKnown.acquire (); // we don't propagate until our name is known
      
      // Invoke "_add" on the cluster first, identifying ourselves by node name.
      Object[] args = {key, this.nodeName, replicant};
      partition.callMethodOnCluster(SERVICE_NAME, "_add", args, add_types, true);
      // Then store the replicant locally and notify the key listeners,
      // both while holding the localReplicants monitor.
      synchronized(localReplicants)
      {
         localReplicants.put(key, replicant);
         notifyKeyListeners(key, lookupReplicants(key));
      }
   
protected voidaddReplicant(java.lang.String key, java.lang.String nodeName, java.io.Serializable replicant)
Add a replicant to the replicants map.

param
key replicant key name
param
nodeName name of the node that adds this replicant
param
replicant Serialized representation of the replica

      // Delegate to the map-based overload, targeting the replicants map.
      addReplicant(replicants, key, nodeName, replicant);
   
protected voidaddReplicant(java.util.Map map, java.lang.String key, java.lang.String nodeName, java.io.Serializable replicant)
Logic for adding replicant to any map.

param
map structure to which the new replicant is added
param
key name of the replicant key
param
nodeName name of the node adding the replicant
param
replicant serialized representation of the replicant that is added

      synchronized(map)
      {
         HashMap rep = (HashMap)map.get(key);
         if (rep == null)
         {
            if( trace )
               log.trace("_adding new HashMap");
            rep = new HashMap();
            map.put(key, rep);
         }
         rep.put(nodeName, replicant);         
      }
   
protected intcalculateReplicantsHash(java.util.List members)

      // Sum the hash codes of all non-null members; int addition simply wraps on overflow.
      int hash = 0;
      int index = 0;

      while (index < members.size ())
      {
         Object member = members.get (index++);
         if (member == null)
            continue;
         hash += member.hashCode ();
      }

      return hash;
   
protected voidcleanupKeyListeners()

      // NOT IMPLEMENTED YET
      // The body is intentionally empty: calling this method currently does nothing.
   
public voiddestroy()

  
      // now partition can't be resuscitated, so remove local replicants
      if (localReplicants != null)
      {
         synchronized(localReplicants)
         {
            // Snapshot the keys into an array first: removeLocal() removes
            // entries from localReplicants while we walk the keys.
            String[] keys = new String[localReplicants.size()];
            localReplicants.keySet().toArray(keys);
            for(int n = 0; n < keys.length; n ++)
            {               
               this.removeLocal(keys[n]); // channel is disconnected, so
                                          // don't try to notify cluster
            }
         }
      }

      // Remove this service from the MBean server (it was registered in init()).
      this.mbeanserver.unregisterMBean (this.jmxName);
      
      // Undo the three partition registrations made in init().
      partition.unregisterRPCHandler(SERVICE_NAME, this);
      partition.unsubscribeFromStateTransferEvents(SERVICE_NAME, this);
      partition.unregisterMembershipListener(this);
   
public java.util.CollectiongetAllServices()

      // Union of the keys registered locally and the keys known from other nodes;
      // the set removes keys that appear in both maps.
      HashSet services = new HashSet();
      services.addAll (localReplicants.keySet ());
      services.addAll (replicants.keySet ());      
      return services;
   
public java.io.SerializablegetCurrentState()

      // Result layout: { Map(key -> Map(nodeName -> replicant)), intraviewIdCache }.
      HashMap snapshot = new HashMap ();

      for (java.util.Iterator it = this.getAllServices ().iterator (); it.hasNext (); )
      {
         String serviceKey = (String)it.next ();

         // Work on a copy of the remote entries so that adding our own
         // replicant below does not modify the live replicants map.
         HashMap remote = (HashMap)this.replicants.get (serviceKey);
         HashMap perNode = (remote != null) ? (HashMap)remote.clone () : new HashMap ();

         // Our own replicant, if any, is stored under our node name.
         Serializable mine = lookupLocalReplicant(serviceKey);
         if (mine != null)
            perNode.put (this.nodeName, mine);

         snapshot.put (serviceKey, perNode);
      }

      // The intraview id cache travels together with the replicant snapshot.
      return new Object[] {snapshot, intraviewIdCache};
   
protected java.util.VectorgetKeysReplicatedByNode(java.lang.String nodeName)

      // Collect every key whose per-node table has an entry for the given node.
      Vector owned = new Vector ();
      synchronized (replicants)
      {
         for (Iterator it = replicants.keySet ().iterator (); it.hasNext (); )
         {
            String candidate = (String)it.next ();
            HashMap perNode = (HashMap)replicants.get (candidate);

            // Skip keys that have no entry for the requested node.
            if (perNode == null || !perNode.containsKey (nodeName))
               continue;

            owned.add (candidate);
         }
      }
      return owned;
   
public intgetReplicantsViewId(java.lang.String key)

      // A key without a cached intra-view id is reported as 0.
      Integer cached = (Integer)this.intraviewIdCache.get (key);
      return (cached != null) ? cached.intValue () : 0;
   
public voidinit()

      // Hook this service into the partition: RPC handler, state transfer
      // events and membership events. destroy() undoes these registrations.
      log.debug("registerRPCHandler");
      partition.registerRPCHandler(SERVICE_NAME, this);
      log.debug("subscribeToStateTransferEvents");
      partition.subscribeToStateTransferEvents(SERVICE_NAME, this);
      log.debug("registerMembershipListener");
      partition.registerMembershipListener(this);

     // Register this "sub-service" of HAPartition with JMX
      // TODO: In the future (when state transfer issues will be completed), 
      // we will need to redesign the way HAPartitions and its sub-protocols are
      // registered with JMX. They will most probably be independent JMX services.
      //
      // The object name has the form jboss:service=<SERVICE_NAME>,partitionName=<partition name>.
      String name = "jboss:service=" + SERVICE_NAME + 
                    ",partitionName=" + this.partition.getPartitionName();
      this.jmxName = new javax.management.ObjectName(name);
      this.mbeanserver.registerMBean(this, jmxName);
    
public booleanisMasterReplica(java.lang.String key)

      if( trace )
         log.trace("isMasterReplica, key="+key);
      // if I am not a replica, I cannot be the master...
      //
      if (!localReplicants.containsKey (key))
      {
         if( trace )
            log.trace("no localReplicants, key="+key+", isMasterReplica=false");
         return false;
      }

      Vector allNodes = this.partition.getCurrentView ();
      HashMap repForKey = (HashMap)replicants.get(key);
      // No other node is recorded for this key, so the local replicant is the master.
      if (repForKey==null)
      {
         if( trace )
            log.trace("no replicants, key="+key+", isMasterReplica=true");
         return true;
      }
      // Names of the other nodes that hold a replicant for this key.
      Vector replicaNodes = new Vector ((repForKey).keySet ());          
      boolean isMasterReplica = false;
      // Walk the current view in order. The first member that either holds a
      // remote replicant (we are not the master) or is this node (we are the
      // master) decides the result.
      for (int i=0; i<allNodes.size (); i++)
      {
         String aMember = (String)allNodes.elementAt (i);
         if( trace )
            log.trace("Testing member: "+aMember);
         if (replicaNodes.contains (aMember))
         {
            if( trace )
               log.trace("Member found in replicaNodes, isMasterReplica=false");
            break;
         }
         else if (aMember.equals (this.nodeName))
         {
            if( trace )
               log.trace("Member == nodeName, isMasterReplica=true");
            isMasterReplica = true;
            break;
         }
      }
      // Remains false when the view contains neither a replica node nor this node.
      return isMasterReplica;
   
public java.lang.StringlistContent()

      // Builds a human-readable listing, wrapped in <pre> tags, of every
      // known service and the nodes recorded for it.
      // we merge all replicants services: local only or not
      //
      java.util.Collection services = this.getAllServices ();

      StringBuffer result = new StringBuffer ();
      java.util.Iterator catsIter = services.iterator ();
      
      result.append ("<pre>");
      
      while (catsIter.hasNext ())
      {
         String category = (String)catsIter.next ();
         // Nodes recorded in the replicants map for this service; an empty
         // table is used when the service is only registered locally.
         HashMap content = (HashMap)this.replicants.get (category);
         if (content == null)
            content = new HashMap ();
         java.util.Iterator keysIter = content.keySet ().iterator ();
                  
         result.append ("-----------------------------------------------\n");
         result.append ("Service : ").append (category).append ("\n\n");
         
         // State whether this node itself provides the service.
         Serializable local = lookupLocalReplicant(category);
         if (local == null)
            result.append ("\t- Service is *not* available locally\n");
         else
            result.append ("\t- Service *is* also available locally\n");

         // One line per node name recorded in the replicants map.
         while (keysIter.hasNext ())
         {
            String location = (String)keysIter.next ();            
            result.append ("\t- ").append(location).append ("\n");
         }
         
         result.append ("\n");
         
      }
      
      result.append ("</pre>");
      
      return result.toString ();
   
public java.lang.StringlistXmlContent()

      // Builds an XML listing of every known service and its locations:
      // one <Service> element per key, each with a <ServiceName> and one
      // <Location> per node. The local node, if it provides the service, is
      // listed first with local="True"; nodes from the replicants map follow
      // with local="False".
      // we merge all replicants services: local only or not
      //
      java.util.Collection services = this.getAllServices ();
      StringBuffer result = new StringBuffer ();

      result.append ("<ReplicantManager>\n");

      java.util.Iterator catsIter = services.iterator ();
      while (catsIter.hasNext ())
      {
         String category = (String)catsIter.next ();
         // An empty table is used when the service is only registered locally.
         HashMap content = (HashMap)this.replicants.get (category);
         if (content == null)
            content = new HashMap ();
         java.util.Iterator keysIter = content.keySet ().iterator ();
                  
         result.append ("\t<Service>\n");
         result.append ("\t\t<ServiceName>").append (category).append ("</ServiceName>\n");

         
         Serializable local = lookupLocalReplicant(category);
         if (local != null)
         {
            result.append ("\t\t<Location>\n");
            result.append ("\t\t\t<Name local=\"True\">").append (this.nodeName).append ("</Name>\n");
            result.append ("\t\t</Location>\n");
         }

         while (keysIter.hasNext ())
         {
            String location = (String)keysIter.next ();            
            result.append ("\t\t<Location>\n");
            result.append ("\t\t\t<Name local=\"False\">").append (location).append ("</Name>\n");
            result.append ("\t\t</Location>\n");
         }
         
         // Close the element opened for this service; the original emitted a
         // second opening <Service> tag here.
         result.append ("\t</Service>\n");
         
      }

      // Close the root element; the original emitted a second opening
      // <ReplicantManager> tag, leaving the tags unbalanced.
      result.append ("</ReplicantManager>\n");
      
      return result.toString ();
   
public java.io.SerializablelookupLocalReplicant(java.lang.String key)

      // Returns the replicant this node registered for the key, or null if there is none.
      return (Serializable)localReplicants.get(key);
   
public java.lang.Object[]lookupLocalReplicants()
Cluster callback called when a node wants to know our complete list of local replicants

throws
Exception Thrown if a cluster communication exception occurs
return
A java array of size 2 containing the name of our node in this cluster and the serialized representation of our state

      // Wait on the partitionNameKnown latch, which start() releases once nodeName is set.
      partitionNameKnown.acquire (); // we don't answer until our name is known
      
      // The answer pairs our node name with the localReplicants map itself (not a copy).
      Object[] rtn = {this.nodeName, localReplicants};
      if( trace )
         log.trace ("lookupLocalReplicants called ("+ rtn[0] + "). Return: " + localReplicants.size ());
      return rtn;
   
public java.util.ListlookupReplicants(java.lang.String key)

      Serializable mine = lookupLocalReplicant(key);
      HashMap remote = (HashMap)replicants.get(key);

      // Unknown key: neither a local nor a remote replicant exists.
      if (remote == null && mine == null)
         return null;

      ArrayList ordered = new ArrayList();

      if (remote == null)
      {
         // Only the local replicant exists (it cannot be null at this point).
         ordered.add(mine);
         return ordered;
      }

      // JBAS-2677. Put the replicants in view order.
      ClusterNode[] view = partition.getClusterNodes();
      for (int i = 0; i < view.length; i++)
      {
         String member = view[i].getName();
         if (mine != null && nodeName.equals(member))
         {
            // Our own slot in the view is filled with the local replicant.
            ordered.add(mine);
         }
         else
         {
            Object value = remote.get(member);
            if (value != null)
               ordered.add(value);
         }
      }

      return ordered;
   
public java.util.ListlookupReplicantsNodeNames(java.lang.String key)

      
      boolean hasLocal = localReplicants.containsKey (key);
      HashMap remote = (HashMap)replicants.get(key);

      // Unknown key: nobody replicates it.
      if (remote == null && !hasLocal)
         return null;

      ArrayList names = new ArrayList();

      if (remote == null)
      {
         // Only this node replicates the key (hasLocal is necessarily true here).
         names.add(this.nodeName);
         return names;
      }

      // JBAS-2677. Put the replicants in view order.
      Set owners = remote.keySet();
      ClusterNode[] view = partition.getClusterNodes();
      for (int i = 0; i < view.length; i++)
      {
         String member = view[i].getName();
         if (hasLocal && nodeName.equals(member))
         {
            // Our own slot in the view is reported under our node name.
            names.add(this.nodeName);
         }
         else if (owners.contains(member))
         {
            names.add(member);
         }
      }

      return names;
   
public voidmembershipChanged(java.util.Vector deadMembers, java.util.Vector newMembers, java.util.Vector allMembers)

      // Here we only care about deadMembers.  Purge all replicant lists of deadMembers
      // and then notify all listening nodes.
      //
     // Log the three member lists at INFO level before purging.
     log.info("I am (" + nodeName + ") received membershipChanged event:");
     log.info("Dead members: " + deadMembers.size() + " (" + deadMembers + ")");
     log.info("New Members : " + newMembers.size()  + " (" + newMembers + ")");
     log.info("All Members : " + allMembers.size()  + " (" + allMembers + ")");
      // purgeDeadMembers() returns immediately when the list is empty.
      purgeDeadMembers(deadMembers);
      
      // we don't need to merge members anymore
   
public voidmembershipChangedDuringMerge(java.util.Vector deadMembers, java.util.Vector newMembers, java.util.Vector allMembers, java.util.Vector originatingGroups)

      // Dead members are purged from all replicant lists right away;
      // purging notifies the listeners of every key that was modified.
      log.info("Merging partitions...");
      log.info("Dead members: " + deadMembers.size());
      log.info("Originating groups: " + originatingGroups);
      purgeDeadMembers(deadMembers);

      // New members are handled by starting a separate MergeMembers instance.
      if (!newMembers.isEmpty())
      {
         MergeMembers merger = new MergeMembers();
         merger.start();
      }
   
protected voidmergeMembers()

      // Remember whether a merge was already in progress, so that the finally
      // block only ends the merge process when this call was the one to begin it.
      boolean isAlreadyMerging = ClusterMergeStatus.isMergeInProcess();
      try
      {
         ClusterMergeStatus.startMergeProcess();
         
         log.debug("Start merging members in DRM service...");
         // Keys whose replicant list changed; listeners are notified once per key at the end.
         java.util.HashSet notifies = new java.util.HashSet ();
         // Ask the cluster for each node's local replicants (see lookupLocalReplicants()).
         ArrayList rsp = partition.callMethodOnCluster(SERVICE_NAME,
                                        "lookupLocalReplicants",
                                        new Object[]{}, new Class[]{}, true);
         if (rsp.size() == 0)
            log.debug("No responses from other nodes during the DRM merge process.");
         else 
         { 
            log.debug("The DRM merge process has received " + rsp.size() + " answers");
         }
         for (int i = 0; i < rsp.size(); i++)
         {
            Object o = rsp.get(i);
            // Null answers and Throwables are logged and skipped.
            if (o == null)
            {
               log.warn("As part of the answers received during the DRM merge process, a NULL message was received!");
               continue;
            }
            else if (o instanceof Throwable)
            {
               log.warn("As part of the answers received during the DRM merge process, a Throwable was received!", (Throwable) o);
               continue;
            }
            
            // Each answer is { node name, map of that node's local replicants }.
            // NOTE: the local variable 'replicants' shadows the field of the same
            // name for the rest of this loop body.
            Object[] objs = (Object[]) o;
            String node = (String)objs[0];
            Map replicants = (Map)objs[1];
            Iterator keys = replicants.keySet().iterator();
            
            //FIXME: We don't remove keys in the merge process but only add new keys!
            while (keys.hasNext())
            {
               String key = (String)keys.next();
               // done to reduce duplicate notifications
               if (!replicantEntryAlreadyExists  (key, node))
               {
                  addReplicant(key, node, (Serializable)replicants.get(key));
                  notifies.add (key);
               }
            }
            
            // If we hold more keys for this node than it reported, some of
            // them no longer exist on that node and are removed below.
            Vector currentStatus = getKeysReplicatedByNode (node);
            if (currentStatus.size () > replicants.size ())
            {
               // The merge process needs to remove some (now)
               // unexisting keys
               //
               for (int currentKeysId=0, currentKeysMax=currentStatus.size (); currentKeysId<currentKeysMax; currentKeysId++)
               {
                  String theKey = (String)currentStatus.elementAt (currentKeysId);
                  if (!replicants.containsKey (theKey))
                  {
                     removeReplicant (theKey, node);
                     notifies.add(theKey);
                  }
               }
            }
         }   
         
         // Notify the listeners of every key that was added to or removed from.
         Iterator notifIter = notifies.iterator ();
         while (notifIter.hasNext ())
         {
            String key = (String)notifIter.next ();
            notifyKeyListeners(key, lookupReplicants(key));
         }
         log.debug ("..Finished merging members in DRM service");

      }               
      catch (Exception ex)
      {
         // Failures are logged here rather than propagated to the caller.
         log.error("merge failed", ex);
      }
      finally
      {
         if (!isAlreadyMerging)
            ClusterMergeStatus.endMergeProcess();
      }
   
protected static synchronized intnextThreadID()

      // Post-increment: returns the current value of the static counter, then advances it.
      return threadID ++;
   
protected voidnotifyKeyListeners(java.lang.String key, java.util.List newReplicants)
Notifies, through a callback, the listeners for a given replicant that the set of replicants has changed

param
key The replicant key name
param
newReplicants The new list of replicants

      if( trace )
         log.trace("notifyKeyListeners");

      // we first update the intra-view id for this particular key
      //
      // This happens even when no listener is registered for the key.
      int newId = updateReplicantsHashId (key);
      
      ArrayList listeners = (ArrayList)keyListeners.get(key);
      if (listeners == null)
      {
         if( trace )
            log.trace("listeners is null");
         return;
      }
      
      // ArrayList's iterator is not thread safe
      // so the listeners are copied into an array while holding the list's
      // monitor, and the callbacks below run outside that monitor.
      DistributedReplicantManager.ReplicantListener[] toNotify = null;
      synchronized(listeners) 
      {
         toNotify = new DistributedReplicantManager.ReplicantListener[listeners.size()];
         toNotify = (DistributedReplicantManager.ReplicantListener[]) listeners.toArray(toNotify);
      }
      
      if( trace )
         log.trace("notifying " + toNotify.length + " listeners for key change: " + key);
      for (int i = 0; i < toNotify.length; i++)
      {
         // Null slots in the array are skipped.
         if (toNotify[i] != null)
            toNotify[i].replicantsChanged(key, newReplicants, newId);
      }
   
public voidprocessEvent(java.lang.Object event)

      // Events queued by _add/_remove carry the key and its replicant list
      // as captured at queue time; forward both to the key listeners.
      KeyChangeEvent change = (KeyChangeEvent) event;
      String changedKey = change.key;
      notifyKeyListeners(changedKey, change.replicants);
   
protected voidpurgeDeadMembers(java.util.Vector deadMembers)
Gets rid of dead members from the replicant lists and notifies the key listeners of every key that was modified. This method returns nothing.

      // Nothing to do when no member died.
      if (deadMembers.size() <= 0)
         return;

      log.debug("purgeDeadMembers, "+deadMembers);
      try
      {
         // The whole purge, including listener notification, runs while
         // holding the replicants monitor.
         synchronized(replicants)
         {
            Iterator keys = replicants.keySet().iterator();
            while (keys.hasNext())
            {
               String key = (String)keys.next();
               HashMap replicant = (HashMap)replicants.get(key);
               // Set to true once at least one dead member was removed for this key.
               boolean modified = false;
               for (int i = 0; i < deadMembers.size(); i++)
               {
                  String node = deadMembers.elementAt(i).toString();
                  log.debug("trying to remove deadMember " + node + " for key " + key);
                  Object removed = replicant.remove(node);
                  if (removed != null) 
                  {
                     log.debug(node + " was removed");
                     modified = true;
                  }
                  else
                  {
                     log.debug(node + " was NOT removed!!!");
                  }
               }
               // Notify only for keys that actually changed. The key itself
               // stays in the replicants map even if its table is now empty.
               if (modified)
               {
                  notifyKeyListeners(key, lookupReplicants(key));
               }
            }
         }
      }
      catch (Exception ex)
      {
         // Failures are logged here rather than propagated to the caller.
         log.error("purgeDeadMembers failed", ex);
      }
   
public voidregisterListener(java.lang.String key, DistributedReplicantManager.ReplicantListener subscriber)

      synchronized(keyListeners)
      {
         ArrayList subscribers = (ArrayList)keyListeners.get(key);
         if (subscribers != null)
         {
            // The key already has listeners: append to the existing list.
            subscribers.add(subscriber);
            return;
         }

         // First listener for this key: create and publish its list.
         subscribers = new ArrayList();
         keyListeners.put(key, subscribers);
         subscribers.add(subscriber);
      }
   
public voidremove(java.lang.String key)

      
      // Wait on the partitionNameKnown latch, which start() releases once nodeName is set.
      partitionNameKnown.acquire (); // we don't propagate until our name is known
      
      // optimisation: we don't make a costly network call
      // if there is nothing to remove
      if (localReplicants.containsKey(key))
      {
         // Invoke "_remove" on the cluster asynchronously, then drop the
         // local entry and notify the local listeners via removeLocal().
         Object[] args = {key, this.nodeName};
         partition.callAsynchMethodOnCluster(SERVICE_NAME, "_remove", args, remove_types, true);
         removeLocal(key);
      }
   
protected voidremoveLocal(java.lang.String key)

      // Removes the local replicant for the key and notifies the key listeners
      // with the remaining replicants; no cluster call is made here.
      synchronized(localReplicants)
      {
         localReplicants.remove(key);
         // lookupReplicants() returns null when the key is no longer known anywhere.
         List result = lookupReplicants(key);
         if (result == null)
            result = new ArrayList (); // don't pass null but an empty list
         notifyKeyListeners(key, result);
      }
   
protected booleanremoveReplicant(java.lang.String key, java.lang.String nodeName)

      synchronized(replicants)
      {
         HashMap perNode = (HashMap)replicants.get(key);
         if (perNode == null)
            return false;

         // A null result means no (non-null) replicant was stored for the node.
         if (perNode.remove(nodeName) == null)
            return false;

         // Drop the key entirely once its last replicant is gone.
         if (perNode.isEmpty())
            replicants.remove(key);

         return true;
      }
   
protected booleanreplicantEntryAlreadyExists(java.lang.String key, java.lang.String nodeName)
Indicates if a replicant already exists for a given key/node pair

param
key replicant key name
param
nodeName name of the node
return
a boolean indicating if a replicant for the given node exists for the given key

      // Delegate to the map-based overload, checking the replicants map.
      return replicantEntryAlreadyExists (replicants, key, nodeName);
   
protected booleanreplicantEntryAlreadyExists(java.util.Map map, java.lang.String key, java.lang.String nodeName)
Indicates if a replicant already exists for a given key/node pair in the given data structure

         // No per-node table for the key means no replicant for any node.
         HashMap perNode = (HashMap)map.get(key);
         return perNode != null && perNode.containsKey (nodeName);
   
protected voidrepublishLocalReplicants()

      // Re-sends every local replicant to the cluster via an asynchronous
      // "_add" call and notifies the local key listeners for each key.
      try
      {
         if( trace )
            log.trace("Start Re-Publish local replicants in DRM");

         // Work on a copy taken under the lock, so the cluster calls below run
         // without holding it. NOTE: this local variable shadows the field
         // this.localReplicants for the rest of the method.
         HashMap localReplicants;
         synchronized (this.localReplicants)
         {
            localReplicants = new HashMap(this.localReplicants);
         }

         Iterator entries = localReplicants.entrySet().iterator();
         while( entries.hasNext() )
         {
            Map.Entry entry = (Map.Entry) entries.next();
            String key = (String) entry.getKey();
            Object replicant = entry.getValue();
            // Entries with a null value are skipped.
            if (replicant != null)
            {
               if( trace )
                  log.trace("publishing, key=" + key + ", value=" + replicant);

               Object[] args = {key, this.nodeName, replicant};

               partition.callAsynchMethodOnCluster(SERVICE_NAME, "_add", args, add_types, true);
               notifyKeyListeners(key, lookupReplicants(key));
            }
         }
         if( trace )
            log.trace("End Re-Publish local replicants");
      }
      catch (Exception e)
      {
         // Failures are logged here rather than propagated to the caller.
         log.error("Re-Publish failed", e);
      }
   
public voidsetCurrentState(java.io.Serializable newState)

      // The state is the two-element array produced by getCurrentState():
      // [0] map of key -> (nodeName -> replicant), [1] the intraview id cache.
      Object[] globalState = (Object[])newState;
      
      // Received entries are merged into the existing replicants map
      // (putAll), whereas the intraview id cache is replaced outright.
      HashMap map = (HashMap)globalState[0];
      this.replicants.putAll(map);
      this.intraviewIdCache = (HashMap)globalState[1];

      if( trace )
      {
         log.trace(nodeName + ": received new state, will republish local replicants");
      }
      // NOTE(review): MembersPublisher is not shown here; judging by the trace
      // message above it presumably republishes the local replicants - confirm.
      MembersPublisher publisher = new MembersPublisher();
      publisher.start();
   
public voidstart()

      // Our node name is taken from the partition.
      this.nodeName = this.partition.getNodeName ();
      
      // Create the asynch listener handler thread
      asynchHandler = new AsynchEventHandler(this, "AsynchKeyChangeHandler");
      asynchHandler.start();

      // Unblocks add(), remove() and lookupLocalReplicants(), which acquire this latch.
      partitionNameKnown.release (); // partition name is now known!
      
      //log.info("mergemembers");
      //mergeMembers();
   
public voidstop()

      // BES 200604 -- implication of NR's JBCLUSTER-38 change.  Moving to
      // destroy allows restart of HAPartition while local registrations 
      // survive -- stopping partition does not stop all registered services
      // e.g. ejbs; if we maintain their registrations we can pass them to
      // the cluster when we restart.  However, we are leaving all the remote
      // replicants we have registered around, so they will still be included 
      // as targets if anyone contacts our EJB while partition is stopped.
      // Probably OK; if they aren't valid the client will find this out.
      
//    NR 200505 : [JBCLUSTER-38] move to destroy
//      if (localReplicants != null)
//      {
//         synchronized(localReplicants)
//         {
//            while (! localReplicants.isEmpty ())
//            {               
//               this.remove ((String)localReplicants.keySet().iterator().next ());
//            }
//         }
//      }
      
      // Stop the asynch handler thread
      // Any exception raised while stopping is logged as a warning and not rethrown.
      try
      {
         asynchHandler.stop();
      }
      catch( Exception e)
      {
         log.warn("Failed to stop asynchHandler", e);
      }
      
//    NR 200505 : [JBCLUSTER-38] move to destroy
//      this.mbeanserver.unregisterMBean (this.jmxName);
   
public voidunregisterListener(java.lang.String key, DistributedReplicantManager.ReplicantListener subscriber)

      synchronized(keyListeners)
      {
         ArrayList subscribers = (ArrayList)keyListeners.get (key);
         if (subscribers != null)
         {
            subscribers.remove(subscriber);

            // Forget the key once its last listener is gone.
            if (subscribers.isEmpty())
               keyListeners.remove(key);
         }
      }
   
protected intupdateReplicantsHashId(java.lang.String key)

      // Names of all nodes currently replicating this key (null when the key is unknown).
      List owners = this.lookupReplicantsNodeNames (key);

      if (owners == null || owners.isEmpty ())
      {
         // No more replicants for this key: drop its cached view id and report 0.
         this.intraviewIdCache.remove (key);
         return 0;
      }

      // Otherwise the view id is the hash computed over the owning node names.
      int viewId = this.calculateReplicantsHash (owners);
      this.intraviewIdCache.put (key, new Integer (viewId));
      return viewId;