FileDocCategorySizeDatePackage
DistributedSynchronizationManager.javaAPI DocJBoss 4.2.18770Fri Jul 13 21:02:32 BST 2007org.jboss.aspects.versioned

DistributedSynchronizationManager

public class DistributedSynchronizationManager extends LocalSynchronizationManager implements org.jboss.ha.framework.interfaces.HAPartition.HAPartitionStateTransfer, org.jboss.ha.framework.interfaces.HAPartition.HAMembershipListener
Adds replication
author
Bill Burke
version
$Revision: 57186 $

Fields Summary
protected static Logger
log
protected org.jboss.ha.framework.interfaces.HAPartition
partition
protected String
domainName
protected Hashtable
heldLocks
Constructors Summary
public DistributedSynchronizationManager(String domainName, DistributedVersionManager versionManager, org.jboss.ha.framework.interfaces.HAPartition partition)


         
   
      super(versionManager);
      this.partition = partition;
      this.domainName = domainName + "/SynchManager";
   
Methods Summary
public voidacquireLocks(java.lang.String nodeName, org.jboss.util.id.GUID globalTxId, java.util.List list)

      log.trace("acquireLocks");
      ArrayList locks = new ArrayList();
      try
      {
         for (int i = 0; i < list.size(); i++)
         {
            GUID guid = (GUID)list.get(i);
            DistributedState state = getState(guid);
            state.acquireWriteLock();
            locks.add(state);
         }
         Hashtable held = (Hashtable)heldLocks.get(nodeName);
         if (held == null)
         {
            held = new Hashtable();
            heldLocks.put(nodeName, held);
         }
         held.put(globalTxId, locks);
      }
      catch (Exception ex)
      {
         releaseHeldLocks(locks);
         throw ex;
      }
      log.trace("end acquireLocks");
   
protected voidacquireRemoteLocks(org.jboss.util.id.GUID globalTxId, java.util.List guids)

      try
      {

         Object[] args = {partition.getNodeName(), globalTxId, guids};
         checkResponses(partition.callMethodOnCluster(domainName, "acquireLocks", args, true));
      }
      catch (Exception ex)
      {
         try
         {
            Object[] args = {partition.getNodeName()};
            partition.callMethodOnCluster(domainName, "releaseHeldLocks", args, true);
         }
         catch (Exception ignored)
         {
         }
         throw ex;
      }
   
public voidaddNewObjects(java.util.List newObjects)

      // updates must be in table first
      synchronized (tableLock)
      {
         for (int i = 0; i < newObjects.size(); i++)
         {
            DistributedState state = (DistributedState)newObjects.get(i);
            // REVISIT synch
            stateTable.put(state.getGUID(), state);
         }
         for (int i = 0; i < newObjects.size(); i++)
         {
            DistributedState state = (DistributedState)newObjects.get(i);
            if (objectTable.containsKey(state.getGUID())) continue;
            state.buildObject(this, versionManager);
         }
      }
   
protected voidcheckResponses(java.util.List rsps)
Checks whether any of the responses are exceptions. If yes, re-throws them (as exceptions or runtime exceptions).

param
rsps
throws
Exception

      Object rsp;
      if(rsps != null) {
         for(Iterator it=rsps.iterator(); it.hasNext();) {
            rsp=it.next();
            if(rsp != null) {
               if(rsp instanceof RuntimeException)
                  throw (RuntimeException)rsp;
               if(rsp instanceof Exception)
                  throw (Exception)rsp;
            }
         }
      }
   
public voidcreate()

      //partition.subscribeToStateTransferEvents(domainName, this);
      partition.registerRPCHandler(domainName, this);
   
public java.io.SerializablegetCurrentState()

      if(log.isTraceEnabled() )
         log.trace("getCurrentState called");
      return stateTable;
   
public voidmembershipChanged(java.util.Vector deadMembers, java.util.Vector newMembers, java.util.Vector allMembers)

      for (int i = 0; i < deadMembers.size(); i++)
      {
         Hashtable held = (Hashtable)heldLocks.remove(deadMembers.get(i));
         if (held != null)
         {
            Iterator it = held.values().iterator();
            while (it.hasNext())
            {
               List list = (List)it.next();
               releaseHeldLocks(list);
            }
         }
      }
   
public voidnoTxUpdate(DistributedUpdate update)

      throw new RuntimeException("NOT IMPLEMENTED");
   
protected voidpullState()

      Object[] args = {};
      ArrayList rsp = partition.callMethodOnCluster(domainName, "getCurrentState", args, true);
      if (rsp.size() > 0)
         setCurrentState((Serializable)rsp.get(0));
   
public voidreleaseHeldLocks(java.lang.String nodeName, org.jboss.util.id.GUID globalTxId)

      Hashtable held = (Hashtable)heldLocks.get(nodeName);
      if (held == null) return;

      List locks = (List)held.remove(globalTxId);
      if (locks != null) releaseHeldLocks(locks);
   
protected voidsendClusterUpdatesAndRelease(org.jboss.util.id.GUID globalTxId, java.util.List clusterUpdates)

      try
      {
         Object[] args = {partition.getNodeName(), globalTxId, clusterUpdates};
         checkResponses(partition.callMethodOnCluster(domainName, "updateObjects", args, true));

      }
      catch (Exception ex)
      {
         log.error("serious cache problems, data inconsistency is imminent", ex);
         throw ex;
      }
   
public voidsendNewObjects(java.util.List newObjects)

      log.trace("sending new objects");
      try
      {
         Object[] args = {newObjects};
         checkResponses(partition.callMethodOnCluster(domainName, "addNewObjects", args, true));
      }
      catch (Exception ex)
      {
         log.error("serious cache problems, data inconsistency is imminent", ex);
         throw ex;
      }

   
public voidsetCurrentState(java.io.Serializable newState)

      if( log.isTraceEnabled() )
         log.trace("setCurrentState called");
      try
      {
         synchronized (tableLock)
         {
            this.stateTable = (Hashtable)newState;
            log.trace("setCurrentState, size: " + stateTable.size());
            Iterator it = stateTable.values().iterator();
            while (it.hasNext())
            {
               DistributedState state = (DistributedState)it.next();
               if (objectTable.containsKey(state.getGUID())) continue;
               state.buildObject(this, versionManager);
            }
         }
      }
      catch (Exception ex)
      {
         log.error("failed to set state sent from cluster", ex);
      }
   
public voidstart()

      pullState();
   
public voidupdateObjects(java.lang.String nodeName, org.jboss.util.id.GUID globalTxId, java.util.ArrayList updates)

      log.trace("updateObjects");
      synchronized (tableLock)
      {
         for (int i = 0; i < updates.size(); i++)
         {
            DistributedUpdate update = (DistributedUpdate)updates.get(i);
            // REVISIT: synch
            DistributedState state = (DistributedState)stateTable.get(update.getGUID());
            state.mergeState(update);
            state.releaseWriteLock();
         }
      }
      Hashtable table = (Hashtable)heldLocks.get(nodeName);
      table.remove(globalTxId);
      log.trace("end updateObjects");