/*
* JBoss, Home of Professional Open Source.
* Copyright 2006, Red Hat Middleware LLC, and individual contributors
* as indicated by the @author tags. See the copyright.txt file in the
* distribution for a full listing of individual contributors.
*
* This is free software; you can redistribute it and/or modify it
* under the terms of the GNU Lesser General Public License as
* published by the Free Software Foundation; either version 2.1 of
* the License, or (at your option) any later version.
*
* This software is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
* Lesser General Public License for more details.
*
* You should have received a copy of the GNU Lesser General Public
* License along with this software; if not, write to the Free
* Software Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA
* 02110-1301 USA, or see the FSF site: http://www.fsf.org.
*/
package org.jboss.ha.framework.server;
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.Serializable;
import java.text.SimpleDateFormat;
import java.util.ArrayList;
import java.util.Date;
import java.util.HashMap;
import java.util.Iterator;
import java.util.Vector;
import javax.naming.Context;
import javax.naming.InitialContext;
import javax.naming.Name;
import javax.naming.NameNotFoundException;
import javax.naming.Reference;
import javax.naming.StringRefAddr;
import javax.management.MBeanServer;
import org.jgroups.JChannel;
import org.jgroups.MergeView;
import org.jgroups.View;
import org.jgroups.Message;
import org.jgroups.blocks.GroupRequest;
import org.jgroups.blocks.MethodCall;
import org.jgroups.stack.IpAddress;
import org.jgroups.util.Rsp;
import org.jgroups.util.RspList;
import org.jboss.invocation.MarshalledValueInputStream;
import org.jboss.invocation.MarshalledValueOutputStream;
import org.jboss.ha.framework.interfaces.DistributedReplicantManager;
import org.jboss.ha.framework.interfaces.DistributedState;
import org.jboss.ha.framework.interfaces.HAPartition;
import org.jboss.ha.framework.interfaces.ClusterNode;
import org.jboss.naming.NonSerializableFactory;
import org.jboss.logging.Logger;
/**
* This class is an abstraction class for a JGroups RPCDispatch and JChannel.
* It is a default implementation of HAPartition for the
* <a href="http://www.jgroups.com/">JGroups</A> framework
*
* @author <a href="mailto:sacha.labourey@cogito-info.ch">Sacha Labourey</a>.
* @author <a href="mailto:bill@burkecentral.com">Bill Burke</a>.
* @author Scott.Stark@jboss.org
* @version $Revision: 62255 $
*/
public class HAPartitionImpl
extends org.jgroups.blocks.RpcDispatcher
implements org.jgroups.MessageListener, org.jgroups.MembershipListener,
HAPartition, AsynchEventHandler.AsynchEventProcessor
{
private static class NoHandlerForRPC implements Serializable
{
static final long serialVersionUID = -1263095408483622838L;
}
// Constants -----------------------------------------------------
// final MethodLookup method_lookup_clos = new MethodLookupClos();
// Attributes ----------------------------------------------------
protected HashMap rpcHandlers = new HashMap();
protected HashMap stateHandlers = new HashMap();
/** Do we send any membership change notifications synchronously? */
protected boolean allowSyncListeners = false;
/** The synch HAMembershipListener and HAMembershipExtendedListeners */
protected ArrayList synchListeners = new ArrayList();
/** The asynch HAMembershipListener and HAMembershipExtendedListeners */
protected ArrayList asynchListeners = new ArrayList();
/** The handler used to send membership change notifications asynchronously */
protected AsynchEventHandler asynchHandler;
/** The current cluster partition members */
protected Vector members = null;
protected Vector jgmembers = null;
public Vector history = null;
/** The partition members other than this node */
protected Vector otherMembers = null;
protected Vector jgotherMembers = null;
/** The JChannel name */
protected String partitionName;
/** the local JG IP Address */
protected org.jgroups.stack.IpAddress localJGAddress = null;
/** The cluster transport protocol address string */
protected String nodeName;
/** me as a ClusterNode */
protected ClusterNode me = null;
/** The timeout for cluster RPC calls */
protected long timeout = 60000;
/** The JGroups partition channel */
protected JChannel channel;
/** The cluster replicant manager */
protected DistributedReplicantManagerImpl replicantManager;
/** The cluster state manager */
protected DistributedStateImpl dsManager;
/** The cluster instance log category */
protected Logger log;
protected Logger clusterLifeCycleLog;
/** The current cluster view id */
protected long currentViewId = -1;
/** The JMX MBeanServer to use for registrations */
protected MBeanServer server;
/** Number of ms to wait for state */
protected long state_transfer_timeout=60000;
/** Whether to bind the partition into JNDI */
protected boolean bindIntoJndi = true;
/**
* True if state was initialized during start-up.
*/
protected boolean isStateSet = false;
/**
* An exception occuring upon fetch state.
*/
protected Exception setStateException;
private final Object stateLock = new Object();
// Static --------------------------------------------------------
/**
* Creates an object from a byte buffer
*/
public static Object objectFromByteBuffer (byte[] buffer) throws Exception
{
if(buffer == null)
return null;
ByteArrayInputStream bais = new ByteArrayInputStream(buffer);
MarshalledValueInputStream mvis = new MarshalledValueInputStream(bais);
return mvis.readObject();
}
/**
* Serializes an object into a byte buffer.
* The object has to implement interface Serializable or Externalizable
*/
public static byte[] objectToByteBuffer (Object obj) throws Exception
{
ByteArrayOutputStream baos = new ByteArrayOutputStream();
MarshalledValueOutputStream mvos = new MarshalledValueOutputStream(baos);
mvos.writeObject(obj);
mvos.flush();
return baos.toByteArray();
}
public long getStateTransferTimeout() {
return state_transfer_timeout;
}
public void setStateTransferTimeout(long state_transfer_timeout) {
this.state_transfer_timeout=state_transfer_timeout;
}
public long getMethodCallTimeout() {
return timeout;
}
public void setMethodCallTimeout(long timeout) {
this.timeout=timeout;
}
// Constructors --------------------------------------------------
public HAPartitionImpl(String partitionName, org.jgroups.JChannel channel, boolean deadlock_detection, MBeanServer server) throws Exception
{
this(partitionName, channel, deadlock_detection);
this.server = server;
}
public HAPartitionImpl(String partitionName, org.jgroups.JChannel channel, boolean deadlock_detection) throws Exception
{
super(channel, null, null, new Object(), deadlock_detection); // init RpcDispatcher with a fake target object
this.log = Logger.getLogger(HAPartition.class.getName() + "." + partitionName);
this.clusterLifeCycleLog = Logger.getLogger(HAPartition.class.getName() + ".lifecycle." + partitionName);
this.channel = channel;
this.partitionName = partitionName;
this.history = new Vector();
this.setMarshaller(new MarshallerImpl());
logHistory ("Partition object created");
}
// Public --------------------------------------------------------
public void init() throws Exception
{
log.info("Initializing");
logHistory ("Initializing partition");
// Subscribe to dHA events comming generated by the org.jgroups. protocol stack
//
log.debug("setMembershipListener");
setMembershipListener(this);
log.debug("setMessageListener");
setMessageListener(this);
// Create the DRM and link it to this HAPartition
//
log.debug("create replicant manager");
this.replicantManager = new DistributedReplicantManagerImpl(this, this.server);
log.debug("init replicant manager");
this.replicantManager.init();
log.debug("bind replicant manager");
// Create the DS and link it to this HAPartition
//
log.debug("create distributed state");
this.dsManager = new DistributedStateImpl(this, this.server);
log.debug("init distributed state service");
this.dsManager.init();
log.debug("bind distributed state service");
// Create the asynchronous handler for view changes
asynchHandler = new AsynchEventHandler(this, "AsynchViewChangeHandler");
log.debug("done initing.");
}
public void startPartition() throws Exception
{
// get current JG group properties
//
logHistory ("Starting partition");
log.debug("get nodeName");
this.localJGAddress = (IpAddress)channel.getLocalAddress();
this.me = new ClusterNode(this.localJGAddress);
this.nodeName = this.me.getName();
log.debug("Get current members");
View view = channel.getView();
this.jgmembers = (Vector)view.getMembers().clone();
this.members = translateAddresses(this.jgmembers); // TRANSLATE
log.info("Number of cluster members: " + members.size());
for(int m = 0; m > members.size(); m ++)
{
Object node = members.get(m);
log.debug(node);
}
// Keep a list of other members only for "exclude-self" RPC calls
//
this.jgotherMembers = (Vector)view.getMembers().clone();
this.jgotherMembers.remove (channel.getLocalAddress());
this.otherMembers = translateAddresses(this.jgotherMembers); // TRANSLATE
log.info ("Other members: " + this.otherMembers.size ());
verifyNodeIsUnique (view.getMembers());
// Update the initial view id
//
this.currentViewId = view.getVid().getId();
// We must now synchronize new state transfer subscriber
//
fetchState();
// We start now able to start our DRM and DS
//
this.replicantManager.start();
this.dsManager.start();
// Start the asynch listener handler thread
asynchHandler.start();
// Bind ourself in the public JNDI space if configured to do so
if (!bindIntoJndi)
return;
Context ctx = new InitialContext();
this.bind("/HAPartition/" + partitionName, this, HAPartitionImpl.class, ctx);
}
protected void fetchState() throws Exception
{
log.info("Fetching state (will wait for " + this.state_transfer_timeout + " milliseconds):");
long start, stop;
isStateSet = false;
start = System.currentTimeMillis();
boolean rc = channel.getState(null, this.state_transfer_timeout);
if (rc)
{
synchronized (stateLock)
{
while (!isStateSet)
{
if (setStateException != null)
throw setStateException;
try
{
stateLock.wait();
}
catch (InterruptedException iex)
{
}
}
}
stop = System.currentTimeMillis();
log.info("state was retrieved successfully (in " + (stop - start) + " milliseconds)");
}
else
{
// No one provided us with state.
// We need to find out if we are the coordinator, so we must
// block until viewAccepted() is called at least once
synchronized (members)
{
while (members.size() == 0)
{
log.debug("waiting on viewAccepted()");
try
{
members.wait();
}
catch (InterruptedException iex)
{
}
}
}
if (isCurrentNodeCoordinator())
{
log.info("State could not be retrieved (we are the first member in group)");
}
else
{
throw new IllegalStateException("Initial state transfer failed: " +
"Channel.getState() returned false");
}
}
}
public void closePartition() throws Exception
{
logHistory ("Closing partition");
log.info("Closing partition " + partitionName);
try
{
asynchHandler.stop();
}
catch( Exception e)
{
log.warn("Failed to stop asynchHandler", e);
}
// Stop the DRM and DS services
//
try
{
this.replicantManager.stop();
}
catch (Exception e)
{
log.error("operation failed", e);
}
try
{
this.dsManager.stop();
}
catch (Exception e)
{
log.error("operation failed", e);
}
// NR 200505 : [JBCLUSTER-38] replace channel.close() by a disconnect and
// add the destroyPartition() step
try
{
// channel.close();
channel.disconnect();
}
catch (Exception e)
{
log.error("operation failed", e);
}
if (bindIntoJndi)
{
String boundName = "/HAPartition/" + partitionName;
InitialContext ctx = new InitialContext();
try
{
ctx.unbind(boundName);
}
finally
{
ctx.close();
}
NonSerializableFactory.unbind (boundName);
}
log.info("Partition " + partitionName + " closed.");
}
// NR 200505 : [JBCLUSTER-38] destroy partition close the channel
public void destroyPartition() throws Exception
{
try
{
this.replicantManager.destroy();
}
catch (Exception e)
{
log.error("operation failed", e);
}
try
{
this.dsManager.destroy();
}
catch (Exception e)
{
log.error("operation failed", e);
}
try
{
channel.close();
}
catch (Exception e)
{
log.error("operation failed", e);
}
log.info("Partition " + partitionName + " destroyed.");
}
// org.jgroups.MessageListener implementation ----------------------------------------------
// MessageListener methods
//
public byte[] getState()
{
logHistory ("getState called on partition");
boolean debug = log.isDebugEnabled();
log.debug("getState called.");
try
{
// we now get the sub-state of each HAPartitionStateTransfer subscribers and
// build a "macro" state
//
HashMap state = new HashMap();
Iterator keys = stateHandlers.keySet().iterator();
while (keys.hasNext())
{
String key = (String)keys.next();
HAPartition.HAPartitionStateTransfer subscriber = (HAPartition.HAPartitionStateTransfer)stateHandlers.get(key);
if (debug)
log.debug("getState for " + key);
state.put(key, subscriber.getCurrentState());
}
return objectToByteBuffer(state);
}
catch (Exception ex)
{
log.error("getState failed", ex);
}
return null;
}
public void setState(byte[] obj)
{
logHistory ("setState called on partition");
try
{
log.debug("setState called");
if (obj == null)
{
log.debug("state is null");
return;
}
long used_mem_before, used_mem_after;
int state_size=obj != null? obj.length : 0;
Runtime rt=Runtime.getRuntime();
used_mem_before=rt.totalMemory() - rt.freeMemory();
HashMap state = (HashMap)objectFromByteBuffer(obj);
java.util.Iterator keys = state.keySet().iterator();
while (keys.hasNext())
{
String key = (String)keys.next();
log.debug("setState for " + key);
Object someState = state.get(key);
HAPartition.HAPartitionStateTransfer subscriber = (HAPartition.HAPartitionStateTransfer)stateHandlers.get(key);
if (subscriber != null)
{
try
{
subscriber.setCurrentState((java.io.Serializable)someState);
}
catch (Exception e)
{
// Don't let issues with one subscriber affect others
// unless it is DRM or DS, which are really internal
// functions of the HAPartition
if (DistributedReplicantManagerImpl.SERVICE_NAME.equals(key)
|| DistributedStateImpl.SERVICE_NAME.equals(key))
{
if (e instanceof RuntimeException)
throw (RuntimeException) e;
else
throw new RuntimeException(e);
}
else
{
log.error("Caught exception setting state to " + subscriber, e);
}
}
}
else
{
log.debug("There is no stateHandler for: " + key);
}
}
used_mem_after=rt.totalMemory() - rt.freeMemory();
log.debug("received a state of " + state_size + " bytes; expanded memory by " +
(used_mem_after - used_mem_before) + " bytes (used memory before: " + used_mem_before +
", used memory after: " + used_mem_after + ")");
isStateSet = true;
}
catch (Throwable t)
{
log.error("failed setting state", t);
if (t instanceof Exception)
setStateException = (Exception) t;
else
setStateException = new Exception(t);
}
finally
{
synchronized (stateLock)
{
// Notify wait that state has been set.
stateLock.notifyAll();
}
}
}
public void receive(org.jgroups.Message msg)
{ /* complete */}
// org.jgroups.MembershipListener implementation ----------------------------------------------
public void suspect(org.jgroups.Address suspected_mbr)
{
logHistory ("Node suspected: " + (suspected_mbr==null?"null":suspected_mbr.toString()));
if (isCurrentNodeCoordinator ())
clusterLifeCycleLog.info ("Suspected member: " + suspected_mbr);
else
log.info("Suspected member: " + suspected_mbr);
}
public void block() {}
/** Notification of a cluster view change. This is done from the JG protocol
* handlder thread and we must be careful to not unduly block this thread.
* Because of this there are two types of listeners, synchronous and
* asynchronous. The synchronous listeners are messaged with the view change
* event using the calling thread while the asynchronous listeners are
* messaged using a seperate thread.
*
* @param newView
*/
public void viewAccepted(View newView)
{
try
{
// we update the view id
//
this.currentViewId = newView.getVid().getId();
// Keep a list of other members only for "exclude-self" RPC calls
//
this.jgotherMembers = (Vector)newView.getMembers().clone();
this.jgotherMembers.remove (channel.getLocalAddress());
this.otherMembers = translateAddresses (this.jgotherMembers); // TRANSLATE!
Vector translatedNewView = translateAddresses ((Vector)newView.getMembers().clone());
logHistory ("New view: " + translatedNewView + " with viewId: " + this.currentViewId +
" (old view: " + this.members + " )");
// Save the previous view and make a copy of the new view
Vector oldMembers = this.members;
Vector newjgMembers = (Vector)newView.getMembers().clone();
Vector newMembers = translateAddresses(newjgMembers); // TRANSLATE
if (this.members == null)
{
// Initial viewAccepted
this.members = newMembers;
this.jgmembers = newjgMembers;
log.debug("ViewAccepted: initial members set");
return;
}
this.members = newMembers;
this.jgmembers = newjgMembers;
int difference = 0;
if (oldMembers == null)
difference = newMembers.size () - 1;
else
difference = newMembers.size () - oldMembers.size ();
if (isCurrentNodeCoordinator ())
clusterLifeCycleLog.info ("New cluster view for partition " + this.partitionName + " (id: " +
this.currentViewId + ", delta: " + difference + ") : " + this.members);
else
log.info("New cluster view for partition " + this.partitionName + ": " +
this.currentViewId + " (" + this.members + " delta: " + difference + ")");
// Build a ViewChangeEvent for the asynch listeners
ViewChangeEvent event = new ViewChangeEvent();
event.viewId = currentViewId;
event.allMembers = translatedNewView;
event.deadMembers = getDeadMembers(oldMembers, event.allMembers);
event.newMembers = getNewMembers(oldMembers, event.allMembers);
event.originatingGroups = null;
// if the new view occurs because of a merge, we first inform listeners of the merge
if(newView instanceof MergeView)
{
MergeView mergeView = (MergeView) newView;
event.originatingGroups = mergeView.getSubgroups();
}
log.debug("membership changed from " +
(oldMembers == null ? 0 : oldMembers.size()) + " to " +
event.allMembers.size());
// Put the view change to the asynch queue
this.asynchHandler.queueEvent(event);
// Broadcast the new view to the synchronous view change listeners
if (this.allowSyncListeners)
{
this.notifyListeners(synchListeners, event.viewId, event.allMembers,
event.deadMembers, event.newMembers, event.originatingGroups);
}
}
catch (Exception ex)
{
log.error("ViewAccepted failed", ex);
}
}
// HAPartition implementation ----------------------------------------------
public String getNodeName()
{
return nodeName;
}
public String getPartitionName()
{
return partitionName;
}
public DistributedReplicantManager getDistributedReplicantManager()
{
return replicantManager;
}
public DistributedState getDistributedStateService()
{
return this.dsManager;
}
public long getCurrentViewId()
{
return this.currentViewId;
}
public Vector getCurrentView()
{
Vector result = new Vector (this.members.size());
for (int i = 0; i < members.size(); i++)
{
result.add( ((ClusterNode) members.elementAt(i)).getName() );
}
return result;
}
public ClusterNode[] getClusterNodes ()
{
ClusterNode[] nodes = new ClusterNode[this.members.size()];
this.members.toArray(nodes);
return nodes;
}
public ClusterNode getClusterNode ()
{
return me;
}
public boolean isCurrentNodeCoordinator ()
{
if(this.members == null || this.members.size() == 0 || this.me == null)
return false;
return this.members.elementAt (0).equals (this.me);
}
// ***************************
// ***************************
// RPC multicast communication
// ***************************
// ***************************
//
public void registerRPCHandler(String objName, Object subscriber)
{
rpcHandlers.put(objName, subscriber);
}
public void unregisterRPCHandler(String objName, Object subscriber)
{
rpcHandlers.remove(objName);
}
/**
*
* @param objName
* @param methodName
* @param args
* @param excludeSelf
* @return
* @throws Exception
* @deprecated Use {@link #callMethodOnCluster(String,String,Object[],Class[], boolean)} instead
*/
public ArrayList callMethodOnCluster(String objName, String methodName,
Object[] args, boolean excludeSelf) throws Exception
{
return callMethodOnCluster(objName, methodName, args, null, excludeSelf);
}
/**
* This function is an abstraction of RpcDispatcher.
*/
public ArrayList callMethodOnCluster(String objName, String methodName,
Object[] args, Class[] types, boolean excludeSelf) throws Exception
{
return callMethodOnCluster(objName, methodName, args, types, excludeSelf, this.timeout);
}
public ArrayList callMethodOnCluster(String objName, String methodName,
Object[] args, Class[] types, boolean excludeSelf, long methodTimeout) throws Exception
{
ArrayList rtn = new ArrayList();
MethodCall m=null;
RspList rsp = null;
boolean trace = log.isTraceEnabled();
if(types != null)
m=new MethodCall(objName + "." + methodName, args, types);
else
m=new MethodCall(objName + "." + methodName, args);
if (excludeSelf)
{
if( trace )
{
log.trace("callMethodOnCluster(true), objName="+objName
+", methodName="+methodName+", members="+jgotherMembers);
}
rsp = this.callRemoteMethods(this.jgotherMembers, m, GroupRequest.GET_ALL, methodTimeout);
}
else
{
if( trace )
{
log.trace("callMethodOnCluster(false), objName="+objName
+", methodName="+methodName+", members="+members);
}
rsp = this.callRemoteMethods(null, m, GroupRequest.GET_ALL, methodTimeout);
}
if (rsp != null)
{
for (int i = 0; i < rsp.size(); i++)
{
Object item = rsp.elementAt(i);
if (item instanceof Rsp)
{
Rsp response = (Rsp) item;
// Only include received responses
boolean wasReceived = response.wasReceived();
if( wasReceived == true )
{
item = response.getValue();
if (!(item instanceof NoHandlerForRPC))
rtn.add(item);
}
else if( trace )
log.trace("Ignoring non-received response: "+response);
}
else
{
if (!(item instanceof NoHandlerForRPC))
rtn.add(item);
else if( trace )
log.trace("Ignoring NoHandlerForRPC");
}
}
}
return rtn;
}
/**
* Calls method on Cluster coordinator node only. The cluster coordinator node is the first node to join the
* cluster.
* and is replaced
* @param objName
* @param methodName
* @param args
* @param types
* @param excludeSelf
* @return
* @throws Exception
*/
public ArrayList callMethodOnCoordinatorNode(String objName, String methodName,
Object[] args, Class[] types,boolean excludeSelf) throws Exception
{
return callMethodOnCoordinatorNode(objName,methodName,args,types,excludeSelf,this.timeout);
}
/**
* Calls method on Cluster coordinator node only. The cluster coordinator node is the first node to join the
* cluster.
* and is replaced
* @param objName
* @param methodName
* @param args
* @param types
* @param excludeSelf
* @param methodTimeout
* @return
* @throws Exception
*/
public ArrayList callMethodOnCoordinatorNode(String objName, String methodName,
Object[] args, Class[] types,boolean excludeSelf, long methodTimeout) throws Exception
{
ArrayList rtn = new ArrayList();
MethodCall m=null;
RspList rsp = null;
boolean trace = log.isTraceEnabled();
if(types != null)
m=new MethodCall(objName + "." + methodName, args, types);
else
m=new MethodCall(objName + "." + methodName, args);
if( trace )
{
log.trace("callMethodOnCoordinatorNode(false), objName="+objName
+", methodName="+methodName);
}
// the first cluster view member is the coordinator
Vector coordinatorOnly = new Vector();
// If we are the coordinator, only call ourself if 'excludeSelf' is false
if (false == isCurrentNodeCoordinator () ||
false == excludeSelf)
coordinatorOnly.addElement(this.jgmembers.elementAt (0));
rsp = this.callRemoteMethods(coordinatorOnly, m, GroupRequest.GET_ALL, methodTimeout);
if (rsp != null)
{
for (int i = 0; i < rsp.size(); i++)
{
Object item = rsp.elementAt(i);
if (item instanceof Rsp)
{
Rsp response = (Rsp) item;
// Only include received responses
boolean wasReceived = response.wasReceived();
if( wasReceived == true )
{
item = response.getValue();
if (!(item instanceof NoHandlerForRPC))
rtn.add(item);
}
else if( trace )
log.trace("Ignoring non-received response: "+response);
}
else
{
if (!(item instanceof NoHandlerForRPC))
rtn.add(item);
else if( trace )
log.trace("Ignoring NoHandlerForRPC");
}
}
}
return rtn;
}
/**
*
* @param objName
* @param methodName
* @param args
* @param excludeSelf
* @throws Exception
* @deprecated Use {@link #callAsynchMethodOnCluster(String, String, Object[], Class[], boolean)} instead
*/
public void callAsynchMethodOnCluster(String objName, String methodName,
Object[] args, boolean excludeSelf) throws Exception {
callAsynchMethodOnCluster(objName, methodName, args, null, excludeSelf);
}
/**
* This function is an abstraction of RpcDispatcher for asynchronous messages
*/
public void callAsynchMethodOnCluster(String objName, String methodName,
Object[] args, Class[] types, boolean excludeSelf) throws Exception
{
MethodCall m = null;
boolean trace = log.isTraceEnabled();
if(types != null)
m=new MethodCall(objName + "." + methodName, args, types);
else
m=new MethodCall(objName + "." + methodName, args);
if (excludeSelf)
{
if( trace )
{
log.trace("callAsynchMethodOnCluster(true), objName="+objName
+", methodName="+methodName+", members="+jgotherMembers);
}
this.callRemoteMethods(this.jgotherMembers, m, GroupRequest.GET_NONE, timeout);
}
else
{
if( trace )
{
log.trace("callAsynchMethodOnCluster(false), objName="+objName
+", methodName="+methodName+", members="+members);
}
this.callRemoteMethods(null, m, GroupRequest.GET_NONE, timeout);
}
}
// *************************
// *************************
// State transfer management
// *************************
// *************************
//
public void subscribeToStateTransferEvents(String objectName, HAPartitionStateTransfer subscriber)
{
stateHandlers.put(objectName, subscriber);
}
public void unsubscribeFromStateTransferEvents(String objectName, HAPartitionStateTransfer subscriber)
{
stateHandlers.remove(objectName);
}
// *************************
// *************************
// Group Membership listeners
// *************************
// *************************
//
public void registerMembershipListener(HAMembershipListener listener)
{
boolean isAsynch = (this.allowSyncListeners == false)
|| (listener instanceof AsynchHAMembershipListener)
|| (listener instanceof AsynchHAMembershipExtendedListener);
if( isAsynch ) {
synchronized(this.asynchListeners) {
this.asynchListeners.add(listener);
}
}
else {
synchronized(this.synchListeners) {
this.synchListeners.add(listener);
}
}
}
public void unregisterMembershipListener(HAMembershipListener listener)
{
boolean isAsynch = (this.allowSyncListeners == false)
|| (listener instanceof AsynchHAMembershipListener)
|| (listener instanceof AsynchHAMembershipExtendedListener);
if( isAsynch ) {
synchronized(this.asynchListeners) {
this.asynchListeners.remove(listener);
}
}
else {
synchronized(this.synchListeners) {
this.synchListeners.remove(listener);
}
}
}
public boolean getAllowSynchronousMembershipNotifications()
{
return allowSyncListeners;
}
public void setAllowSynchronousMembershipNotifications(boolean allowSync)
{
this.allowSyncListeners = allowSync;
}
// org.jgroups.RpcDispatcher overrides ---------------------------------------------------
/**
* Message contains MethodCall. Execute it against *this* object and return result.
* Use MethodCall.Invoke() to do this. Return result.
*
* This overrides RpcDispatcher.Handle so that we can dispatch to many different objects.
* @param req The org.jgroups. representation of the method invocation
* @return The serializable return value from the invocation
*/
public Object handle(Message req)
{
Object body = null;
Object retval = null;
MethodCall method_call = null;
boolean trace = log.isTraceEnabled();
if( trace )
log.trace("Partition " + partitionName + " received msg");
if(req == null || req.getBuffer() == null)
{
log.warn("message or message buffer is null !");
return null;
}
try
{
body = objectFromByteBuffer(req.getBuffer());
}
catch(Exception e)
{
log.warn("failed unserializing message buffer (msg=" + req + ")", e);
return null;
}
if(body == null || !(body instanceof MethodCall))
{
log.warn("message does not contain a MethodCall object !");
return null;
}
// get method call informations
//
method_call = (MethodCall)body;
String methodName = method_call.getName();
if( trace )
log.trace("pre methodName: " + methodName);
int idx = methodName.lastIndexOf('.');
String handlerName = methodName.substring(0, idx);
String newMethodName = methodName.substring(idx + 1);
if( trace )
{
log.trace("handlerName: " + handlerName + " methodName: " + newMethodName);
log.trace("Handle: " + methodName);
}
// prepare method call
method_call.setName(newMethodName);
Object handler = rpcHandlers.get(handlerName);
if (handler == null)
{
if( trace )
log.debug("No rpc handler registered under: "+handlerName);
return new NoHandlerForRPC();
}
/* Invoke it and just return any exception with trace level logging of
the exception. The exception semantics of a group rpc call are weak as
the return value may be a normal return value or the exception thrown.
*/
try
{
retval = method_call.invoke(handler);
if( trace )
log.trace("rpc call return value: "+retval);
}
catch (Throwable t)
{
if( trace )
log.trace("rpc call threw exception", t);
retval = t;
}
return retval;
}
// AsynchEventHandler.AsynchEventProcessor -----------------------
public void processEvent(Object event)
{
ViewChangeEvent vce = (ViewChangeEvent) event;
notifyListeners(asynchListeners, vce.viewId, vce.allMembers,
vce.deadMembers, vce.newMembers, vce.originatingGroups);
}
// Package protected ---------------------------------------------
// Protected -----------------------------------------------------
protected void verifyNodeIsUnique (Vector javaGroupIpAddresses) throws Exception
{
byte[] localUniqueName = this.localJGAddress.getAdditionalData();
if (localUniqueName == null)
{
log.warn("No additional information has been found in the JavaGroup address: " +
"make sure you are running with a correct version of JGroups and that the protocol " +
" you are using supports the 'additionalData' behaviour");
return;
}
for (int i = 0; i < javaGroupIpAddresses.size(); i++)
{
IpAddress address = (IpAddress) javaGroupIpAddresses.elementAt(i);
if (!address.equals(this.localJGAddress))
{
if (localUniqueName.equals(address.getAdditionalData()))
throw new Exception ("Local node removed from cluster (" + this.localJGAddress + "): another node (" + address + ") publicizing the same name was already there");
}
}
}
/**
* Helper method that binds the partition in the JNDI tree.
* @param jndiName Name under which the object must be bound
* @param who Object to bind in JNDI
* @param classType Class type under which should appear the bound object
* @param ctx Naming context under which we bind the object
* @throws Exception Thrown if a naming exception occurs during binding
*/
protected void bind(String jndiName, Object who, Class classType, Context ctx) throws Exception
{
// Ah ! This service isn't serializable, so we use a helper class
//
NonSerializableFactory.bind(jndiName, who);
Name n = ctx.getNameParser("").parse(jndiName);
while (n.size () > 1)
{
String ctxName = n.get (0);
try
{
ctx = (Context)ctx.lookup (ctxName);
}
catch (NameNotFoundException e)
{
log.debug ("creating Subcontext" + ctxName);
ctx = ctx.createSubcontext (ctxName);
}
n = n.getSuffix (1);
}
// The helper class NonSerializableFactory uses address type nns, we go on to
// use the helper class to bind the service object in JNDI
//
StringRefAddr addr = new StringRefAddr("nns", jndiName);
Reference ref = new Reference(classType.getName (), addr, NonSerializableFactory.class.getName (), null);
ctx.rebind (n.get (0), ref);
}
/**
* Helper method that returns a vector of dead members from two input vectors: new and old vectors of two views.
* Dead members are old - new members.
* @param oldMembers Vector of old members
* @param newMembers Vector of new members
* @return Vector of members that have died between the two views, can be empty.
*/
protected Vector getDeadMembers(Vector oldMembers, Vector newMembers)
{
boolean debug = log.isDebugEnabled();
if(oldMembers == null) oldMembers=new Vector();
if(newMembers == null) newMembers=new Vector();
Vector dead=(Vector)oldMembers.clone();
dead.removeAll(newMembers);
if(dead.size() > 0 && debug)
log.debug("dead members: " + dead);
return dead;
}
/**
* Helper method that returns a vector of new members from two input vectors: new and old vectors of two views.
* @param oldMembers Vector of old members
* @param allMembers Vector of new members
* @return Vector of members that have joined the partition between the two views
*/
protected Vector getNewMembers(Vector oldMembers, Vector allMembers)
{
if(oldMembers == null) oldMembers=new Vector();
if(allMembers == null) allMembers=new Vector();
Vector newMembers=(Vector)allMembers.clone();
newMembers.removeAll(oldMembers);
return newMembers;
}
protected void notifyListeners(ArrayList theListeners, long viewID,
Vector allMembers, Vector deadMembers, Vector newMembers,
Vector originatingGroups)
{
log.debug("Begin notifyListeners, viewID: "+viewID);
synchronized(theListeners)
{
// JBAS-3619 -- don't hold synch lock while notifying
theListeners = (ArrayList) theListeners.clone();
}
for (int i = 0; i < theListeners.size(); i++)
{
HAMembershipListener aListener = null;
try
{
aListener = (HAMembershipListener) theListeners.get(i);
if(originatingGroups != null && (aListener instanceof HAMembershipExtendedListener))
{
HAMembershipExtendedListener exListener = (HAMembershipExtendedListener) aListener;
exListener.membershipChangedDuringMerge (deadMembers, newMembers,
allMembers, originatingGroups);
}
else
{
aListener.membershipChanged(deadMembers, newMembers, allMembers);
}
}
catch (Throwable e)
{
// a problem in a listener should not prevent other members to receive the new view
log.warn("HAMembershipListener callback failure: "+aListener, e);
}
}
log.debug("End notifyListeners, viewID: "+viewID);
}
/*
* Allows caller to specify whether the partition instance should be bound into JNDI. Default value is true.
* This method must be called before the partition is started as the binding occurs during startup.
*
* @param bind Whether to bind the partition into JNDI.
*/
public void setBindIntoJndi(boolean bind)
{
bindIntoJndi = bind;
}
/*
* Allows caller to determine whether the partition instance has been bound into JNDI.
*
* @return true if the partition has been bound into JNDI.
*/
public boolean getBindIntoJndi()
{
return bindIntoJndi;
}
protected Vector translateAddresses (Vector jgAddresses)
{
if (jgAddresses == null)
return null;
Vector result = new Vector (jgAddresses.size());
for (int i = 0; i < jgAddresses.size(); i++)
{
IpAddress addr = (IpAddress) jgAddresses.elementAt(i);
result.add(new ClusterNode (addr));
}
return result;
}
public void logHistory (String message)
{
try
{
history.add(new SimpleDateFormat().format (new Date()) + " : " + message);
}
catch (Exception ignored){}
}
/** A simple data class containing the view change event needed to
* message the HAMembershipListeners
*/
private static class ViewChangeEvent
{
long viewId;
Vector deadMembers;
Vector newMembers;
Vector allMembers;
Vector originatingGroups;
}
private class MarshallerImpl implements org.jgroups.blocks.RpcDispatcher.Marshaller
{
public Object objectFromByteBuffer(byte[] buf) throws Exception
{
return HAPartitionImpl.objectFromByteBuffer(buf);
}
public byte[] objectToByteBuffer(Object obj) throws Exception
{
return HAPartitionImpl.objectToByteBuffer(obj);
}
}
// Private -------------------------------------------------------
// Inner classes -------------------------------------------------
}
|