FileDocCategorySizeDatePackage
JGCacheInvalidationBridge.javaAPI DocJBoss 4.2.120862Fri Jul 13 20:52:36 BST 2007org.jboss.cache.invalidation.bridges

JGCacheInvalidationBridge

public class JGCacheInvalidationBridge extends org.jboss.system.ServiceMBeanSupport implements DistributedReplicantManager.ReplicantListener, JGCacheInvalidationBridgeMBean, DistributedState.DSListenerEx, org.jboss.cache.invalidation.InvalidationBridgeListener
JGroups implementation of a cache invalidation bridge
see
JGCacheInvalidationBridgeMBean
author
Sacha Labourey.
version
$Revision: 58255 $

Revisions:

24 septembre 2002 Sacha Labourey:

  • First implementation

Fields Summary
protected String
partitionName
protected org.jboss.ha.framework.server.ClusterPartitionMBean
clusterPartition
The ClusterPartition with which we are associated.
protected String
invalidationManagerName
protected String
bridgeName
protected org.jboss.ha.framework.interfaces.HAPartition
partition
protected org.jboss.ha.framework.interfaces.DistributedState
ds
protected org.jboss.ha.framework.interfaces.DistributedReplicantManager
drm
protected String
RPC_HANLE_NAME
protected String
nodeName
protected org.jboss.cache.invalidation.InvalidationManagerMBean
invalMgr
protected org.jboss.cache.invalidation.BridgeInvalidationSubscription
invalidationSubscription
protected Collection
localGroups
protected Vector
bridgedGroups
protected final Class[]
rpc_invalidate_types
protected final Class[]
rpc_invalidates_types
protected final Class[]
rpc_invalidate_all_types
protected final Class[]
rpc_batch_invalidate_types
Constructors Summary
public JGCacheInvalidationBridge()



   // Static --------------------------------------------------------
   
   // Constructors --------------------------------------------------

     
   
   
Methods Summary
protected void_do_rpc_batchInvalidate(org.jboss.cache.invalidation.BatchInvalidation[] invalidations, boolean asynch)

      Object[] params = new Object[] {invalidations};
      try
      {         
         if (asynch)
            this.partition.callAsynchMethodOnCluster (this.RPC_HANLE_NAME,
                                                      "_rpc_batchInvalidate", params, rpc_batch_invalidate_types, true);
         else
            this.partition.callMethodOnCluster (this.RPC_HANLE_NAME,
                                                "_rpc_batchInvalidate", params, rpc_batch_invalidate_types, true);
      }
      catch (Exception e)
      {
         log.debug ("Distributed invalidation (3) has failed (Bridge: " + this.bridgeName + ")");
      }
   
protected void_do_rpc_invalidate(java.lang.String invalidationGroupName, java.io.Serializable key, boolean asynch)

      Object[] params = new Object[] {invalidationGroupName, key};
      try
      {         
         if (asynch)
            this.partition.callAsynchMethodOnCluster (this.RPC_HANLE_NAME,
                                                      "_rpc_invalidate",
                                                      params, rpc_invalidate_types, true);
         else
            this.partition.callMethodOnCluster (this.RPC_HANLE_NAME,
                                                "_rpc_invalidate",
                                                params, rpc_invalidate_types, true);
      }
      catch (Exception e)
      {
         log.debug ("Distributed invalidation (1) has failed for group " + 
                    invalidationGroupName + " (Bridge: " + this.bridgeName + ")");
      }
   
protected void_do_rpc_invalidate_all(java.lang.String invalidationGroupName, boolean asynch)

      Object[] params = new Object[] {invalidationGroupName};
      try
      {
         if (asynch)
            this.partition.callAsynchMethodOnCluster (this.RPC_HANLE_NAME,
                                                      "_rpc_invalidate_all", params, rpc_invalidate_all_types, true);
         else
            this.partition.callMethodOnCluster (this.RPC_HANLE_NAME,
                                                "_rpc_invalidate_all", params, rpc_invalidate_all_types, true);
      }
      catch (Exception e)
      {
         log.debug ("Distributed invalidation (2) has failed for group " +
                    invalidationGroupName + " (Bridge: " + this.bridgeName + ")");
      }
   
