FileDocCategorySizeDatePackage
DistributedSynchronizationManager.javaAPI DocJBoss 4.2.18770Fri Jul 13 21:02:32 BST 2007org.jboss.aspects.versioned

DistributedSynchronizationManager.java

/*
 * JBoss, Home of Professional Open Source.
 * Copyright 2006, Red Hat Middleware LLC, and individual contributors
 * as indicated by the @author tags. See the copyright.txt file in the
 * distribution for a full listing of individual contributors.
 *
 * This is free software; you can redistribute it and/or modify it
 * under the terms of the GNU Lesser General Public License as
 * published by the Free Software Foundation; either version 2.1 of
 * the License, or (at your option) any later version.
 *
 * This software is distributed in the hope that it will be useful,
 * but WITHOUT ANY WARRANTY; without even the implied warranty of
 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
 * Lesser General Public License for more details.
 *
 * You should have received a copy of the GNU Lesser General Public
 * License along with this software; if not, write to the Free
 * Software Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA
 * 02110-1301 USA, or see the FSF site: http://www.fsf.org.
 */
package org.jboss.aspects.versioned;

import org.jboss.ha.framework.interfaces.HAPartition;
import org.jboss.ha.framework.interfaces.HAPartition.HAMembershipListener;
import org.jboss.ha.framework.interfaces.HAPartition.HAPartitionStateTransfer;
import org.jboss.logging.Logger;
import org.jboss.util.id.GUID;

import java.io.Serializable;
import java.util.ArrayList;
import java.util.Hashtable;
import java.util.Iterator;
import java.util.List;
import java.util.Vector;

/**
 *  Adds replication
 *
 *  @author <a href="mailto:bill@jboss.org">Bill Burke</a>
 *  @version $Revision: 57186 $
 */
public class DistributedSynchronizationManager extends LocalSynchronizationManager implements HAPartitionStateTransfer, HAMembershipListener
{

   protected static Logger log = Logger.getLogger(DistributedSynchronizationManager.class);
   protected HAPartition partition;
   protected String domainName;
   protected Hashtable heldLocks = new Hashtable();

   public DistributedSynchronizationManager(String domainName, DistributedVersionManager versionManager, HAPartition partition)
   {
      super(versionManager);
      this.partition = partition;
      this.domainName = domainName + "/SynchManager";
   }

   public void create() throws Exception
   {
      //partition.subscribeToStateTransferEvents(domainName, this);
      partition.registerRPCHandler(domainName, this);
   }

   public void start() throws Exception
   {
      pullState();
   }

   protected void pullState() throws Exception
   {
      Object[] args = {};
      ArrayList rsp = partition.callMethodOnCluster(domainName, "getCurrentState", args, true);
      if (rsp.size() > 0)
         setCurrentState((Serializable)rsp.get(0));
   }

   public Serializable getCurrentState()
   {
      if(log.isTraceEnabled() )
         log.trace("getCurrentState called");
      return stateTable;
   }

   public void setCurrentState(Serializable newState)
   {
      if( log.isTraceEnabled() )
         log.trace("setCurrentState called");
      try
      {
         synchronized (tableLock)
         {
            this.stateTable = (Hashtable)newState;
            log.trace("setCurrentState, size: " + stateTable.size());
            Iterator it = stateTable.values().iterator();
            while (it.hasNext())
            {
               DistributedState state = (DistributedState)it.next();
               if (objectTable.containsKey(state.getGUID())) continue;
               state.buildObject(this, versionManager);
            }
         }
      }
      catch (Exception ex)
      {
         log.error("failed to set state sent from cluster", ex);
      }
   }


   public void membershipChanged(Vector deadMembers, Vector newMembers, Vector allMembers)
   {
      for (int i = 0; i < deadMembers.size(); i++)
      {
         Hashtable held = (Hashtable)heldLocks.remove(deadMembers.get(i));
         if (held != null)
         {
            Iterator it = held.values().iterator();
            while (it.hasNext())
            {
               List list = (List)it.next();
               releaseHeldLocks(list);
            }
         }
      }
   }

