File | Doc | Category | Size | Date | Package
HAServiceMBeanSupport.java | API Doc | JBoss 4.2.1 | 16229 | Fri Jul 13 20:52:38 BST 2007 | org.jboss.ha.jmx

HAServiceMBeanSupport.java

/*
 * JBoss, Home of Professional Open Source.
 * Copyright 2006, Red Hat Middleware LLC, and individual contributors
 * as indicated by the @author tags. See the copyright.txt file in the
 * distribution for a full listing of individual contributors.
 *
 * This is free software; you can redistribute it and/or modify it
 * under the terms of the GNU Lesser General Public License as
 * published by the Free Software Foundation; either version 2.1 of
 * the License, or (at your option) any later version.
 *
 * This software is distributed in the hope that it will be useful,
 * but WITHOUT ANY WARRANTY; without even the implied warranty of
 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
 * Lesser General Public License for more details.
 *
 * You should have received a copy of the GNU Lesser General Public
 * License along with this software; if not, write to the Free
 * Software Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA
 * 02110-1301 USA, or see the FSF site: http://www.fsf.org.
 */
package org.jboss.ha.jmx;

import java.io.Serializable;
import java.util.List;
import java.util.Set;

import javax.management.AttributeChangeNotification;
import javax.management.InstanceNotFoundException;
import javax.management.Notification;
import javax.management.ObjectInstance;
import javax.management.Query;
import javax.management.QueryExp;

import org.jboss.ha.framework.interfaces.DistributedReplicantManager;
import org.jboss.ha.framework.interfaces.DistributedState;
import org.jboss.ha.framework.interfaces.HAPartition;
import org.jboss.ha.framework.server.ClusterPartition;
import org.jboss.ha.framework.server.ClusterPartitionMBean;
import org.jboss.mx.util.MBeanProxyExt;
import org.jboss.system.ServiceMBeanSupport;
import org.jboss.system.server.ServerConfigUtil;

/**
 *  
 * Management Bean for an HA-Service.
 * Provides a convenient common base for cluster symmetric MBeans.
 * 
 * This class is also a user transparent extension
 * of the standard NotificationBroadcasterSupport
 * to a clustered environment.
 * Listeners register with their local broadcaster.
 * Invoking sendNotification() on any broadcaster,
 * will notify all listeners in the same cluster partition.
 * TODO: The performance can be further optimized by avoiding broadcast messages
 * when remote listener nodes are not interested (e.g. have no local subscribers)
 * or by iterating locally over filters or remote listeners. 
 *  
 * @author  Ivelin Ivanov <ivelin@apache.org>
 * @version $Revision: 57188 $
 *
 */