protected void_do_rpc_invalidates(java.lang.String invalidationGroupName, java.io.Serializable[] keys, boolean asynch)

      Object[] params = new Object[] {invalidationGroupName, keys};
      try
      {         
         if (asynch)
            this.partition.callAsynchMethodOnCluster (this.RPC_HANLE_NAME,
                                                      "_rpc_invalidates", params, rpc_invalidates_types, true);
         else
            this.partition.callMethodOnCluster (this.RPC_HANLE_NAME,
                                                "_rpc_invalidates", params, rpc_invalidates_types, true);
      }
      catch (Exception e)
      {
         log.debug ("Distributed invalidation (2) has failed for group " + 
                    invalidationGroupName + " (Bridge: " + this.bridgeName + ")");
      }
   
public void_rpc_batchInvalidate(org.jboss.cache.invalidation.BatchInvalidation[] invalidations)

      if (log.isTraceEnabled () && invalidations != null)
         log.trace ("Received remote batch invalidation for this number of groups: " + invalidations.length);

      this.invalidationSubscription.batchInvalidate (invalidations);
   
public void_rpc_invalidate(java.lang.String invalidationGroupName, java.io.Serializable key)

      if (log.isTraceEnabled ())
         log.trace ("Received remote invalidation for group: " + invalidationGroupName);

      this.invalidationSubscription.invalidate (invalidationGroupName, key);
   
public void_rpc_invalidate_all(java.lang.String invalidationGroupName)

      if (log.isTraceEnabled ())
         log.trace ("Received remote invalidate_all for group: " + invalidationGroupName);

      this.invalidationSubscription.invalidateAll (invalidationGroupName);
   
public void_rpc_invalidates(java.lang.String invalidationGroupName, java.io.Serializable[] keys)

      if (log.isTraceEnabled ())
         log.trace ("Received remote invalidations for group: " + invalidationGroupName);

      this.invalidationSubscription.invalidate (invalidationGroupName, keys);
   
public voidbatchInvalidate(org.jboss.cache.invalidation.BatchInvalidation[] invalidations, boolean asynchronous)

      if (invalidations == null) return;
      
      // we need to sort which group other nodes accept or refuse and propagate through the net
      //      
      ArrayList acceptedGroups = new ArrayList();
      
      for (int i=0; i<invalidations.length; i++)
      {
         BatchInvalidation currBI = invalidations[i];
         if (groupExistsRemotely (currBI.getInvalidationGroupName ()))
            acceptedGroups.add (currBI);
      }
      
      if (acceptedGroups.size () > 0)
      {
         BatchInvalidation[] result = new BatchInvalidation[acceptedGroups.size ()];
         result = (BatchInvalidation[])acceptedGroups.toArray (result);
         
         if (log.isTraceEnabled ())
            log.trace ("Transmitting batch invalidation: " + result);
         this._do_rpc_batchInvalidate (result, asynchronous);      
      }
   
public java.lang.StringgetBridgeName()

      return this.bridgeName;
   
public org.jboss.ha.framework.server.ClusterPartitionMBeangetClusterPartition()

      return clusterPartition;
   
public java.lang.StringgetInvalidationManager()

      return this.invalidationManagerName;
   
public java.lang.StringgetPartitionName()

      return this.partitionName;
   
protected booleangroupExistsRemotely(java.lang.String groupName)

      return this.bridgedGroups.contains (groupName);
   
public voidgroupIsDropped(java.lang.String groupInvalidationName)

      try
      {
         this.publishLocalInvalidationGroups ();
         //this.updatedBridgedInvalidationGroupsInfo ();
      }
      catch (Exception e)
      {
         log.info ("Problem while un-registering a new invalidation group over the cluster", e);
      }
   
public voidinvalidate(java.lang.String invalidationGroupName, java.io.Serializable[] keys, boolean asynchronous)

      // if the group exists on another node, we simply propagate to other nodes
      //
      if (log.isTraceEnabled ())
         log.trace ("Transmitting invalidations for group: " + invalidationGroupName);
      
      if (groupExistsRemotely (invalidationGroupName))
         _do_rpc_invalidates (invalidationGroupName, keys, asynchronous);
   