   public void sendNewObjects(List newObjects) throws Exception
   {
      log.trace("sending new objects");
      try
      {
         Object[] args = {newObjects};
         checkResponses(partition.callMethodOnCluster(domainName, "addNewObjects", args, true));
      }
      catch (Exception ex)
      {
         log.error("serious cache problems, data inconsistency is imminent", ex);
         throw ex;
      }

   }

   protected void sendClusterUpdatesAndRelease(GUID globalTxId, List clusterUpdates) throws Exception
   {
      try
      {
         Object[] args = {partition.getNodeName(), globalTxId, clusterUpdates};
         checkResponses(partition.callMethodOnCluster(domainName, "updateObjects", args, true));

      }
      catch (Exception ex)
      {
         log.error("serious cache problems, data inconsistency is imminent", ex);
         throw ex;
      }
   }
   protected void acquireRemoteLocks(GUID globalTxId, List guids) throws Exception
   {
      try
      {

         Object[] args = {partition.getNodeName(), globalTxId, guids};
         checkResponses(partition.callMethodOnCluster(domainName, "acquireLocks", args, true));
      }
      catch (Exception ex)
      {
         try
         {
            Object[] args = {partition.getNodeName()};
            partition.callMethodOnCluster(domainName, "releaseHeldLocks", args, true);
         }
         catch (Exception ignored)
         {
         }
         throw ex;
      }
   }

   public void noTxUpdate(DistributedUpdate update) throws Exception
   {
      throw new RuntimeException("NOT IMPLEMENTED");
   }

   public void addNewObjects(List newObjects) throws Exception
   {
      // updates must be in table first
      synchronized (tableLock)
      {
         for (int i = 0; i < newObjects.size(); i++)
         {
            DistributedState state = (DistributedState)newObjects.get(i);
            // REVISIT synch
            stateTable.put(state.getGUID(), state);
         }
         for (int i = 0; i < newObjects.size(); i++)
         {
            DistributedState state = (DistributedState)newObjects.get(i);
            if (objectTable.containsKey(state.getGUID())) continue;
            state.buildObject(this, versionManager);
         }
      }
   }

   public void updateObjects(String nodeName, GUID globalTxId, ArrayList updates) throws Exception
   {
      log.trace("updateObjects");
      synchronized (tableLock)
      {
         for (int i = 0; i < updates.size(); i++)
         {
            DistributedUpdate update = (DistributedUpdate)updates.get(i);
            // REVISIT: synch
            DistributedState state = (DistributedState)stateTable.get(update.getGUID());
            state.mergeState(update);
            state.releaseWriteLock();
         }
      }
      Hashtable table = (Hashtable)heldLocks.get(nodeName);
      table.remove(globalTxId);
      log.trace("end updateObjects");
   }

   public void releaseHeldLocks(String nodeName, GUID globalTxId)
   {
      Hashtable held = (Hashtable)heldLocks.get(nodeName);
      if (held == null) return;

      List locks = (List)held.remove(globalTxId);
      if (locks != null) releaseHeldLocks(locks);
   }

   public void acquireLocks(String nodeName, GUID globalTxId, List list) throws Exception
   {
      log.trace("acquireLocks");
      ArrayList locks = new ArrayList();
      try
      {
         for (int i = 0; i < list.size(); i++)
         {
            GUID guid = (GUID)list.get(i);
            DistributedState state = getState(guid);
            state.acquireWriteLock();
            locks.add(state);
         }
         Hashtable held = (Hashtable)heldLocks.get(nodeName);
         if (held == null)
         {
            held = new Hashtable();
            heldLocks.put(nodeName, held);
         }
         held.put(globalTxId, locks);
      }
      catch (Exception ex)
      {
         releaseHeldLocks(locks);
         throw ex;
      }
      log.trace("end acquireLocks");
   }

   /**
    * Checks whether any of the responses are exceptions. If yes, re-throws
    * them (as exceptions or runtime exceptions).
    * @param rsps
    * @throws Exception
    */
   protected void checkResponses(List rsps) throws Exception {
      Object rsp;
      if(rsps != null) {
         for(Iterator it=rsps.iterator(); it.hasNext();) {
            rsp=it.next();
            if(rsp != null) {
               if(rsp instanceof RuntimeException)
                  throw (RuntimeException)rsp;
               if(rsp instanceof Exception)
                  throw (Exception)rsp;
            }
         }
      }
   }

}