public class HAServiceMBeanSupport
   extends ServiceMBeanSupport
   implements HAServiceMBean
{
   // Constants -----------------------------------------------------
 
   // Attributes ----------------------------------------------------
   // Partition handle assigned by setupPartition(); null until that method has run.
   private HAPartition partition_;
   // Optional explicitly configured partition MBean; when non-null, setupPartition()
   // takes the HAPartition from it instead of looking one up by name.
   private ClusterPartitionMBean clusterPartition;
   // Name used to look up the partition; starts as ServerConfigUtil.getDefaultPartitionName()
   // and is overwritten by setupPartition() when clusterPartition is set.
   private String partitionName = ServerConfigUtil.getDefaultPartitionName();

   // Listener created in registerDRMListener() and handed back to the DRM in unregisterDRMListener().
   private DistributedReplicantManager.ReplicantListener drmListener = null;

   /** 
    * DRM participation TOKEN: the replicant value passed to drm.add() in registerDRMListener().
    */
   private String REPLICANT_TOKEN = "";

   // When false, sendNotification() does not deliver "State" attribute-change notifications to local listeners.
   private boolean sendLocalLifecycleNotifications = true;
   // When false, sendNotification() does not broadcast "State" attribute-change notifications to the partition.
   private boolean sendRemoteLifecycleNotifications = true;
   
   // Public --------------------------------------------------------

   /**
    * No-argument constructor; performs no initialization of its own.
    */
   public HAServiceMBeanSupport()
   {
      // for JMX
   }

   /**
    * Returns the cluster partition MBean previously supplied through
    * {@link #setClusterPartition(ClusterPartitionMBean)}.
    *
    * @return the configured partition MBean, or <code>null</code> if none was set
    */
   public ClusterPartitionMBean getClusterPartition()
   {
      return this.clusterPartition;
   }

   /**
    * Sets the cluster partition MBean this service should use.
    * The call has no effect while the service state is STARTED or STARTING.
    *
    * @param clusterPartition the partition MBean to use
    */
   public void setClusterPartition(ClusterPartitionMBean clusterPartition)
   {
      if (getState() == STARTED || getState() == STARTING)
      {
         // the partition is not replaced on a running (or starting) service
         return;
      }
      this.clusterPartition = clusterPartition;
   }

   /**
    * Returns the name of the partition this service is configured for.
    *
    * @return the current partition name
    */
   public String getPartitionName()
   {
      return this.partitionName;
   }

   /**
    * Sets the name of the partition this service should join.
    * The call has no effect while the service state is STARTED or STARTING.
    *
    * @param newPartitionName the partition name to use
    */
   public void setPartitionName(String newPartitionName)
   {
      if (getState() == STARTED || getState() == STARTING)
      {
         // the name is not changed on a running (or starting) service
         return;
      }
      this.partitionName = newPartitionName;
   }


   // Protected ------------------------------

   /**
    * Convenience method for sharing state across a cluster partition.
    * Hands the key/value pair to the partition's DistributedState service,
    * passing {@link #getServiceHAName()} as the first argument.
    *
    * @param key key for the distributed object
    * @param value the distributed object
    * @throws Exception propagated from the DistributedState service
    */
   public void setDistributedState(String key, Serializable value)
      throws Exception
   {
      getPartition().getDistributedStateService().set(getServiceHAName(), key, value);
   }

   /**
    * Convenience method for reading state shared across a cluster partition.
    * Looks the key up in the partition's DistributedState service,
    * passing {@link #getServiceHAName()} as the first argument.
    *
    * @param key key for the distributed object
    * @return Serializable the distributed object as returned by the service
    */
   public Serializable getDistributedState(String key)
   {
      DistributedState stateService = getPartition().getDistributedStateService();
      Serializable stored = stateService.get(getServiceHAName(), key);
      return stored;
   }

   /**
    * Starts the HA aspects of this service. In order, it resolves the
    * partition, registers this object as an RPC handler and registers
    * the DRM listener/replicant.
    *
    * <p>
    * <b>Attention</b>: Always call this method when you override it in a subclass,
    * otherwise none of the registrations above take place.
    * </p>
    *
    * @throws Exception if resolving the partition or registering with the DRM fails
    */
   protected void startService() throws Exception
   {
      if (log.isDebugEnabled())
      {
         log.debug("start HAServiceMBeanSupport");
      }

      setupPartition();
      registerRPCHandler();
      registerDRMListener();
   }

   /**
    * Stops the HA aspects of this service: unregisters the DRM
    * listener/replicant and then the RPC handler.
    *
    * <p>
    * The RPC handler is unregistered even when the DRM cleanup throws, so a
    * failure in the first step cannot leave the handler registered with the
    * partition. The exception from the DRM cleanup still propagates to the
    * caller (unless unregistering the RPC handler itself throws).
    * </p>
    *
    * <p>
    * <b>Attention</b>: Always call this method when you override it in a subclass
    * </p>
    *
    * @throws Exception if unregistering from the DRM fails
    */
   protected void stopService() throws Exception
   {
      if (log.isDebugEnabled())
      {
         log.debug("stop HAServiceMBeanSupport");
      }

      try
      {
         unregisterDRMListener();
      }
      finally
      {
         // must run regardless of the outcome above, otherwise the
         // partition keeps a reference to this stopped service
         unregisterRPCHandler();
      }
   }

   /**
    * Obtains the handle to the cluster partition. When a ClusterPartitionMBean
    * has been configured, its HAPartition is used and the partition name is
    * updated from it; otherwise the partition is looked up by the configured name.
    *
    * @throws Exception if the lookup by name fails
    */
   protected void setupPartition() throws Exception
   {
      if (clusterPartition != null)
      {
         // explicit partition wins; keep the name attribute in sync with it
         partition_ = clusterPartition.getHAPartition();
         partitionName = partition_.getPartitionName();
         return;
      }

      partition_ = findHAPartitionWithName(getPartitionName());
   }

   /**
    * Registers this object with the partition as the RPC handler
    * for the name returned by {@link #getServiceHAName()}.
    */
   protected void registerRPCHandler()
   {
      this.partition_.registerRPCHandler(getServiceHAName(), this);
   }

   /**
    * Removes this object's RPC handler registration, made under the name
    * returned by {@link #getServiceHAName()}, from the partition.
    */
   protected void unregisterRPCHandler()
   {
      this.partition_.unregisterRPCHandler(getServiceHAName(), this);
   }

   /**
    * Registers a listener for replicant changes with the partition's
    * DistributedReplicantManager and then adds this node as a replicant
    * under the key returned by {@link #getServiceHAName()}.
    * Changes reported for that key are forwarded to
    * {@link #partitionTopologyChanged(List, int)}.
    *
    * @throws Exception propagated from the DistributedReplicantManager
    */
   protected void registerDRMListener() throws Exception
   {
      DistributedReplicantManager replicantManager =
         this.partition_.getDistributedReplicantManager();

      // listen for topology changes, which might cause the election of a new master
      drmListener = new DistributedReplicantManager.ReplicantListener()
      {
         private final Object lock = new Object();

         public void replicantsChanged(String key, List newReplicants, int newReplicantsViewId)
         {
            if (!key.equals(getServiceHAName()))
            {
               // change concerns some other service
               return;
            }

            // DistributedReplicantManagerImpl may deliver replicantsChanged
            // notifications concurrently. Serializing the callbacks here keeps
            // non-thread-safe subclasses of HAServiceMBeanSupport working.
            synchronized (lock)
            {
               HAServiceMBeanSupport.this.partitionTopologyChanged(newReplicants, newReplicantsViewId);
            }
         }
      };
      replicantManager.registerListener(getServiceHAName(), drmListener);

      // tell the DRM that this node has the MBean deployed
      replicantManager.add(getServiceHAName(), REPLICANT_TOKEN);
   }

   /**
    * Reverses {@link #registerDRMListener()}: first removes this node's
    * replicant for {@link #getServiceHAName()}, then unregisters the listener.
    *
    * @throws Exception propagated from the DistributedReplicantManager
    */
   protected void unregisterDRMListener() throws Exception
   {
      DistributedReplicantManager replicantManager =
         this.partition_.getDistributedReplicantManager();

      // this node no longer offers the service
      replicantManager.remove(getServiceHAName());

      // and no longer wants to hear about replicant changes
      replicantManager.unregisterListener(getServiceHAName(), drmListener);
   }

   /**
    * Callback invoked by the DRM listener when the replicants for this
    * service change. This implementation only logs the new view id at
    * debug level.
    *
    * @param newReplicants the replicant list reported by the DRM
    * @param newReplicantsViewId the view id reported by the DRM
    */
   public void partitionTopologyChanged(List newReplicants, int newReplicantsViewId)
   {
      if (log.isDebugEnabled())
      {
         log.debug("partitionTopologyChanged(). cluster view id: " + newReplicantsViewId);
      }
   }

   /**
    * Asks the partition's DistributedReplicantManager whether this node is
    * the master replica for {@link #getServiceHAName()}.
    *
    * @return the answer given by the DistributedReplicantManager
    */
   protected boolean isDRMMasterReplica()
   {
      return getPartition()
         .getDistributedReplicantManager()
         .isMasterReplica(getServiceHAName());
   }


   /**
    * Returns the partition handle obtained by {@link #setupPartition()}.
    *
    * @return the partition, or <code>null</code> if it has not been set up yet
    */
   public HAPartition getPartition()
   {
      return this.partition_;
   }

    /**
     * Invokes the named method across the partition via
     * HAPartition.callMethodOnCluster, addressed to {@link #getServiceHAName()}.
     * Any value returned by the cluster call is discarded.
     *
     * @param methodName name of the method to invoke
     * @param args arguments for the invocation
     * @throws Exception propagated from the partition call
     * @deprecated Use {@link #callMethodOnPartition(String, Object[], Class[])} instead
     */
   public void callMethodOnPartition(String methodName, Object[] args)
      throws Exception
   {
      HAPartition target = getPartition();
      String haName = getServiceHAName();
      // NOTE(review): the trailing flag is presumably "excludeSelf" - confirm against HAPartition
      target.callMethodOnCluster(haName, methodName, args, true);
   }

    /**
     * Invokes the named method across the partition via
     * HAPartition.callMethodOnCluster, addressed to {@link #getServiceHAName()}.
     * Any value returned by the cluster call is discarded.
     *
     * @param methodName name of the method to invoke
     * @param args arguments for the invocation
     * @param types parameter types passed along with the arguments
     * @throws Exception propagated from the partition call
     */
    public void callMethodOnPartition(String methodName, Object[] args, Class[] types)
      throws Exception
   {
      HAPartition target = getPartition();
      String haName = getServiceHAName();
      // NOTE(review): the trailing flag is presumably "excludeSelf" - confirm against HAPartition
      target.callMethodOnCluster(haName, methodName, args, types, true);
   }

   /**
    * Tells whether "State" attribute-change notifications are delivered to
    * local (same JVM) listeners by {@link #sendNotification(Notification)}.
    * <p>
    * Default is <code>true</code>.
    * </p>
    *
    * @return <code>true</code> if local listeners receive lifecycle notifications
    * @see #sendNotification(Notification)
    */
   public boolean getSendLocalLifecycleNotifications()
   {
      return this.sendLocalLifecycleNotifications;
   }

   /**
    * Sets whether JMX Notifications should be sent to local (same JVM) listeners
    * if the notification is for an attribute change to attribute "State".
    * <p>
    * Default is <code>true</code>.
    * </p>
    *
    * @param sendLocalLifecycleNotifications <code>true</code> to deliver such
    *        notifications to local listeners, <code>false</code> to suppress them
    * @see #sendNotification(Notification)
    */
   public void setSendLocalLifecycleNotifications(boolean sendLocalLifecycleNotifications)
   {
      this.sendLocalLifecycleNotifications = sendLocalLifecycleNotifications;
   }

   /**
    * Tells whether "State" attribute-change notifications are broadcast to
    * remote listeners by {@link #sendNotification(Notification)}.
    * <p>
    * Default is <code>true</code>.
    * </p>
    * <p>
    * See http://jira.jboss.com/jira/browse/JBAS-3194 for an example of a
    * use case where this property should be set to false.
    * </p>
    *
    * @return <code>true</code> if remote listeners receive lifecycle notifications
    * @see #sendNotification(Notification)
    */
   public boolean getSendRemoteLifecycleNotifications()
   {
      return this.sendRemoteLifecycleNotifications;
   }

   /**
    * Sets whether JMX Notifications should be sent to remote listeners
    * if the notification is for an attribute change to attribute "State".
    * <p>
    * Default is <code>true</code>.
    * </p>
    * <p>
    * See http://jira.jboss.com/jira/browse/JBAS-3194 for an example of a
    * use case where this property should be set to false.
    * </p>
    * 
    * @param sendRemoteLifecycleNotifications <code>true</code> to broadcast such
    *        notifications to the partition, <code>false</code> to suppress them
    * @see #sendNotification(Notification)
    */
   public void setSendRemoteLifecycleNotifications(boolean sendRemoteLifecycleNotifications)
   {
      this.sendRemoteLifecycleNotifications = sendRemoteLifecycleNotifications;
   }

   /**
    * Broadcasts the notification to the remote listener nodes (if any) and
    * then hands it to the local listeners.
    * <p>
    * Notifications that report a change of the "State" attribute are only
    * broadcast remotely when {@link #getSendRemoteLifecycleNotifications()} is
    * true, and only delivered locally when
    * {@link #getSendLocalLifecycleNotifications()} is true. All other
    * notifications go to both. A failure of the remote broadcast is logged at
    * debug level and does not prevent local delivery.
    * </p>
    * <p>
    * When the remote broadcast is attempted, the notification's source is
    * replaced with this MBean's service name before sending; the same
    * (modified) notification object is then passed to the local listeners.
    * </p>
    *
    * @param notification sent out to local listeners and other nodes. It should be serializable.
    * It is recommended that the source of the notification is an ObjectName of an MBean that
    * is available on all nodes where the broadcaster MBean is registered.
    *
    * @see #getSendLocalLifecycleNotifications()
    * @see #getSendRemoteLifecycleNotifications()
    * @see javax.management.NotificationBroadcasterSupport#sendNotification(Notification)
    */
   public void sendNotification(Notification notification)
   {
      final boolean lifecycleEvent = isStateChangeNotification(notification);

      if (sendRemoteLifecycleNotifications || !lifecycleEvent)
      {
         try
         {
            // Swap the source MBean for its ObjectName so that the
            // notification can be safely transferred over the wire
            notification.setSource(this.getServiceName());
            sendNotificationRemote(notification);
         }
         catch (Throwable th)
         {
            // a failed broadcast must not prevent the local notification below
            if (log.isDebugEnabled())
            {
               log.debug("sendNotificationRemote( " + notification + " ) failed ", th);
            }
         }
      }

      if (sendLocalLifecycleNotifications || !lifecycleEvent)
      {
         sendNotificationToLocalListeners(notification);
      }
   }
   
   /**
    * Tells whether the notification is an AttributeChangeNotification
    * whose attribute name is "State".
    *
    * @param notification the notification to inspect
    * @return <code>true</code> only for a "State" attribute change
    */
   private boolean isStateChangeNotification(Notification notification)
   {
      if (!(notification instanceof AttributeChangeNotification))
      {
         return false;
      }

      AttributeChangeNotification change = (AttributeChangeNotification) notification;
      return "State".equals(change.getAttributeName());
   }

   /**
    * Delivers the notification through the superclass's sendNotification(),
    * i.e. without the remote broadcast performed by this class's own
    * {@link #sendNotification(Notification)}.
    *
    * @param notification the notification to deliver
    */
   protected void sendNotificationToLocalListeners(Notification notification)
   {
      super.sendNotification(notification);
   }

   /**
    * Invokes the named method asynchronously across the partition via
    * HAPartition.callAsynchMethodOnCluster, addressed to
    * {@link #getServiceHAName()}.
    * <p>
    * When {@link #getPartition()} returns <code>null</code> (for example
    * before the partition has been set up) the call is silently skipped.
    * The partition reference that was null-checked is the one the call is
    * made on, so the check cannot be bypassed by a second lookup returning
    * a different value.
    * </p>
    *
    * @param methodName name of the method to invoke
    * @param args arguments for the invocation
    * @param types parameter types passed along with the arguments
    * @throws Exception propagated from the partition call
    */
   protected void callAsyncMethodOnPartition(String methodName, Object[] args, Class[] types)
      throws Exception
   {
      HAPartition partition = getPartition();
      if (partition == null)
      {
         // no partition available: nothing to call
         return;
      }

      // NOTE(review): the trailing flag is presumably "excludeSelf" - confirm against HAPartition
      partition.callAsynchMethodOnCluster(
         getServiceHAName(),
         methodName,
         args, types,
         true);
   }


   /**
    * Broadcasts a notification remotely to the partition participants by
    * asynchronously invoking <code>_receiveRemoteNotification</code> on the
    * partition with the notification as its single argument.
    *
    * @param notification the notification to broadcast
    * @throws Exception propagated from the asynchronous partition call
    */
   protected void sendNotificationRemote(Notification notification)
      throws Exception
   {
      Object[] callArgs = new Object[] { notification };
      Class[] callTypes = new Class[] { Notification.class };
      callAsyncMethodOnPartition("_receiveRemoteNotification", callArgs, callTypes);
   }

   /**
    * Invoked by remote broadcasters: this is the method name that
    * sendNotificationRemote() asks the partition to call.
    * Delegates to the super class's sendNotification(), so the notification
    * is not passed through this class's own sendNotification() and is
    * therefore not broadcast to the partition again.
    * 
    * @param notification the notification received from a remote node
    */
   public void _receiveRemoteNotification(Notification notification)
   {
      super.sendNotification(notification);
   }


   /**
    * Returns the name under which this service registers with the partition
    * (RPC handler, DRM key and distributed state all use it).
    * This implementation returns the canonical form of getServiceName().
    * 
    * Override this method only if you need to provide a custom partition wide unique service name. 
    * The default implementation will usually work, provided that 
    * the getServiceName() method returns a unique canonical MBean name.
    * 
    * @return partition wide unique service name
    */
   public String getServiceHAName()
   {
      return getServiceName().getCanonicalName();
   }


   /**
    * Looks up the HAPartition with the given name by querying the MBean
    * server for MBeans whose class is ClusterPartition and whose
    * "PartitionName" attribute matches <code>name</code>. The first MBean
    * returned by the query is used.
    *
    * @param name the partition name to look for
    * @return the HAPartition exposed by the matching ClusterPartition MBean
    * @throws InstanceNotFoundException if no MBean matches, or the matching
    *         MBean returns no HAPartition
    * @throws Exception propagated from the MBean server or proxy
    */
   protected HAPartition findHAPartitionWithName(String name) throws Exception
   {
      log.debug("findHAPartitionWithName, name="+name);

      // class == ClusterPartition AND PartitionName matches the requested name
      QueryExp partitionQuery = Query.and(
         Query.eq(Query.classattr(), Query.value(ClusterPartition.class.getName())),
         Query.match(Query.attr("PartitionName"), Query.value(name)));

      Set candidates = this.getServer().queryMBeans(null, partitionQuery);

      HAPartition found = null;
      if (candidates != null && !candidates.isEmpty())
      {
         ObjectInstance first = (ObjectInstance) candidates.iterator().next();
         ClusterPartitionMBean proxy = (ClusterPartitionMBean) MBeanProxyExt.create(
            ClusterPartitionMBean.class,
            first.getObjectName(),
            this.getServer());
         found = proxy.getHAPartition();
      }

      if (found != null)
      {
         return found;
      }

      throw new InstanceNotFoundException("Failed to find HAPartition with PartitionName="+name);
   }

   // Private -------------------------------------------------------

   // Inner classes -------------------------------------------------

}