Methods Summary |
---|
public void | _add(java.lang.String key, java.lang.String nodeName, java.io.Serializable replicant)Cluster callback called when a new replicant is added on another node
if( trace )
log.trace("_add(" + key + ", " + nodeName);
try
{
addReplicant(key, nodeName, replicant);
// Notify listeners asynchronously
KeyChangeEvent kce = new KeyChangeEvent();
kce.key = key;
kce.replicants = lookupReplicants(key);
asynchHandler.queueEvent(kce);
}
catch (Exception ex)
{
log.error("_add failed", ex);
}
|
public void | _remove(java.lang.String key, java.lang.String nodeName)Cluster callback called when a replicant is removed by another node
try
{
if (removeReplicant (key, nodeName)) {
// Notify listeners asynchronously
KeyChangeEvent kce = new KeyChangeEvent();
kce.key = key;
kce.replicants = lookupReplicants(key);
asynchHandler.queueEvent(kce);
}
}
catch (Exception ex)
{
log.error("_remove failed", ex);
}
|
public void | add(java.lang.String key, java.io.Serializable replicant)
if( trace )
log.trace("add, key="+key+", value="+replicant);
partitionNameKnown.acquire (); // we don't propagate until our name is known
Object[] args = {key, this.nodeName, replicant};
partition.callMethodOnCluster(SERVICE_NAME, "_add", args, add_types, true);
synchronized(localReplicants)
{
localReplicants.put(key, replicant);
notifyKeyListeners(key, lookupReplicants(key));
}
|
protected void | addReplicant(java.lang.String key, java.lang.String nodeName, java.io.Serializable replicant)Add a replicant to the replicants map.
addReplicant(replicants, key, nodeName, replicant);
|
protected void | addReplicant(java.util.Map map, java.lang.String key, java.lang.String nodeName, java.io.Serializable replicant)Logic for adding replicant to any map.
synchronized(map)
{
HashMap rep = (HashMap)map.get(key);
if (rep == null)
{
if( trace )
log.trace("_adding new HashMap");
rep = new HashMap();
map.put(key, rep);
}
rep.put(nodeName, replicant);
}
|
protected int | calculateReplicantsHash(java.util.List members)
int result = 0;
Object obj = null;
for (int i=0; i<members.size (); i++)
{
obj = members.get (i);
if (obj != null)
result+= obj.hashCode (); // no explicit overflow with int addition
}
return result;
|
protected void | cleanupKeyListeners()
// NOT IMPLEMENTED YET
|
public void | destroy()
// now partition can't be resuscitated, so remove local replicants
if (localReplicants != null)
{
synchronized(localReplicants)
{
String[] keys = new String[localReplicants.size()];
localReplicants.keySet().toArray(keys);
for(int n = 0; n < keys.length; n ++)
{
this.removeLocal(keys[n]); // channel is disconnected, so
// don't try to notify cluster
}
}
}
this.mbeanserver.unregisterMBean (this.jmxName);
partition.unregisterRPCHandler(SERVICE_NAME, this);
partition.unsubscribeFromStateTransferEvents(SERVICE_NAME, this);
partition.unregisterMembershipListener(this);
|
public java.util.Collection | getAllServices()
HashSet services = new HashSet();
services.addAll (localReplicants.keySet ());
services.addAll (replicants.keySet ());
return services;
|
public java.io.Serializable | getCurrentState()
java.util.Collection services = this.getAllServices ();
HashMap result = new HashMap ();
java.util.Iterator catsIter = services.iterator ();
while (catsIter.hasNext ())
{
String category = (String)catsIter.next ();
HashMap content = (HashMap)this.replicants.get (category);
if (content == null)
content = new HashMap ();
else
content = (HashMap)content.clone ();
Serializable local = lookupLocalReplicant(category);
if (local != null)
content.put (this.nodeName, local);
result.put (category, content);
}
// we add the intraviewid cache to the global result
//
Object[] globalResult = new Object[] {result, intraviewIdCache};
return globalResult;
|
protected java.util.Vector | getKeysReplicatedByNode(java.lang.String nodeName)
Vector result = new Vector ();
synchronized (replicants)
{
Iterator keysIter = replicants.keySet ().iterator ();
while (keysIter.hasNext ())
{
String key = (String)keysIter.next ();
HashMap values = (HashMap)replicants.get (key);
if ( (values != null) && values.containsKey (nodeName) )
{
result.add (key);
}
}
}
return result;
|
public int | getReplicantsViewId(java.lang.String key)
Integer result = (Integer)this.intraviewIdCache.get (key);
if (result == null)
return 0;
else
return result.intValue ();
|
public void | init()
log.debug("registerRPCHandler");
partition.registerRPCHandler(SERVICE_NAME, this);
log.debug("subscribeToStateTransferEvents");
partition.subscribeToStateTransferEvents(SERVICE_NAME, this);
log.debug("registerMembershipListener");
partition.registerMembershipListener(this);
// subscribed this "sub-service" of HAPartition with JMX
// TODO: In the future (when state transfer issues will be completed),
// we will need to redesign the way HAPartitions and its sub-protocols are
// registered with JMX. They will most probably be independant JMX services.
//
String name = "jboss:service=" + SERVICE_NAME +
",partitionName=" + this.partition.getPartitionName();
this.jmxName = new javax.management.ObjectName(name);
this.mbeanserver.registerMBean(this, jmxName);
|
public boolean | isMasterReplica(java.lang.String key)
if( trace )
log.trace("isMasterReplica, key="+key);
// if I am not a replicat, I cannot be the master...
//
if (!localReplicants.containsKey (key))
{
if( trace )
log.trace("no localReplicants, key="+key+", isMasterReplica=false");
return false;
}
Vector allNodes = this.partition.getCurrentView ();
HashMap repForKey = (HashMap)replicants.get(key);
if (repForKey==null)
{
if( trace )
log.trace("no replicants, key="+key+", isMasterReplica=true");
return true;
}
Vector replicaNodes = new Vector ((repForKey).keySet ());
boolean isMasterReplica = false;
for (int i=0; i<allNodes.size (); i++)
{
String aMember = (String)allNodes.elementAt (i);
if( trace )
log.trace("Testing member: "+aMember);
if (replicaNodes.contains (aMember))
{
if( trace )
log.trace("Member found in replicaNodes, isMasterReplica=false");
break;
}
else if (aMember.equals (this.nodeName))
{
if( trace )
log.trace("Member == nodeName, isMasterReplica=true");
isMasterReplica = true;
break;
}
}
return isMasterReplica;
|
public java.lang.String | listContent()
// we merge all replicants services: local only or not
//
java.util.Collection services = this.getAllServices ();
StringBuffer result = new StringBuffer ();
java.util.Iterator catsIter = services.iterator ();
result.append ("<pre>");
while (catsIter.hasNext ())
{
String category = (String)catsIter.next ();
HashMap content = (HashMap)this.replicants.get (category);
if (content == null)
content = new HashMap ();
java.util.Iterator keysIter = content.keySet ().iterator ();
result.append ("-----------------------------------------------\n");
result.append ("Service : ").append (category).append ("\n\n");
Serializable local = lookupLocalReplicant(category);
if (local == null)
result.append ("\t- Service is *not* available locally\n");
else
result.append ("\t- Service *is* also available locally\n");
while (keysIter.hasNext ())
{
String location = (String)keysIter.next ();
result.append ("\t- ").append(location).append ("\n");
}
result.append ("\n");
}
result.append ("</pre>");
return result.toString ();
|
public java.lang.String | listXmlContent()
// we merge all replicants services: local only or not
//
java.util.Collection services = this.getAllServices ();
StringBuffer result = new StringBuffer ();
result.append ("<ReplicantManager>\n");
java.util.Iterator catsIter = services.iterator ();
while (catsIter.hasNext ())
{
String category = (String)catsIter.next ();
HashMap content = (HashMap)this.replicants.get (category);
if (content == null)
content = new HashMap ();
java.util.Iterator keysIter = content.keySet ().iterator ();
result.append ("\t<Service>\n");
result.append ("\t\t<ServiceName>").append (category).append ("</ServiceName>\n");
Serializable local = lookupLocalReplicant(category);
if (local != null)
{
result.append ("\t\t<Location>\n");
result.append ("\t\t\t<Name local=\"True\">").append (this.nodeName).append ("</Name>\n");
result.append ("\t\t</Location>\n");
}
while (keysIter.hasNext ())
{
String location = (String)keysIter.next ();
result.append ("\t\t<Location>\n");
result.append ("\t\t\t<Name local=\"False\">").append (location).append ("</Name>\n");
result.append ("\t\t</Location>\n");
}
result.append ("\t<Service>\n");
}
result.append ("<ReplicantManager>\n");
return result.toString ();
|
public java.io.Serializable | lookupLocalReplicant(java.lang.String key)
return (Serializable)localReplicants.get(key);
|
public java.lang.Object[] | lookupLocalReplicants()Cluster callback called when a node wants to know our complete list of local replicants
partitionNameKnown.acquire (); // we don't answer until our name is known
Object[] rtn = {this.nodeName, localReplicants};
if( trace )
log.trace ("lookupLocalReplicants called ("+ rtn[0] + "). Return: " + localReplicants.size ());
return rtn;
|
public java.util.List | lookupReplicants(java.lang.String key)
Serializable local = lookupLocalReplicant(key);
HashMap replicant = (HashMap)replicants.get(key);
if (replicant == null && local == null)
return null;
ArrayList rtn = new ArrayList();
if (replicant == null)
{
if (local != null)
rtn.add(local);
}
else
{
// JBAS-2677. Put the replicants in view order.
ClusterNode[] nodes = partition.getClusterNodes();
String replNode;
Object replVal;
for (int i = 0; i < nodes.length; i++)
{
replNode = nodes[i].getName();
if (local != null && nodeName.equals(replNode))
{
rtn.add(local);
continue;
}
replVal = replicant.get(replNode);
if (replVal != null)
rtn.add(replVal);
}
}
return rtn;
|
public java.util.List | lookupReplicantsNodeNames(java.lang.String key)
boolean locallyReplicated = localReplicants.containsKey (key);
HashMap replicant = (HashMap)replicants.get(key);
if (replicant == null && !locallyReplicated)
return null;
ArrayList rtn = new ArrayList();
if (replicant == null)
{
if (locallyReplicated)
rtn.add(this.nodeName);
}
else
{
// JBAS-2677. Put the replicants in view order.
Set keys = replicant.keySet();
ClusterNode[] nodes = partition.getClusterNodes();
String keyOwner;
for (int i = 0; i < nodes.length; i++)
{
keyOwner = nodes[i].getName();
if (locallyReplicated && nodeName.equals(keyOwner))
{
rtn.add(this.nodeName);
continue;
}
if (keys.contains(keyOwner))
rtn.add(keyOwner);
}
}
return rtn;
|
public void | membershipChanged(java.util.Vector deadMembers, java.util.Vector newMembers, java.util.Vector allMembers)
// Here we only care about deadMembers. Purge all replicant lists of deadMembers
// and then notify all listening nodes.
//
log.info("I am (" + nodeName + ") received membershipChanged event:");
log.info("Dead members: " + deadMembers.size() + " (" + deadMembers + ")");
log.info("New Members : " + newMembers.size() + " (" + newMembers + ")");
log.info("All Members : " + allMembers.size() + " (" + allMembers + ")");
purgeDeadMembers(deadMembers);
// we don't need to merge members anymore
|
public void | membershipChangedDuringMerge(java.util.Vector deadMembers, java.util.Vector newMembers, java.util.Vector allMembers, java.util.Vector originatingGroups)
// Here we only care about deadMembers. Purge all replicant lists of deadMembers
// and then notify all listening nodes.
//
log.info("Merging partitions...");
log.info("Dead members: " + deadMembers.size());
log.info("Originating groups: " + originatingGroups);
purgeDeadMembers(deadMembers);
if (newMembers.size() > 0)
{
new MergeMembers().start();
}
|
protected void | mergeMembers()
boolean isAlreadyMerging = ClusterMergeStatus.isMergeInProcess();
try
{
ClusterMergeStatus.startMergeProcess();
log.debug("Start merging members in DRM service...");
java.util.HashSet notifies = new java.util.HashSet ();
ArrayList rsp = partition.callMethodOnCluster(SERVICE_NAME,
"lookupLocalReplicants",
new Object[]{}, new Class[]{}, true);
if (rsp.size() == 0)
log.debug("No responses from other nodes during the DRM merge process.");
else
{
log.debug("The DRM merge process has received " + rsp.size() + " answers");
}
for (int i = 0; i < rsp.size(); i++)
{
Object o = rsp.get(i);
if (o == null)
{
log.warn("As part of the answers received during the DRM merge process, a NULL message was received!");
continue;
}
else if (o instanceof Throwable)
{
log.warn("As part of the answers received during the DRM merge process, a Throwable was received!", (Throwable) o);
continue;
}
Object[] objs = (Object[]) o;
String node = (String)objs[0];
Map replicants = (Map)objs[1];
Iterator keys = replicants.keySet().iterator();
//FIXME: We don't remove keys in the merge process but only add new keys!
while (keys.hasNext())
{
String key = (String)keys.next();
// done to reduce duplicate notifications
if (!replicantEntryAlreadyExists (key, node))
{
addReplicant(key, node, (Serializable)replicants.get(key));
notifies.add (key);
}
}
Vector currentStatus = getKeysReplicatedByNode (node);
if (currentStatus.size () > replicants.size ())
{
// The merge process needs to remove some (now)
// unexisting keys
//
for (int currentKeysId=0, currentKeysMax=currentStatus.size (); currentKeysId<currentKeysMax; currentKeysId++)
{
String theKey = (String)currentStatus.elementAt (currentKeysId);
if (!replicants.containsKey (theKey))
{
removeReplicant (theKey, node);
notifies.add(theKey);
}
}
}
}
Iterator notifIter = notifies.iterator ();
while (notifIter.hasNext ())
{
String key = (String)notifIter.next ();
notifyKeyListeners(key, lookupReplicants(key));
}
log.debug ("..Finished merging members in DRM service");
}
catch (Exception ex)
{
log.error("merge failed", ex);
}
finally
{
if (!isAlreadyMerging)
ClusterMergeStatus.endMergeProcess();
}
|
protected static synchronized int | nextThreadID()
return threadID ++;
|
protected void | notifyKeyListeners(java.lang.String key, java.util.List newReplicants)Notifies, through a callback, the listeners for a given replicant that the set of replicants has changed
if( trace )
log.trace("notifyKeyListeners");
// we first update the intra-view id for this particular key
//
int newId = updateReplicantsHashId (key);
ArrayList listeners = (ArrayList)keyListeners.get(key);
if (listeners == null)
{
if( trace )
log.trace("listeners is null");
return;
}
// ArrayList's iterator is not thread safe
DistributedReplicantManager.ReplicantListener[] toNotify = null;
synchronized(listeners)
{
toNotify = new DistributedReplicantManager.ReplicantListener[listeners.size()];
toNotify = (DistributedReplicantManager.ReplicantListener[]) listeners.toArray(toNotify);
}
if( trace )
log.trace("notifying " + toNotify.length + " listeners for key change: " + key);
for (int i = 0; i < toNotify.length; i++)
{
if (toNotify[i] != null)
toNotify[i].replicantsChanged(key, newReplicants, newId);
}
|
public void | processEvent(java.lang.Object event)
KeyChangeEvent kce = (KeyChangeEvent) event;
notifyKeyListeners(kce.key, kce.replicants);
|
protected void | purgeDeadMembers(java.util.Vector deadMembers)get rid of dead members from replicant list
return true if anything was purged.
if (deadMembers.size() <= 0)
return;
log.debug("purgeDeadMembers, "+deadMembers);
try
{
synchronized(replicants)
{
Iterator keys = replicants.keySet().iterator();
while (keys.hasNext())
{
String key = (String)keys.next();
HashMap replicant = (HashMap)replicants.get(key);
boolean modified = false;
for (int i = 0; i < deadMembers.size(); i++)
{
String node = deadMembers.elementAt(i).toString();
log.debug("trying to remove deadMember " + node + " for key " + key);
Object removed = replicant.remove(node);
if (removed != null)
{
log.debug(node + " was removed");
modified = true;
}
else
{
log.debug(node + " was NOT removed!!!");
}
}
if (modified)
{
notifyKeyListeners(key, lookupReplicants(key));
}
}
}
}
catch (Exception ex)
{
log.error("purgeDeadMembers failed", ex);
}
|
public void | registerListener(java.lang.String key, DistributedReplicantManager.ReplicantListener subscriber)
synchronized(keyListeners)
{
ArrayList listeners = (ArrayList)keyListeners.get(key);
if (listeners == null)
{
listeners = new ArrayList();
keyListeners.put(key, listeners);
}
listeners.add(subscriber);
}
|
public void | remove(java.lang.String key)
partitionNameKnown.acquire (); // we don't propagate until our name is known
// optimisation: we don't make a costly network call
// if there is nothing to remove
if (localReplicants.containsKey(key))
{
Object[] args = {key, this.nodeName};
partition.callAsynchMethodOnCluster(SERVICE_NAME, "_remove", args, remove_types, true);
removeLocal(key);
}
|
protected void | removeLocal(java.lang.String key)
synchronized(localReplicants)
{
localReplicants.remove(key);
List result = lookupReplicants(key);
if (result == null)
result = new ArrayList (); // don't pass null but an empty list
notifyKeyListeners(key, result);
}
|
protected boolean | removeReplicant(java.lang.String key, java.lang.String nodeName)
synchronized(replicants)
{
HashMap replicant = (HashMap)replicants.get(key);
if (replicant == null) return false;
Object removed = replicant.remove(nodeName);
if (removed != null)
{
Collection values = replicant.values();
if (values.size() == 0)
{
replicants.remove(key);
}
return true;
}
}
return false;
|
protected boolean | replicantEntryAlreadyExists(java.lang.String key, java.lang.String nodeName)Indicates if the a replicant already exists for a given key/node pair
return replicantEntryAlreadyExists (replicants, key, nodeName);
|
protected boolean | replicantEntryAlreadyExists(java.util.Map map, java.lang.String key, java.lang.String nodeName)Indicates if the a replicant already exists for a given key/node pair in the give data structure
HashMap rep = (HashMap)map.get(key);
if (rep == null)
return false;
else
return rep.containsKey (nodeName);
|
protected void | republishLocalReplicants()
try
{
if( trace )
log.trace("Start Re-Publish local replicants in DRM");
HashMap localReplicants;
synchronized (this.localReplicants)
{
localReplicants = new HashMap(this.localReplicants);
}
Iterator entries = localReplicants.entrySet().iterator();
while( entries.hasNext() )
{
Map.Entry entry = (Map.Entry) entries.next();
String key = (String) entry.getKey();
Object replicant = entry.getValue();
if (replicant != null)
{
if( trace )
log.trace("publishing, key=" + key + ", value=" + replicant);
Object[] args = {key, this.nodeName, replicant};
partition.callAsynchMethodOnCluster(SERVICE_NAME, "_add", args, add_types, true);
notifyKeyListeners(key, lookupReplicants(key));
}
}
if( trace )
log.trace("End Re-Publish local replicants");
}
catch (Exception e)
{
log.error("Re-Publish failed", e);
}
|
public void | setCurrentState(java.io.Serializable newState)
Object[] globalState = (Object[])newState;
HashMap map = (HashMap)globalState[0];
this.replicants.putAll(map);
this.intraviewIdCache = (HashMap)globalState[1];
if( trace )
{
log.trace(nodeName + ": received new state, will republish local replicants");
}
MembersPublisher publisher = new MembersPublisher();
publisher.start();
|
public void | start()
this.nodeName = this.partition.getNodeName ();
// Create the asynch listener handler thread
asynchHandler = new AsynchEventHandler(this, "AsynchKeyChangeHandler");
asynchHandler.start();
partitionNameKnown.release (); // partition name is now known!
//log.info("mergemembers");
//mergeMembers();
|
public void | stop()
// BES 200604 -- implication of NR's JBLCUSTER-38 change. Moving to
// destroy allows restart of HAPartition while local registrations
// survive -- stopping partition does not stop all registered services
// e.g. ejbs; if we maintain their registrations we can pass them to
// the cluster when we restart. However, we are leaving all the remote
// replicants we have registered around, so they will still be included
// as targets if anyone contacts our EJB while partition is stopped.
// Probably OK; if they aren't valid the client will find this out.
// NR 200505 : [JBCLUSTER-38] move to destroy
// if (localReplicants != null)
// {
// synchronized(localReplicants)
// {
// while (! localReplicants.isEmpty ())
// {
// this.remove ((String)localReplicants.keySet().iterator().next ());
// }
// }
// }
// Stop the asynch handler thread
try
{
asynchHandler.stop();
}
catch( Exception e)
{
log.warn("Failed to stop asynchHandler", e);
}
// NR 200505 : [JBCLUSTER-38] move to destroy
// this.mbeanserver.unregisterMBean (this.jmxName);
|
public void | unregisterListener(java.lang.String key, DistributedReplicantManager.ReplicantListener subscriber)
synchronized(keyListeners)
{
ArrayList listeners = (ArrayList)keyListeners.get (key);
if (listeners == null) return;
listeners.remove(subscriber);
if (listeners.size() == 0)
keyListeners.remove(key);
}
|
protected int | updateReplicantsHashId(java.lang.String key)
// we first get a list of all nodes names that replicate this key
//
List nodes = this.lookupReplicantsNodeNames (key);
int result = 0;
if ( (nodes == null) || (nodes.size () == 0) )
{
// no nore replicants for this key: we uncache our view id
//
this.intraviewIdCache.remove (key);
}
else
{
result = this.calculateReplicantsHash (nodes);
this.intraviewIdCache.put (key, new Integer (result));
}
return result;
|