FileDocCategorySizeDatePackage
HAPartitionImpl.javaAPI DocJBoss 4.2.144389Fri Jul 13 20:52:38 BST 2007org.jboss.ha.framework.server

HAPartitionImpl

public class HAPartitionImpl extends org.jgroups.blocks.RpcDispatcher implements org.jboss.ha.framework.interfaces.HAPartition, AsynchEventHandler.AsynchEventProcessor, org.jgroups.MembershipListener, org.jgroups.MessageListener
This class is an abstraction over a JGroups RpcDispatcher and JChannel. It is the default implementation of HAPartition for the JGroups framework.
author
Sacha Labourey.
author
Bill Burke.
author
Scott.Stark@jboss.org
version
$Revision: 62255 $

Fields Summary
protected HashMap
rpcHandlers
protected HashMap
stateHandlers
protected boolean
allowSyncListeners
Do we send any membership change notifications synchronously?
protected ArrayList
synchListeners
The synch HAMembershipListener and HAMembershipExtendedListeners
protected ArrayList
asynchListeners
The asynch HAMembershipListener and HAMembershipExtendedListeners
protected AsynchEventHandler
asynchHandler
The handler used to send membership change notifications asynchronously
protected Vector
members
The current cluster partition members
protected Vector
jgmembers
public Vector
history
protected Vector
otherMembers
The partition members other than this node
protected Vector
jgotherMembers
protected String
partitionName
The JChannel name
protected org.jgroups.stack.IpAddress
localJGAddress
the local JG IP Address
protected String
nodeName
The cluster transport protocol address string
protected org.jboss.ha.framework.interfaces.ClusterNode
me
me as a ClusterNode
protected long
timeout
The timeout for cluster RPC calls
protected org.jgroups.JChannel
channel
The JGroups partition channel
protected DistributedReplicantManagerImpl
replicantManager
The cluster replicant manager
protected DistributedStateImpl
dsManager
The cluster state manager
protected Logger
log
The cluster instance log category
protected Logger
clusterLifeCycleLog
protected long
currentViewId
The current cluster view id
protected MBeanServer
server
The JMX MBeanServer to use for registrations
protected long
state_transfer_timeout
Number of ms to wait for state
protected boolean
bindIntoJndi
Whether to bind the partition into JNDI
protected boolean
isStateSet
True if state was initialized during start-up.
protected Exception
setStateException
An exception occurring upon fetch state.
private final Object
stateLock
Constructors Summary
public HAPartitionImpl(String partitionName, org.jgroups.JChannel channel, boolean deadlock_detection, MBeanServer server)

      // Delegate to the three-argument constructor, then remember the
      // MBeanServer; init() hands it to the DRM and DS services it creates.
      this(partitionName, channel, deadlock_detection);
      this.server = server;
   
public HAPartitionImpl(String partitionName, org.jgroups.JChannel channel, boolean deadlock_detection)

      super(channel, null, null, new Object(), deadlock_detection); // init RpcDispatcher with a fake target object
      this.log = Logger.getLogger(HAPartition.class.getName() + "." + partitionName);
      this.clusterLifeCycleLog = Logger.getLogger(HAPartition.class.getName() + ".lifecycle." + partitionName);
      this.channel = channel;
      this.partitionName = partitionName;
      this.history = new Vector();
      this.setMarshaller(new MarshallerImpl());
      logHistory ("Partition object created");
   
Methods Summary
protected voidbind(java.lang.String jndiName, java.lang.Object who, java.lang.Class classType, javax.naming.Context ctx)
Helper method that binds the partition in the JNDI tree.

param
jndiName Name under which the object must be bound
param
who Object to bind in JNDI
param
classType Class type under which should appear the bound object
param
ctx Naming context under which we bind the object
throws
Exception Thrown if a naming exception occurs during binding

      // The service is not serializable, so the live instance is registered
      // with NonSerializableFactory and only a Reference is stored in JNDI.
      NonSerializableFactory.bind(jndiName, who);

      // Walk every name component except the last one, creating each
      // subcontext that cannot be looked up.
      Name remaining = ctx.getNameParser("").parse(jndiName);
      for (; remaining.size() > 1; remaining = remaining.getSuffix(1))
      {
         final String component = remaining.get(0);
         try
         {
            ctx = (Context) ctx.lookup(component);
         }
         catch (NameNotFoundException notFound)
         {
            log.debug ("creating Subcontext" + component);
            ctx = ctx.createSubcontext(component);
         }
      }

      // NonSerializableFactory uses the address type "nns"; the Reference
      // names it as the object factory and carries the JNDI name as content.
      final String factoryClass = NonSerializableFactory.class.getName();
      Reference ref = new Reference(classType.getName(), new StringRefAddr("nns", jndiName), factoryClass, null);
      ctx.rebind(remaining.get(0), ref);
   
public voidblock()

public voidcallAsynchMethodOnCluster(java.lang.String objName, java.lang.String methodName, java.lang.Object[] args, boolean excludeSelf)

param
objName
param
methodName
param
args
param
excludeSelf
throws
Exception
deprecated
Use {@link #callAsynchMethodOnCluster(String, String, Object[], Class[], boolean)} instead

      // Deprecated overload: no parameter types are known, so null is passed
      // for the Class[] argument of the full overload.
      callAsynchMethodOnCluster(objName, methodName, args, null, excludeSelf);
   
public voidcallAsynchMethodOnCluster(java.lang.String objName, java.lang.String methodName, java.lang.Object[] args, java.lang.Class[] types, boolean excludeSelf)
This function is an abstraction of RpcDispatcher for asynchronous messages

      final boolean trace = log.isTraceEnabled();
      // The RPC name is "<objName>.<methodName>"; handle() splits it again
      final String rpcName = objName + "." + methodName;

      // Use the typed MethodCall constructor only when parameter types were given
      final MethodCall call = (types == null)
         ? new MethodCall(rpcName, args)
         : new MethodCall(rpcName, args, types);

      if (!excludeSelf)
      {
         if (trace)
         {
            log.trace("callAsynchMethodOnCluster(false), objName="+objName
               +", methodName="+methodName+", members="+members);
         }
         // null destination list; GET_NONE because this is the asynchronous variant
         this.callRemoteMethods(null, call, GroupRequest.GET_NONE, timeout);
         return;
      }

      if (trace)
      {
         log.trace("callAsynchMethodOnCluster(true), objName="+objName
            +", methodName="+methodName+", members="+jgotherMembers);
      }
      // Address only the other members so the local node is skipped
      this.callRemoteMethods(this.jgotherMembers, call, GroupRequest.GET_NONE, timeout);
   
public java.util.ArrayListcallMethodOnCluster(java.lang.String objName, java.lang.String methodName, java.lang.Object[] args, boolean excludeSelf)

param
objName
param
methodName
param
args
param
excludeSelf
return
throws
Exception
deprecated
Use {@link #callMethodOnCluster(String,String,Object[],Class[], boolean)} instead

      // Deprecated overload: no parameter types are known, so null is passed
      // for the Class[] argument of the typed overload.
      return callMethodOnCluster(objName, methodName, args, null, excludeSelf);
   
public java.util.ArrayListcallMethodOnCluster(java.lang.String objName, java.lang.String methodName, java.lang.Object[] args, java.lang.Class[] types, boolean excludeSelf)
This function is an abstraction of RpcDispatcher.

      // Use the partition-wide RPC timeout (see setMethodCallTimeout)
      return callMethodOnCluster(objName, methodName, args, types, excludeSelf, this.timeout);
   
public java.util.ArrayListcallMethodOnCluster(java.lang.String objName, java.lang.String methodName, java.lang.Object[] args, java.lang.Class[] types, boolean excludeSelf, long methodTimeout)

      final boolean trace = log.isTraceEnabled();
      final String rpcName = objName + "." + methodName;
      final MethodCall call = (types == null)
         ? new MethodCall(rpcName, args)
         : new MethodCall(rpcName, args, types);

      // Send the call and wait (GET_ALL) for the responses
      RspList responses;
      if (excludeSelf)
      {
         if (trace)
         {
            log.trace("callMethodOnCluster(true), objName="+objName
               +", methodName="+methodName+", members="+jgotherMembers);
         }
         responses = this.callRemoteMethods(this.jgotherMembers, call, GroupRequest.GET_ALL, methodTimeout);
      }
      else
      {
         if (trace)
         {
            log.trace("callMethodOnCluster(false), objName="+objName
               +", methodName="+methodName+", members="+members);
         }
         responses = this.callRemoteMethods(null, call, GroupRequest.GET_ALL, methodTimeout);
      }

      // Collect the usable values; the list is empty rather than null when
      // nothing usable came back.
      ArrayList values = new ArrayList();
      if (responses == null)
         return values;

      for (int idx = 0; idx < responses.size(); idx++)
      {
         Object entry = responses.elementAt(idx);
         if (!(entry instanceof Rsp))
         {
            // Raw value: keep it unless it is the "no handler" marker
            if (entry instanceof NoHandlerForRPC)
            {
               if (trace)
                  log.trace("Ignoring NoHandlerForRPC");
            }
            else
            {
               values.add(entry);
            }
            continue;
         }

         Rsp response = (Rsp) entry;
         // Only include received responses
         if (!response.wasReceived())
         {
            if (trace)
               log.trace("Ignoring non-received response: "+response);
            continue;
         }

         Object value = response.getValue();
         if (!(value instanceof NoHandlerForRPC))
            values.add(value);
      }

      return values;
    
public java.util.ArrayListcallMethodOnCoordinatorNode(java.lang.String objName, java.lang.String methodName, java.lang.Object[] args, java.lang.Class[] types, boolean excludeSelf)
Calls a method on the cluster coordinator node only. The cluster coordinator node is the first node to join the cluster; in this implementation it is the first member of the current cluster view.

param
objName
param
methodName
param
args
param
types
param
excludeSelf
return
throws
Exception

      // Use the partition-wide RPC timeout (see setMethodCallTimeout)
      return callMethodOnCoordinatorNode(objName,methodName,args,types,excludeSelf,this.timeout);
      
public java.util.ArrayListcallMethodOnCoordinatorNode(java.lang.String objName, java.lang.String methodName, java.lang.Object[] args, java.lang.Class[] types, boolean excludeSelf, long methodTimeout)
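Calls a method on the cluster coordinator node only, waiting at most methodTimeout for the response. The cluster coordinator node is the first node to join the cluster; in this implementation it is the first member of the current cluster view.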

param
objName
param
methodName
param
args
param
types
param
excludeSelf
param
methodTimeout
return
throws
Exception

         final boolean trace = log.isTraceEnabled();
         final String rpcName = objName + "." + methodName;

         MethodCall call;
         if (types == null)
            call = new MethodCall(rpcName, args);
         else
            call = new MethodCall(rpcName, args, types);

         if (trace)
         {
            log.trace("callMethodOnCoordinatorNode(false), objName="+objName
               +", methodName="+methodName);
         }

         // The first cluster view member is the coordinator. The destination
         // list stays empty only when this node is the coordinator and the
         // caller asked to exclude it.
         Vector destination = new Vector();
         if (!(isCurrentNodeCoordinator() && excludeSelf))
            destination.addElement(this.jgmembers.elementAt (0));

         RspList responses = this.callRemoteMethods(destination, call, GroupRequest.GET_ALL, methodTimeout);

         ArrayList values = new ArrayList();
         if (responses == null)
            return values;

         for (int idx = 0; idx < responses.size(); idx++)
         {
            Object entry = responses.elementAt(idx);
            if (entry instanceof Rsp)
            {
               Rsp response = (Rsp) entry;
               // Only include received responses
               if (response.wasReceived())
               {
                  Object value = response.getValue();
                  if (!(value instanceof NoHandlerForRPC))
                     values.add(value);
               }
               else if (trace)
               {
                  log.trace("Ignoring non-received response: "+response);
               }
            }
            else if (!(entry instanceof NoHandlerForRPC))
            {
               values.add(entry);
            }
            else if (trace)
            {
               log.trace("Ignoring NoHandlerForRPC");
            }
         }

         return values;
       
public voidclosePartition()

      logHistory ("Closing partition");
      log.info("Closing partition " + partitionName);

      try
      {
         asynchHandler.stop();
      }
      catch( Exception e)
      {
         log.warn("Failed to stop asynchHandler", e);
      }

      // Stop the DRM and DS services
      //
      try
      {
         this.replicantManager.stop();
      }
      catch (Exception e)
      {
         log.error("operation failed", e);
      }

      try
      {
         this.dsManager.stop();
      }
      catch (Exception e)
      {
         log.error("operation failed", e);
      }

//    NR 200505 : [JBCLUSTER-38] replace channel.close() by a disconnect and
//    add the destroyPartition() step
      try
      {
//          channel.close();
          channel.disconnect();
      }
      catch (Exception e)
      {
         log.error("operation failed", e);
      }

      if (bindIntoJndi)
      {
         String boundName = "/HAPartition/" + partitionName;
         InitialContext ctx = new InitialContext();
         try
         {
            ctx.unbind(boundName);
         }
         finally
         {
            ctx.close();
         }
         NonSerializableFactory.unbind (boundName);    	  
      }
      
      log.info("Partition " + partitionName + " closed.");
   
public voiddestroyPartition()


      try
      {
         this.replicantManager.destroy();
      }
      catch (Exception e)
      {
         log.error("operation failed", e);
      }

      try
      {
         this.dsManager.destroy();
      }
      catch (Exception e)
      {
         log.error("operation failed", e);
      }
      try
      {
         channel.close();
      }
      catch (Exception e)
      {
         log.error("operation failed", e);
      }

      log.info("Partition " + partitionName + " destroyed.");
  
protected voidfetchState()

      // Requests the initial state from the cluster and blocks until setState()
      // has applied it, or until it is clear that this node is the first member.
      // Throws the exception recorded by setState() if applying the state
      // failed, and IllegalStateException if no state was provided although
      // this node is not the coordinator.
      log.info("Fetching state (will wait for " + this.state_transfer_timeout + " milliseconds):");
      isStateSet = false;
      final long start = System.currentTimeMillis();

      // An interrupt must not abort the wait, but it must not be lost either:
      // remember it and restore the thread's interrupt status on the way out.
      boolean interrupted = false;
      try
      {
         boolean rc = channel.getState(null, this.state_transfer_timeout);
         if (rc)
         {
            synchronized (stateLock)
            {
               // setState() sets isStateSet or setStateException and then
               // notifies stateLock
               while (!isStateSet)
               {
                  if (setStateException != null)
                     throw setStateException;

                  try
                  {
                     stateLock.wait();
                  }
                  catch (InterruptedException iex)
                  {
                     interrupted = true;
                  }
               }
            }
            long stop = System.currentTimeMillis();
            log.info("state was retrieved successfully (in " + (stop - start) + " milliseconds)");
         }
         else
         {
            // No one provided us with state.
            // We need to find out if we are the coordinator, so we must
            // block until viewAccepted() is called at least once

            synchronized (members)
            {
               while (members.size() == 0)
               {
                  log.debug("waiting on viewAccepted()");
                  try
                  {
                     members.wait();
                  }
                  catch (InterruptedException iex)
                  {
                     interrupted = true;
                  }
               }
            }

            if (isCurrentNodeCoordinator())
            {
               log.info("State could not be retrieved (we are the first member in group)");
            }
            else
            {
               throw new IllegalStateException("Initial state transfer failed: " +
                  "Channel.getState() returned false");
            }
         }
      }
      finally
      {
         if (interrupted)
            Thread.currentThread().interrupt();
      }
   
public booleangetAllowSynchronousMembershipNotifications()

      // When false, registerMembershipListener() puts every listener on the
      // asynchronous list regardless of its type.
      return allowSyncListeners;
   
public booleangetBindIntoJndi()

	   // When true, startPartition() binds this partition in JNDI under
	   // "/HAPartition/" + partitionName and closePartition() unbinds it.
	   return bindIntoJndi;
   
public org.jboss.ha.framework.interfaces.ClusterNodegetClusterNode()

      // ClusterNode built from the local JGroups address in startPartition()
      return me;
   
public org.jboss.ha.framework.interfaces.ClusterNode[]getClusterNodes()

      // Read the field once so that the size and the contents come from the
      // same Vector even if the members field is reassigned in between.
      final Vector current = this.members;
      // Return what toArray() hands back: if the Vector grew after size() was
      // sampled it allocates a larger array, which the original code discarded
      // and returned the unfilled one instead.
      return (ClusterNode[]) current.toArray(new ClusterNode[current.size()]);
   
public java.util.VectorgetCurrentView()

      // Work on a single reference to the member list: re-reading the field
      // between the bounds check and elementAt() could mix two different views
      // if the field is reassigned while this loop runs.
      final Vector current = this.members;
      // The view is exposed as node names, in member order
      Vector names = new Vector (current.size());
      for (int idx = 0; idx < current.size(); idx++)
      {
         ClusterNode node = (ClusterNode) current.elementAt(idx);
         names.add(node.getName());
      }
      return names;
   
public longgetCurrentViewId()

      // Initialised in startPartition() from the id of the channel's view
      return this.currentViewId;
   
protected java.util.VectorgetDeadMembers(java.util.Vector oldMembers, java.util.Vector newMembers)
Helper method that returns a vector of dead members from two input vectors: new and old vectors of two views. Dead members are old - new members.

param
oldMembers Vector of old members
param
newMembers Vector of new members
return
Vector of members that have died between the two views, can be empty.

      // A missing view is treated as an empty one
      final Vector before = (oldMembers != null) ? oldMembers : new Vector();
      final Vector after = (newMembers != null) ? newMembers : new Vector();

      // dead = old - new, computed on a copy so the inputs stay untouched
      Vector departed = (Vector) before.clone();
      departed.removeAll(after);

      if (!departed.isEmpty() && log.isDebugEnabled())
      {
         log.debug("dead members: " + departed);
      }
      return departed;
   
public org.jboss.ha.framework.interfaces.DistributedReplicantManagergetDistributedReplicantManager()

      // The DRM instance is created and linked to this partition in init()
      return replicantManager;
   
public org.jboss.ha.framework.interfaces.DistributedStategetDistributedStateService()

      // The DS instance is created and linked to this partition in init()
      return this.dsManager;
   
public longgetMethodCallTimeout()

      // Timeout used by the cluster RPC methods that take no explicit timeout
      return timeout;
   
protected java.util.VectorgetNewMembers(java.util.Vector oldMembers, java.util.Vector allMembers)
Helper method that returns a vector of new members from two input vectors: new and old vectors of two views.

param
oldMembers Vector of old members
param
allMembers Vector of new members
return
Vector of members that have joined the partition between the two views

      // A missing view is treated as an empty one
      final Vector previous = (oldMembers == null) ? new Vector() : oldMembers;
      final Vector current = (allMembers == null) ? new Vector() : allMembers;

      // joined = all - old, computed on a copy so the inputs stay untouched
      Vector joined = (Vector) current.clone();
      joined.removeAll(previous);
      return joined;
   
public java.lang.StringgetNodeName()

      // Name of the local ClusterNode, assigned in startPartition()
      return nodeName;
   
public java.lang.StringgetPartitionName()

      // The name passed to the constructor
      return partitionName;
   
public byte[]getState()

      logHistory ("getState called on partition");
      final boolean debug = log.isDebugEnabled();

      log.debug("getState called.");
      try
      {
         // Build a "macro" state: one entry per state-transfer subscriber,
         // keyed by the name the subscriber registered under.
         HashMap macroState = new HashMap();
         for (Iterator it = stateHandlers.entrySet().iterator(); it.hasNext(); )
         {
            java.util.Map.Entry registration = (java.util.Map.Entry) it.next();
            String key = (String) registration.getKey();
            HAPartition.HAPartitionStateTransfer subscriber =
               (HAPartition.HAPartitionStateTransfer) registration.getValue();
            if (debug)
               log.debug("getState for " + key);
            macroState.put(key, subscriber.getCurrentState());
         }
         return objectToByteBuffer(macroState);
      }
      catch (Exception ex)
      {
         // Any failure is logged and reported to the caller as a null state
         log.error("getState failed", ex);
         return null;
      }
   
public longgetStateTransferTimeout()

      // Milliseconds that fetchState() passes to channel.getState()
      return state_transfer_timeout;
   
public java.lang.Objecthandle(org.jgroups.Message req)
Message contains MethodCall. Execute it against *this* object and return result. Use MethodCall.Invoke() to do this. Return result. This overrides RpcDispatcher.Handle so that we can dispatch to many different objects.

param
req The org.jgroups.Message representation of the method invocation
return
The serializable return value from the invocation

      // Deserializes the MethodCall carried by the message, splits its name
      // "<handlerName>.<method>" at the last dot, and invokes <method> on the
      // handler registered under <handlerName>.
      // Returns the invocation result, the Throwable it raised, a
      // NoHandlerForRPC marker when no handler can be resolved, or null when
      // the message itself is unusable.
      final boolean trace = log.isTraceEnabled();

      if( trace )
         log.trace("Partition " + partitionName + " received msg");
      if(req == null || req.getBuffer() == null)
      {
         log.warn("message or message buffer is null !");
         return null;
      }

      Object body;
      try
      {
         body = objectFromByteBuffer(req.getBuffer());
      }
      catch(Exception e)
      {
         log.warn("failed unserializing message buffer (msg=" + req + ")", e);
         return null;
      }

      // instanceof is false for null, so this also rejects an empty payload
      if(!(body instanceof MethodCall))
      {
         log.warn("message does not contain a MethodCall object !");
         return null;
      }

      // get method call informations
      //
      MethodCall method_call = (MethodCall)body;
      String methodName = method_call.getName();

      if( trace )
         log.trace("pre methodName: " + methodName);

      // Without a '.' there is no handler part; report it like an unknown
      // handler instead of failing in substring() with an index of -1.
      int idx = (methodName == null) ? -1 : methodName.lastIndexOf('.');
      if (idx < 0)
      {
         log.warn("rpc method name has no handler prefix: " + methodName);
         return new NoHandlerForRPC();
      }
      String handlerName = methodName.substring(0, idx);
      String newMethodName = methodName.substring(idx + 1);

      if( trace )
      {
         log.trace("handlerName: " + handlerName + " methodName: " + newMethodName);
         log.trace("Handle: " + methodName);
      }

      // prepare method call
      method_call.setName(newMethodName);
      Object handler = rpcHandlers.get(handlerName);
      if (handler == null)
      {
         if( trace )
            log.debug("No rpc handler registered under: "+handlerName);
         return new NoHandlerForRPC();
      }

      /* Invoke it and just return any exception with trace level logging of
      the exception. The exception semantics of a group rpc call are weak as
      the return value may be a normal return value or the exception thrown.
      */
      Object retval;
      try
      {
         retval = method_call.invoke(handler);
         if( trace )
            log.trace("rpc call return value: "+retval);
      }
      catch (Throwable t)
      {
         if( trace )
            log.trace("rpc call threw exception", t);
         retval = t;
      }

      return retval;
   
public voidinit()

      // Wires this partition into the dispatcher callbacks and creates the
      // helper services. Nothing is started here; startPartition() calls
      // start() on the DRM, the DS and the asynch handler.
      log.info("Initializing");
      logHistory ("Initializing partition");

      // Subscribe to the membership and message events generated by the
      // JGroups protocol stack
      //
      log.debug("setMembershipListener");
      setMembershipListener(this);
      log.debug("setMessageListener");
      setMessageListener(this);
      
      // Create the DRM and link it to this HAPartition
      //
      log.debug("create replicant manager");
      this.replicantManager = new DistributedReplicantManagerImpl(this, this.server);
      log.debug("init replicant manager");
      this.replicantManager.init();
      // NOTE(review): the message mentions binding, but no bind call follows here
      log.debug("bind replicant manager");
      
      // Create the DS and link it to this HAPartition
      //
      log.debug("create distributed state");
      this.dsManager = new DistributedStateImpl(this, this.server);
      log.debug("init distributed state service");
      this.dsManager.init();
      // NOTE(review): the message mentions binding, but no bind call follows here
      log.debug("bind distributed state service");

      // Create the asynchronous handler for view changes; this object is
      // passed as its event processor (see processEvent)
      asynchHandler = new AsynchEventHandler(this, "AsynchViewChangeHandler");
      
      log.debug("done initing.");
   
public booleanisCurrentNodeCoordinator()

      if(this.members == null || this.members.size() == 0 || this.me == null)
         return false;
     return this.members.elementAt (0).equals (this.me);
   
public voidlogHistory(java.lang.String message)

      try
      {
         history.add(new SimpleDateFormat().format (new Date()) + " : " + message);
      }
      catch (Exception ignored){}
   
protected voidnotifyListeners(java.util.ArrayList theListeners, long viewID, java.util.Vector allMembers, java.util.Vector deadMembers, java.util.Vector newMembers, java.util.Vector originatingGroups)

      log.debug("Begin notifyListeners, viewID: "+viewID);
      
      synchronized(theListeners)
      {
         // JBAS-3619 -- don't hold synch lock while notifying
         theListeners = (ArrayList) theListeners.clone();
      }
      
      for (int i = 0; i < theListeners.size(); i++)
      {
         HAMembershipListener aListener = null;
         try
         {
            aListener = (HAMembershipListener) theListeners.get(i);
            if(originatingGroups != null && (aListener instanceof HAMembershipExtendedListener))
            {
               HAMembershipExtendedListener exListener = (HAMembershipExtendedListener) aListener;
               exListener.membershipChangedDuringMerge (deadMembers, newMembers,
                  allMembers, originatingGroups);
            }
            else
            {
               aListener.membershipChanged(deadMembers, newMembers, allMembers);
            }
         }
         catch (Throwable e)
         {
            // a problem in a listener should not prevent other members to receive the new view
            log.warn("HAMembershipListener callback failure: "+aListener, e);
         }
      }
      log.debug("End notifyListeners, viewID: "+viewID);
   
public static java.lang.ObjectobjectFromByteBuffer(byte[] buffer)
Creates an object from a byte buffer


   // Static --------------------------------------------------------
   
              
          
   
      // A null buffer maps to a null object; anything else is read back
      // through MarshalledValueInputStream.
      if (buffer != null)
      {
         MarshalledValueInputStream in =
            new MarshalledValueInputStream(new ByteArrayInputStream(buffer));
         return in.readObject();
      }
      return null;
   
public static byte[]objectToByteBuffer(java.lang.Object obj)
Serializes an object into a byte buffer. The object has to implement interface Serializable or Externalizable

      // Write the object through MarshalledValueOutputStream into an in-memory
      // sink; flush before taking the bytes so nothing stays buffered.
      final ByteArrayOutputStream sink = new ByteArrayOutputStream();
      final MarshalledValueOutputStream out = new MarshalledValueOutputStream(sink);
      out.writeObject(obj);
      out.flush();
      return sink.toByteArray();
   
public voidprocessEvent(java.lang.Object event)

      // The event is cast to ViewChangeEvent and fanned out to the
      // asynchronous listeners only.
      final ViewChangeEvent change = (ViewChangeEvent) event;
      notifyListeners(
            asynchListeners,
            change.viewId,
            change.allMembers,
            change.deadMembers,
            change.newMembers,
            change.originatingGroups);
      
   
public voidreceive(org.jgroups.Message msg)

 /* complete */
public voidregisterMembershipListener(HAMembershipListener listener)

      // A listener goes on the asynchronous list when synchronous delivery is
      // disabled or when it implements one of the Asynch* listener interfaces.
      final boolean deliverAsynch = !this.allowSyncListeners
            || (listener instanceof AsynchHAMembershipListener)
            || (listener instanceof AsynchHAMembershipExtendedListener);

      // Each list is guarded by its own monitor (notifyListeners copies it
      // under the same lock).
      final ArrayList target = deliverAsynch ? this.asynchListeners : this.synchListeners;
      synchronized (target)
      {
         target.add(listener);
      }
   
public voidregisterRPCHandler(java.lang.String objName, java.lang.Object subscriber)

      // handle() looks handlers up by the part of the RPC name that precedes
      // the last '.', so objName is the prefix callers must use.
      rpcHandlers.put(objName, subscriber);
   
public voidsetAllowSynchronousMembershipNotifications(boolean allowSync)

      
      // Consulted by registerMembershipListener() and
      // unregisterMembershipListener() when choosing the listener list.
      this.allowSyncListeners = allowSync;
   
public voidsetBindIntoJndi(boolean bind)

	   // Read by startPartition() (bind) and closePartition() (unbind)
	   bindIntoJndi = bind;
   
public voidsetMethodCallTimeout(long timeout)

      // Timeout used by the cluster RPC methods that take no explicit timeout
      this.timeout=timeout;
   
public voidsetState(byte[] obj)

      logHistory ("setState called on partition");
      try
      {
         log.debug("setState called");
         if (obj == null)
         {
            // Nothing to apply; isStateSet is left untouched
            log.debug("state is null");
            return;
         }

         final int stateSize = obj.length;
         final Runtime runtime = Runtime.getRuntime();
         final long usedBefore = runtime.totalMemory() - runtime.freeMemory();

         // The "macro" state maps each subscriber name to its sub-state
         HashMap macroState = (HashMap)objectFromByteBuffer(obj);
         for (java.util.Iterator it = macroState.entrySet().iterator(); it.hasNext(); )
         {
            java.util.Map.Entry part = (java.util.Map.Entry) it.next();
            String key = (String) part.getKey();
            log.debug("setState for " + key);
            Object subState = part.getValue();
            HAPartition.HAPartitionStateTransfer subscriber =
               (HAPartition.HAPartitionStateTransfer)stateHandlers.get(key);
            if (subscriber == null)
            {
               log.debug("There is no stateHandler for: " + key);
               continue;
            }

            try
            {
               subscriber.setCurrentState((java.io.Serializable)subState);
            }
            catch (Exception e)
            {
               // Don't let issues with one subscriber affect others
               // unless it is DRM or DS, which are really internal
               // functions of the HAPartition
               final boolean internalService =
                  DistributedReplicantManagerImpl.SERVICE_NAME.equals(key)
                  || DistributedStateImpl.SERVICE_NAME.equals(key);
               if (!internalService)
               {
                  log.error("Caught exception setting state to " + subscriber, e);
                  continue;
               }
               throw (e instanceof RuntimeException)
                  ? (RuntimeException) e
                  : new RuntimeException(e);
            }
         }

         final long usedAfter = runtime.totalMemory() - runtime.freeMemory();
         log.debug("received a state of " + stateSize + " bytes; expanded memory by " +
               (usedAfter - usedBefore) + " bytes (used memory before: " + usedBefore +
               ", used memory after: " + usedAfter + ")");

         isStateSet = true;
      }
      catch (Throwable t)
      {
         // Record the failure so that fetchState() can rethrow it
         log.error("failed setting state", t);
         setStateException = (t instanceof Exception) ? (Exception) t : new Exception(t);
      }
      finally
      {
         // Wake up fetchState(), whatever the outcome was
         synchronized (stateLock)
         {
            stateLock.notifyAll();
         }
      }
   
public voidsetStateTransferTimeout(long state_transfer_timeout)

      // Milliseconds that fetchState() passes to channel.getState()
      this.state_transfer_timeout=state_transfer_timeout;
   
public voidstartPartition()

      // Captures the local address and current view from the channel, fetches
      // the initial state, starts the DRM, the DS and the asynch handler, and
      // finally binds the partition into JNDI when configured to do so.

      // get current JG group properties
      //
      logHistory ("Starting partition");
      log.debug("get nodeName");
      this.localJGAddress = (IpAddress)channel.getLocalAddress();
      this.me = new ClusterNode(this.localJGAddress);
      this.nodeName = this.me.getName();

      log.debug("Get current members");
      View view = channel.getView();
      this.jgmembers = (Vector)view.getMembers().clone();
      this.members = translateAddresses(this.jgmembers); // TRANSLATE
      log.info("Number of cluster members: " + members.size());
      // The loop condition was "m > members.size()", which never holds for
      // m == 0, so no member was ever logged.
      for(int m = 0; m < members.size(); m ++)
      {
         Object node = members.get(m);
         log.debug(node);
      }
      // Keep a list of other members only for "exclude-self" RPC calls
      //
      this.jgotherMembers = (Vector)view.getMembers().clone();
      this.jgotherMembers.remove (channel.getLocalAddress());
      this.otherMembers = translateAddresses(this.jgotherMembers); // TRANSLATE
      log.info ("Other members: " + this.otherMembers.size ());

      verifyNodeIsUnique (view.getMembers());

      // Update the initial view id
      //
      this.currentViewId = view.getVid().getId();

      // We must now synchronize new state transfer subscriber
      //
      fetchState();

      // We are now able to start our DRM and DS
      //
      this.replicantManager.start();
      this.dsManager.start();

      // Start the asynch listener handler thread
      asynchHandler.start();

      // Bind ourself in the public JNDI space if configured to do so
      if (!bindIntoJndi)
         return;

      // Close the context once the binding is done, as closePartition() does
      // for the context it uses to unbind.
      Context ctx = new InitialContext();
      try
      {
         this.bind("/HAPartition/" + partitionName, this, HAPartitionImpl.class, ctx);
      }
      finally
      {
         ctx.close();
      }
   
public voidsubscribeToStateTransferEvents(java.lang.String objectName, HAPartitionStateTransfer subscriber)

      // getState() and setState() use objectName as the key of this
      // subscriber's part of the partition state.
      stateHandlers.put(objectName, subscriber);
   
public voidsuspect(org.jgroups.Address suspected_mbr)

      
      logHistory ("Node suspected: " + (suspected_mbr==null?"null":suspected_mbr.toString()));
      if (isCurrentNodeCoordinator ())
         clusterLifeCycleLog.info ("Suspected member: " + suspected_mbr);
      else
         log.info("Suspected member: " + suspected_mbr);
   
protected java.util.VectortranslateAddresses(java.util.Vector jgAddresses)

      if (jgAddresses == null)
         return null;

      Vector result = new Vector (jgAddresses.size());
      for (int i = 0; i < jgAddresses.size(); i++)
      {
         IpAddress addr = (IpAddress) jgAddresses.elementAt(i);
         result.add(new ClusterNode (addr));
      }

      return result;
   
public voidunregisterMembershipListener(HAMembershipListener listener)

      boolean isAsynch = (this.allowSyncListeners == false) 
            || (listener instanceof AsynchHAMembershipListener)
            || (listener instanceof AsynchHAMembershipExtendedListener);
      if( isAsynch ) {
         synchronized(this.asynchListeners) {
            this.asynchListeners.remove(listener);
         }
      }
      else  { 
         synchronized(this.synchListeners) {
            this.synchListeners.remove(listener);
         }
      }
   
public voidunregisterRPCHandler(java.lang.String objName, java.lang.Object subscriber)

      rpcHandlers.remove(objName);
   
public voidunsubscribeFromStateTransferEvents(java.lang.String objectName, HAPartitionStateTransfer subscriber)

      stateHandlers.remove(objectName);
   
protected voidverifyNodeIsUnique(java.util.Vector javaGroupIpAddresses)

      byte[] localUniqueName = this.localJGAddress.getAdditionalData();
      if (localUniqueName == null)
      {
         log.warn("No additional information has been found in the JavaGroup address: " +
                  "make sure you are running with a correct version of JGroups and that the protocol " +
                  " you are using supports the 'additionalData' behaviour");
         return;
      }

      for (int i = 0; i < javaGroupIpAddresses.size(); i++)
      {
         IpAddress address = (IpAddress) javaGroupIpAddresses.elementAt(i);
         if (!address.equals(this.localJGAddress))
         {
            if (localUniqueName.equals(address.getAdditionalData()))
               throw new Exception ("Local node removed from cluster (" + this.localJGAddress + "): another node (" + address + ") publicizing the same name was already there");
         }
      }
   
public voidviewAccepted(org.jgroups.View newView)
Notification of a cluster view change. This is done from the JG protocol handlder thread and we must be careful to not unduly block this thread. Because of this there are two types of listeners, synchronous and asynchronous. The synchronous listeners are messaged with the view change event using the calling thread while the asynchronous listeners are messaged using a seperate thread.

param
newView

      try
      {
         // we update the view id
         //
         this.currentViewId = newView.getVid().getId();

         // Keep a list of other members only for "exclude-self" RPC calls
         //
         this.jgotherMembers = (Vector)newView.getMembers().clone();
         this.jgotherMembers.remove (channel.getLocalAddress());
         this.otherMembers = translateAddresses (this.jgotherMembers); // TRANSLATE!
         Vector translatedNewView = translateAddresses ((Vector)newView.getMembers().clone());
         logHistory ("New view: " + translatedNewView + " with viewId: " + this.currentViewId +
                     " (old view: " + this.members + " )");


         // Save the previous view and make a copy of the new view
         Vector oldMembers = this.members;

         Vector newjgMembers = (Vector)newView.getMembers().clone();
         Vector newMembers = translateAddresses(newjgMembers); // TRANSLATE
         if (this.members == null)
         {
            // Initial viewAccepted
            this.members = newMembers;
            this.jgmembers = newjgMembers;
            log.debug("ViewAccepted: initial members set");
            return;
         }
         this.members = newMembers;
         this.jgmembers = newjgMembers;

         int difference = 0;
         if (oldMembers == null)
            difference = newMembers.size () - 1;
         else
            difference = newMembers.size () - oldMembers.size ();
         
         if (isCurrentNodeCoordinator ())
            clusterLifeCycleLog.info ("New cluster view for partition " + this.partitionName + " (id: " +
                                      this.currentViewId + ", delta: " + difference + ") : " + this.members);
         else
            log.info("New cluster view for partition " + this.partitionName + ": " +
                     this.currentViewId + " (" + this.members + " delta: " + difference + ")");

         // Build a ViewChangeEvent for the asynch listeners
         ViewChangeEvent event = new ViewChangeEvent();
         event.viewId = currentViewId;
         event.allMembers = translatedNewView;
         event.deadMembers = getDeadMembers(oldMembers, event.allMembers);
         event.newMembers = getNewMembers(oldMembers, event.allMembers);
         event.originatingGroups = null;
         // if the new view occurs because of a merge, we first inform listeners of the merge
         if(newView instanceof MergeView)
         {
            MergeView mergeView = (MergeView) newView;
            event.originatingGroups = mergeView.getSubgroups();
         }

         log.debug("membership changed from " + 
                   (oldMembers == null ? 0 : oldMembers.size()) + " to " + 
                   event.allMembers.size());
         // Put the view change to the asynch queue
         this.asynchHandler.queueEvent(event);

         // Broadcast the new view to the synchronous view change listeners
         if (this.allowSyncListeners)
         {
            this.notifyListeners(synchListeners, event.viewId, event.allMembers,
                  event.deadMembers, event.newMembers, event.originatingGroups);
         }
      }
      catch (Exception ex)
      {
         log.error("ViewAccepted failed", ex);
      }