public voidinvalidate(java.lang.String invalidationGroupName, java.io.Serializable key, boolean asynchronous)

      // if the group exists on another node, we simply propagate to other nodes
      //
      if (log.isTraceEnabled ())
         log.trace ("Transmitting invalidation for group: " + invalidationGroupName);

      if (groupExistsRemotely (invalidationGroupName))
         _do_rpc_invalidate (invalidationGroupName, key, asynchronous);
   
public voidinvalidateAll(java.lang.String groupName, boolean async)

      if (log.isTraceEnabled ())
         log.trace ("Transmitting for all entries for invalidation for group: " + groupName);
      if (groupExistsRemotely (groupName))
         _do_rpc_invalidate_all (groupName, async);
   
public voidkeyHasBeenRemoved(java.lang.String category, java.io.Serializable key, java.io.Serializable previousContent, boolean locallyModified)

       this.updatedBridgedInvalidationGroupsInfo ();
    
public voidnewGroupCreated(java.lang.String groupInvalidationName)

      try
      {
         this.publishLocalInvalidationGroups ();
         //this.updatedBridgedInvalidationGroupsInfo ();
      }
      catch (Exception e)
      {
         log.info ("Problem while registering a new invalidation group over the cluster", e);
      }
   
protected synchronized voidpublishLocalInvalidationGroups()

      this.localGroups = invalMgr.getInvalidationGroups ();      
      
      log.debug ("Publishing locally available invalidation groups: " + this.localGroups);

      ArrayList content = new ArrayList (this.localGroups);      
      ArrayList result = new ArrayList (content.size ());
      

      for (int i = 0; i < content.size(); i++)
      {
         String aGroup = ((InvalidationGroup)content.get(i)).getGroupName ();
         result.add (aGroup);
      }
      
      if (result.size () > 0)       
      {
         NodeInfo info = new NodeInfo (result, this.nodeName);
         this.ds.set (this.RPC_HANLE_NAME, this.nodeName, info, true);
      }
      else
         this.ds.remove (this.RPC_HANLE_NAME, this.nodeName, true);
   
public synchronized voidreplicantsChanged(java.lang.String key, java.util.List newReplicants, int newReplicantsViewId)

todo
examine thread safety. synchronized keyword was added to method signature when internal behavior of DistributedReplicantManagerImpl was changed so that multiple threads could concurrently send replicantsChanged notifications. Need to examine in detail how this method interacts with DistributedState to see if we can remove/narrow the synchronization.

      if (key.equals (this.RPC_HANLE_NAME) && this.drm.isMasterReplica (this.RPC_HANLE_NAME))
      {
         log.debug ("The list of replicant for the JG bridge has changed, computing and updating local info...");
         
         // we remove any entry from the DS whose node is dead
         //
         java.util.Collection coll = this.ds.getAllKeys (this.RPC_HANLE_NAME);                  
         if (coll == null) 
         {
            log.debug ("... No bridge info was associated to this node");
            return;
         }
         
         // to avoid ConcurrentModificationException, we copy the list of keys in a new structure
         //
         ArrayList collCopy = new java.util.ArrayList (coll);
         java.util.List newReplicantsNodeNames = this.drm.lookupReplicantsNodeNames (this.RPC_HANLE_NAME);
         

         for (int i = 0; i < collCopy.size(); i++)
         {
            String nodeEntry = (String)collCopy.get(i);
            if (!newReplicantsNodeNames.contains (nodeEntry))
            {
               // the list of bridged topic contains a dead member: we remove it
               //
               try
               {
                  log.debug ("removing bridge information associated to this node from the DS");
                  this.ds.remove (this.RPC_HANLE_NAME, nodeEntry, true);
               }
               catch (Exception e)
               {
                  log.info ("Unable to remove a node entry from the distributed cache", e);
               }
            }
         }
      }
   
public voidsetBridgeName(java.lang.String name)

      this.bridgeName = name;
   
public voidsetClusterPartition(org.jboss.ha.framework.server.ClusterPartitionMBean clusterPartition)

      this.clusterPartition = clusterPartition;
   
public voidsetInvalidationManager(java.lang.String objectName)

      this.invalidationManagerName = objectName;
   
public voidsetPartitionName(java.lang.String partitionName)

      this.partitionName = partitionName;
   
public voidstartService()

      RPC_HANLE_NAME = "DCacheBridge-" + this.bridgeName;
      
      // Support old-style partition lookup for configs that don't
      // inject the partition.
      // TODO remove this after a while; deprecated in 4.0.4
      if (this.clusterPartition == null)
      {
         javax.naming.Context ctx = new javax.naming.InitialContext ();
         this.partition = (HAPartition)ctx.lookup("/HAPartition/" + this.partitionName); 
      }
      else
      {
         this.partition = this.clusterPartition.getHAPartition();
         this.partitionName = this.partition.getPartitionName();
      }
      
      this.ds = this.partition.getDistributedStateService ();
      this.drm = this.partition.getDistributedReplicantManager ();
      this.nodeName = this.partition.getNodeName ();
      
      this.drm.add (this.RPC_HANLE_NAME, "");
      this.drm.registerListener (this.RPC_HANLE_NAME, this);      
      this.ds.registerDSListenerEx (RPC_HANLE_NAME, this);
      this.partition.registerRPCHandler(RPC_HANLE_NAME, this);   
      
      // we now publish the list of caches we have access to
      //
      this.invalMgr = (org.jboss.cache.invalidation.InvalidationManagerMBean)
         org.jboss.system.Registry.lookup (this.invalidationManagerName);
      
      publishLocalInvalidationGroups ();      
      this.updatedBridgedInvalidationGroupsInfo ();
      
      this.invalidationSubscription = invalMgr.registerBridgeListener (this);
      
   
public voidstopService()

      try
      {
         this.partition.unregisterRPCHandler (this.RPC_HANLE_NAME, this);
         this.ds.unregisterDSListenerEx (this.RPC_HANLE_NAME, this);
         this.drm.unregisterListener (this.RPC_HANLE_NAME, this);
         this.drm.remove (this.RPC_HANLE_NAME);
         
         this.invalidationSubscription.unregister ();
                  
         this.ds.remove (this.RPC_HANLE_NAME, this.nodeName, true);
         
         this.invalMgr = null;
         this.partition = null;
         this.drm = null;
         this.ds = null;
         this.invalidationSubscription = null;
         this.RPC_HANLE_NAME = null;
         this.nodeName = null;
         this.localGroups = null;
         this.bridgedGroups = new Vector ();                  
      }
      catch (Exception e)
      {
         log.info ("Problem while shuting down invalidation cache bridge", e);
      }
   
protected voidupdatedBridgedInvalidationGroupsInfo()

      Collection bridgedByNode = this.ds.getAllValues (this.RPC_HANLE_NAME);
      
      log.debug ("Updating list of invalidation groups that are bridged...");
      
      if (bridgedByNode != null)
      {
         // Make a copy
         //      
         ArrayList copy = new ArrayList (bridgedByNode);

         Vector result  = new Vector ();
         

         for (int i = 0; i < copy.size(); i++)
         {
            NodeInfo infoForNode = (NodeInfo)copy.get(i);
            log.trace ("InfoForNode: " + infoForNode);
            
            if (infoForNode != null && !infoForNode.groupName.equals (this.nodeName))
            {
               ArrayList groupsForNode = infoForNode.groups;
               log.trace ("Groups for node: " + groupsForNode);
               

               for (int j = 0; j < groupsForNode.size(); j++)
               {
                  String aGroup = (String)groupsForNode.get(j);
                  if (!result.contains (aGroup))
                  {
                     log.trace ("Adding: " + aGroup);
                     result.add (aGroup);                  
                  }
               }
               
            }            
            
         }
         // atomic assignation of the result
         //
         this.bridgedGroups = result;
         
         log.debug ("... computed list of bridged groups: " + result);
      }
      else
      {
         log.debug ("... nothing needs to be bridged.");            
      }
         
   
public voidvalueHasChanged(java.lang.String category, java.io.Serializable key, java.io.Serializable value, boolean locallyModified)

       this.updatedBridgedInvalidationGroupsInfo ();