NonBlockingCoordinator
public class NonBlockingCoordinator extends org.apache.catalina.tribes.group.ChannelInterceptorBase
Title: Auto merging leader election algorithm
Description: Implementation of a simple coordinator algorithm that not only selects a coordinator
but also merges groups automatically when members are discovered that weren't part of the
This algorithm is non-blocking, meaning it allows transactions while the coordination phase is going on.
This implementation is based on a home brewed algorithm that uses the AbsoluteOrder of a membership
to pass a token ring of the current membership.
This is not the same as just using AbsoluteOrder! Consider the following scenario:
Nodes A,B,C,D,E are on a network, in that priority order. AbsoluteOrder will only work if all
nodes are receiving pings from all the other nodes,
meaning that node{i} receives pings from node{all}-node{i}.
However, the following could happen if a multicast problem occurs:
A has members {B,C,D}
B has members {A,C}
C has members {D,E}
D has members {A,B,C,E}
E has members {A,C,D}
The default Tribes membership implementation relies on the multicast packets
arriving at all nodes correctly, and nothing guarantees that they will.
To best explain how this algorithm works, let's take the above example.
For simplicity we assume that a send operation is O(1) for all nodes, although this algorithm will work
where messages overlap, as they all depend on absolute order.
Scenario 1: A,B,C,D,E all come online at the same time
Eval phase, A thinks of itself as leader, B thinks of A as leader,
C thinks of itself as leader, D,E think of A as leader
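The eval phase above can be reproduced with a minimal sketch. It assumes that natural `String` ordering of the node names stands in for AbsoluteOrder, and the class and method names (`EvalPhaseSketch`, `localLeader`, `evaluate`, `exampleViews`) are hypothetical, not part of Tribes. Each node simply picks the highest-priority name among itself and the members it can see.

```java
import java.util.Arrays;
import java.util.Collection;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;
import java.util.TreeSet;

public class EvalPhaseSketch {
    /** Leader as seen by one node: the first name, in priority order, among itself and its known members. */
    static String localLeader(String self, Collection<String> knownMembers) {
        // Assumption: natural String order stands in for AbsoluteOrder.
        TreeSet<String> all = new TreeSet<>(knownMembers);
        all.add(self);
        return all.first();
    }

    /** Evaluates every node's leader from its own, possibly partial, view. */
    static Map<String, String> evaluate(Map<String, List<String>> views) {
        Map<String, String> leaders = new TreeMap<>();
        for (Map.Entry<String, List<String>> e : views.entrySet()) {
            leaders.put(e.getKey(), localLeader(e.getKey(), e.getValue()));
        }
        return leaders;
    }

    /** The partial views from the multicast-problem example. */
    static Map<String, List<String>> exampleViews() {
        Map<String, List<String>> views = new TreeMap<>();
        views.put("A", Arrays.asList("B", "C", "D"));
        views.put("B", Arrays.asList("A", "C"));
        views.put("C", Arrays.asList("D", "E"));
        views.put("D", Arrays.asList("A", "B", "C", "E"));
        views.put("E", Arrays.asList("A", "C", "D"));
        return views;
    }

    public static void main(String[] args) {
        // prints {A=A, B=A, C=C, D=A, E=A}
        System.out.println(evaluate(exampleViews()));
    }
}
```

C is the only node that does not see A, so it is the only node besides A that considers itself leader. This is why ordering alone is not enough and a token has to circulate.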
Token phase:
(1) A sends out a message X{A-ldr, A-src, mbrs-A,B,C,D} to B where X is the id for the message(and the view)
(1) C sends out a message Y{C-ldr, C-src, mbrs-C,D,E} to D where Y is the id for the message(and the view)
(2) B receives X{A-ldr, A-src, mbrs-A,B,C,D}, sends X{A-ldr, A-src, mbrs-A,B,C,D} to C
(2) D receives Y{C-ldr, C-src, mbrs-C,D,E} D is aware of A,B, sends Y{A-ldr, C-src, mbrs-A,B,C,D,E} to E
(3) C receives X{A-ldr, A-src, mbrs-A,B,C,D}, sends X{A-ldr, A-src, mbrs-A,B,C,D,E} to D
(3) E receives Y{A-ldr, C-src, mbrs-A,B,C,D,E} sends Y{A-ldr, C-src, mbrs-A,B,C,D,E} to A
(4) D receives X{A-ldr, A-src, mbrs-A,B,C,D,E} sends X{A-ldr, A-src, mbrs-A,B,C,D,E} to A
(4) A receives Y{A-ldr, C-src, mbrs-A,B,C,D,E}, holds the message, add E to its list of members
(5) A receives X{A-ldr, A-src, mbrs-A,B,C,D,E}
At this point, the state looks like
A - {A-ldr, mbrs-A,B,C,D,E, id=X}
B - {A-ldr, mbrs-A,B,C,D, id=X}
C - {A-ldr, mbrs-A,B,C,D,E, id=X}
D - {A-ldr, mbrs-A,B,C,D,E, id=X}
E - {A-ldr, mbrs-A,B,C,D,E, id=Y}
A message doesn't stop until it reaches its original sender, unless it's dropped by a higher leader.
As you can see, E still thinks the viewId=Y, which is not correct. But at this point we have
arrived at the same membership and all nodes are informed of each other.
To synchronize the rest we simply perform the following check at A when A receives X:
Original X{A-ldr, A-src, mbrs-A,B,C,D} == Arrived X{A-ldr, A-src, mbrs-A,B,C,D,E}
Since the condition is false, A will resend the token: A sends X{A-ldr, A-src, mbrs-A,B,C,D,E} to B.
When A receives X again, the token is complete.
Optionally, A can send a message X{A-ldr, A-src, mbrs-A,B,C,D,E confirmed} to A,B,C,D,E who then
install and accept the view.
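The resend check can be illustrated with a simplified model of one token travelling around the ring. This is a sketch under stated assumptions, not the Tribes implementation: node names sorted in natural `String` order stand in for AbsoluteOrder, each hop goes to the next name in the token's current member list, and `TokenRingSketch`, `next`, `circulate` and `exampleViews` are hypothetical names. The hop sequence of this model may therefore differ from the numbered steps above; only the merge-and-compare idea is the same.

```java
import java.util.Arrays;
import java.util.Map;
import java.util.TreeMap;
import java.util.TreeSet;

public class TokenRingSketch {
    /** Next member after 'current' in priority order, wrapping around to the first. */
    static String next(String current, TreeSet<String> members) {
        String n = members.higher(current);
        return n != null ? n : members.first();
    }

    /**
     * Circulates a token from 'source' until it comes back unchanged.
     * 'views' maps each node to the members it knows, itself included.
     * Returns the number of trips around the ring.
     */
    static int circulate(String source, Map<String, TreeSet<String>> views) {
        int trips = 0;
        while (true) {
            trips++;
            TreeSet<String> original = new TreeSet<>(views.get(source));
            TreeSet<String> token = new TreeSet<>(original);
            String at = next(source, token);
            while (!at.equals(source)) {
                token.addAll(views.get(at)); // the node adds members the token lacks
                views.get(at).addAll(token); // and learns members it lacked itself
                at = next(at, token);
            }
            views.get(source).addAll(token);
            // Original == Arrived? If so the token is complete, otherwise resend.
            if (token.equals(original)) return trips;
        }
    }

    /** The partial views from the example, with each node added to its own view. */
    static Map<String, TreeSet<String>> exampleViews() {
        Map<String, TreeSet<String>> views = new TreeMap<>();
        views.put("A", new TreeSet<>(Arrays.asList("A", "B", "C", "D")));
        views.put("B", new TreeSet<>(Arrays.asList("A", "B", "C")));
        views.put("C", new TreeSet<>(Arrays.asList("C", "D", "E")));
        views.put("D", new TreeSet<>(Arrays.asList("A", "B", "C", "D", "E")));
        views.put("E", new TreeSet<>(Arrays.asList("A", "C", "D", "E")));
        return views;
    }

    public static void main(String[] args) {
        Map<String, TreeSet<String>> views = exampleViews();
        System.out.println("trips: " + circulate("A", views)); // prints trips: 2
        System.out.println(views.get("B"));                    // prints [A, B, C, D, E]
    }
}
```

In this model the first trip grows the token from {A,B,C,D} to {A,B,C,D,E}, so the comparison at A fails and a second trip is needed. The second trip changes nothing, so the token is complete.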
Let's assume that C1 arrives. C1 has lower priority than C, but higher priority than D.
Let's also assume that C1 sees the following view: {B,D,E}
C1 waits for a token to arrive. When the token arrives, the same scenario as above will happen.
In the scenario where C1 sees {D,E} and A,B,C can not see C1, no token will ever arrive.
In this case, C1 sends a Z{C1-ldr, C1-src, mbrs-C1,D,E} to D
D receives Z{C1-ldr, C1-src, mbrs-C1,D,E} and sends Z{A-ldr, C1-src, mbrs-A,B,C,C1,D,E} to E
E receives Z{A-ldr, C1-src, mbrs-A,B,C,C1,D,E} and sends it to A
A sends Z{A-ldr, A-src, mbrs-A,B,C,C1,D,E} to B and the chain continues until A receives the token again.
At that time A optionally sends out Z{A-ldr, A-src, mbrs-A,B,C,C1,D,E, confirmed} to A,B,C,C1,D,E
To ensure that the view gets implemented at all nodes at the same time,
A will send out a VIEW_CONF message, this is the 'confirmed' message that is optional above.
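A minimal sketch of the confirmation step follows, assuming a much reduced node state (just a view id and a member list). `ViewConfSketch`, `Node`, `onViewConf` and `confirm` are hypothetical names used for illustration; the real interceptor sends the confirmation over the channel rather than calling nodes directly. It shows how a confirmation brings a node that still holds a stale id, such as E with id=Y above, onto the same view as everyone else, and why delivering the same confirmation twice is harmless.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

public class ViewConfSketch {
    /** Hypothetical per-node state: the installed view id and its members. */
    static final class Node {
        String viewId;
        List<String> view = new ArrayList<>();

        /** Installs a confirmed view; returns false if that view id is already installed. */
        boolean onViewConf(String confirmedId, List<String> members) {
            if (confirmedId.equals(viewId)) return false; // we already have this view
            viewId = confirmedId;
            view = new ArrayList<>(members);
            return true;
        }
    }

    /** Leader side: delivers the confirmation to every member and counts the installs. */
    static int confirm(String id, List<String> members, Map<String, Node> nodes) {
        int installed = 0;
        for (String m : members) {
            if (nodes.get(m).onViewConf(id, members)) installed++;
        }
        return installed;
    }

    public static void main(String[] args) {
        Map<String, Node> nodes = new TreeMap<>();
        List<String> members = Arrays.asList("A", "B", "C", "D", "E");
        for (String m : members) nodes.put(m, new Node());
        nodes.get("E").viewId = "Y"; // E still holds the id of the other token

        System.out.println(confirm("X", members, nodes)); // prints 5
        System.out.println(confirm("X", members, nodes)); // prints 0, duplicate is ignored
        System.out.println(nodes.get("E").viewId);        // prints X
    }
}
```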
Ideally, the interceptor below this one would be the TcpFailureDetector to ensure correct memberships.
The example above can, of course, be simplified with a finite state machine:
But I suck at writing state machines, my head gets all confused. One day I will document this algorithm though.
Maybe I'll do a state diagram :)
State Diagrams
Initiate an election
Receive an election message
|
Fields Summary |
---|
protected static final byte[] | COORD_HEADER: header for a coordination message
| protected static final byte[] | COORD_REQUEST: Coordination request
| protected static final byte[] | COORD_CONF: Coordination confirmation, for blocking installations
| protected static final byte[] | COORD_ALIVE: Alive message
| protected long | waitForCoordMsgTimeout: Time to wait for coordination timeout
| protected org.apache.catalina.tribes.membership.Membership | view: Our current view
| protected org.apache.catalina.tribes.UniqueId | viewId: Our current viewId
| protected org.apache.catalina.tribes.membership.Membership | membership: Our nonblocking membership
| protected org.apache.catalina.tribes.UniqueId | suggestedviewId: indicates that we are running an election and this is the one we are running
| protected org.apache.catalina.tribes.membership.Membership | suggestedView
| protected boolean | started
| protected final int | startsvc
| protected Object | electionMutex
| protected AtomicBoolean | coordMsgReceived |
Constructors Summary |
---|
public NonBlockingCoordinator()
super();
|
Methods Summary |
---|
public boolean | accept(org.apache.catalina.tribes.ChannelMessage msg)
return super.accept(msg);
| protected boolean | alive(org.apache.catalina.tribes.Member mbr)
return TcpFailureDetector.memberAlive(mbr,
COORD_ALIVE,
false,
false,
waitForCoordMsgTimeout,
waitForCoordMsgTimeout,
getOptionFlag());
| public org.apache.catalina.tribes.io.ChannelData | createData(org.apache.catalina.tribes.group.interceptors.NonBlockingCoordinator$CoordinationMessage msg, org.apache.catalina.tribes.membership.MemberImpl local)
msg.write();
ChannelData data = new ChannelData(true);
data.setAddress(local);
data.setMessage(msg.getBuffer());
data.setOptions(Channel.SEND_OPTIONS_USE_ACK);
data.setTimestamp(System.currentTimeMillis());
return data;
| private org.apache.catalina.tribes.group.interceptors.NonBlockingCoordinator$CoordinationMessage | createElectionMsg(org.apache.catalina.tribes.membership.MemberImpl local, org.apache.catalina.tribes.membership.MemberImpl[] others, org.apache.catalina.tribes.membership.MemberImpl leader)
Membership m = new Membership(local,AbsoluteOrder.comp,true);
Arrays.fill(m,others);
MemberImpl[] mbrs = m.getMembers();
m.reset();
CoordinationMessage msg = new CoordinationMessage(leader, local, mbrs,new UniqueId(UUIDGenerator.randomUUID(true)), this.COORD_REQUEST);
return msg;
| public void | fireInterceptorEvent(InterceptorEvent event)
if (event instanceof CoordinationEvent &&
((CoordinationEvent)event).type == CoordinationEvent.EVT_CONF_RX)
log.info(event);
| public org.apache.catalina.tribes.Member | getCoordinator(): Returns coordinator if one is available
return (view != null && view.hasMembers()) ? view.getMembers()[0] : null;
| public org.apache.catalina.tribes.Member | getLocalMember(boolean incAlive): Return the member that represents this node.
Member local = super.getLocalMember(incAlive);
if ( view == null && (local != null)) setupMembership();
return local;
| public org.apache.catalina.tribes.Member | getMember(org.apache.catalina.tribes.Member mbr)
return membership.getMember(mbr);
| public org.apache.catalina.tribes.Member[] | getMembers(): Get all current cluster members
return membership.getMembers();
| public org.apache.catalina.tribes.Member | getNextInLine(org.apache.catalina.tribes.membership.MemberImpl local, org.apache.catalina.tribes.membership.MemberImpl[] others)
MemberImpl result = null;
for ( int i=0; i<others.length; i++ ) {
}
return result;
| public org.apache.catalina.tribes.Member[] | getView()
return (view != null && view.hasMembers()) ? view.getMembers() : new Member[0];
| public org.apache.catalina.tribes.UniqueId | getViewId()
return viewId;
| protected void | halt(): Block in/out messages while an election is going on
| protected void | handleMyToken(org.apache.catalina.tribes.membership.MemberImpl local, org.apache.catalina.tribes.group.interceptors.NonBlockingCoordinator$CoordinationMessage msg, org.apache.catalina.tribes.Member sender, org.apache.catalina.tribes.membership.Membership merged)
if ( local.equals(msg.getLeader()) ) {
//no leadership change
if ( Arrays.sameMembers(msg.getMembers(),merged.getMembers()) ) {
msg.type = COORD_CONF;
super.sendMessage(Arrays.remove(msg.getMembers(),local),createData(msg,local),null);
handleViewConf(msg,local,merged);
} else {
//membership change
suggestedView = new Membership(local,AbsoluteOrder.comp,true);
suggestedviewId = msg.getId();
Arrays.fill(suggestedView,merged.getMembers());
msg.view = (MemberImpl[])merged.getMembers();
sendElectionMsgToNextInline(local,msg);
}
} else {
//leadership change
suggestedView = null;
suggestedviewId = null;
msg.view = (MemberImpl[])merged.getMembers();
sendElectionMsgToNextInline(local,msg);
}
| protected void | handleOtherToken(org.apache.catalina.tribes.membership.MemberImpl local, org.apache.catalina.tribes.group.interceptors.NonBlockingCoordinator$CoordinationMessage msg, org.apache.catalina.tribes.Member sender, org.apache.catalina.tribes.membership.Membership merged)
if ( local.equals(msg.getLeader()) ) {
//I am the new leader
//startElection(false);
} else {
msg.view = (MemberImpl[])merged.getMembers();
sendElectionMsgToNextInline(local,msg);
}
| protected void | handleToken(org.apache.catalina.tribes.group.interceptors.NonBlockingCoordinator$CoordinationMessage msg, org.apache.catalina.tribes.Member sender, org.apache.catalina.tribes.membership.Membership merged)
MemberImpl local = (MemberImpl)getLocalMember(false);
if ( local.equals(msg.getSource()) ) {
//my message msg.src=local
handleMyToken(local, msg, sender,merged);
} else {
handleOtherToken(local, msg, sender,merged);
}
| protected void | handleViewConf(org.apache.catalina.tribes.group.interceptors.NonBlockingCoordinator$CoordinationMessage msg, org.apache.catalina.tribes.Member sender, org.apache.catalina.tribes.membership.Membership merged)
if ( viewId != null && msg.getId().equals(viewId) ) return;//we already have this view
view = new Membership((MemberImpl)getLocalMember(false),AbsoluteOrder.comp,true);
Arrays.fill(view,msg.getMembers());
viewId = msg.getId();
if ( viewId.equals(suggestedviewId) ) {
suggestedView = null;
suggestedviewId = null;
}
if (suggestedView != null && AbsoluteOrder.comp.compare(suggestedView.getMembers()[0],merged.getMembers()[0])<0 ) {
suggestedView = null;
suggestedviewId = null;
}
viewChange(viewId,view.getMembers());
fireInterceptorEvent(new CoordinationEvent(CoordinationEvent.EVT_CONF_RX,this,"Accepted View"));
if ( suggestedviewId == null && hasHigherPriority(merged.getMembers(),membership.getMembers()) ) {
startElection(false);
}
| protected boolean | hasHigherPriority(org.apache.catalina.tribes.Member[] complete, org.apache.catalina.tribes.Member[] local)
if ( local == null || local.length == 0 ) return false;
if ( complete == null || complete.length == 0 ) return true;
AbsoluteOrder.absoluteOrder(complete);
AbsoluteOrder.absoluteOrder(local);
return (AbsoluteOrder.comp.compare(complete[0],local[0]) > 0);
| public boolean | hasMembers(): has members
return membership.hasMembers();
| public void | heartbeat()
try {
MemberImpl local = (MemberImpl)getLocalMember(false);
if ( view != null && (Arrays.diff(view,membership,local).length != 0 || Arrays.diff(membership,view,local).length != 0) ) {
if ( isHighest() ) {
fireInterceptorEvent(new CoordinationEvent(CoordinationEvent.EVT_START_ELECT, this,
"Heartbeat found inconsistency, restart election"));
startElection(true);
}
}
} catch ( Exception x ){
log.error("Unable to perform heartbeat.",x);
} finally {
super.heartbeat();
}
| public boolean | isCoordinator()
Member coord = getCoordinator();
return coord != null && getLocalMember(false).equals(coord);
| public boolean | isHighest()
Member local = getLocalMember(false);
if ( membership.getMembers().length == 0 ) return true;
else return AbsoluteOrder.comp.compare(local,membership.getMembers()[0])<=0;
| protected boolean | isViewConf(org.apache.catalina.tribes.group.interceptors.NonBlockingCoordinator$CoordinationMessage msg)
return Arrays.contains(msg.getType(),0,COORD_CONF,0,COORD_CONF.length);
| public void | memberAdded(org.apache.catalina.tribes.Member member)
memberAdded(member,true);
| public void | memberAdded(org.apache.catalina.tribes.Member member, boolean elect)
try {
if ( membership == null ) setupMembership();
if ( membership.memberAlive((MemberImpl)member) ) super.memberAdded(member);
try {
fireInterceptorEvent(new CoordinationEvent(CoordinationEvent.EVT_MBR_ADD,this,"Member add("+member.getName()+")"));
if (started && elect) startElection(false);
}catch ( ChannelException x ) {
log.error("Unable to start election when member was added.",x);
}
}finally {
}
| public void | memberDisappeared(org.apache.catalina.tribes.Member member)
try {
membership.removeMember((MemberImpl)member);
super.memberDisappeared(member);
try {
fireInterceptorEvent(new CoordinationEvent(CoordinationEvent.EVT_MBR_DEL,this,"Member remove("+member.getName()+")"));
if ( started && (isCoordinator() || isHighest()) )
startElection(true); //to do, if a member disappears, only the coordinator can start
}catch ( ChannelException x ) {
log.error("Unable to start election when member was removed.",x);
}
}finally {
}
| protected org.apache.catalina.tribes.membership.Membership | mergeOnArrive(org.apache.catalina.tribes.group.interceptors.NonBlockingCoordinator$CoordinationMessage msg, org.apache.catalina.tribes.Member sender)
fireInterceptorEvent(new CoordinationEvent(CoordinationEvent.EVT_PRE_MERGE,this,"Pre merge"));
MemberImpl local = (MemberImpl)getLocalMember(false);
Membership merged = new Membership(local,AbsoluteOrder.comp,true);
Arrays.fill(merged,msg.getMembers());
Arrays.fill(merged,getMembers());
Member[] diff = Arrays.diff(merged,membership,local);
for ( int i=0; i<diff.length; i++ ) {
if (!alive(diff[i])) merged.removeMember((MemberImpl)diff[i]);
else memberAdded(diff[i],false);
}
fireInterceptorEvent(new CoordinationEvent(CoordinationEvent.EVT_POST_MERGE,this,"Post merge"));
return merged;
| public void | messageReceived(org.apache.catalina.tribes.ChannelMessage msg)
if ( Arrays.contains(msg.getMessage().getBytesDirect(),0,COORD_ALIVE,0,COORD_ALIVE.length) ) {
//ignore message, its an alive message
fireInterceptorEvent(new CoordinationEvent(CoordinationEvent.EVT_MSG_ARRIVE,this,"Alive Message"));
} else if ( Arrays.contains(msg.getMessage().getBytesDirect(),0,COORD_HEADER,0,COORD_HEADER.length) ) {
try {
CoordinationMessage cmsg = new CoordinationMessage(msg.getMessage());
Member[] cmbr = cmsg.getMembers();
fireInterceptorEvent(new CoordinationEvent(CoordinationEvent.EVT_MSG_ARRIVE,this,"Coord Msg Arrived("+Arrays.toNameString(cmbr)+")"));
processCoordMessage(cmsg, msg.getAddress());
}catch ( ChannelException x ) {
log.error("Error processing coordination message. Could be fatal.",x);
}
} else {
super.messageReceived(msg);
}
| protected void | processCoordMessage(org.apache.catalina.tribes.group.interceptors.NonBlockingCoordinator$CoordinationMessage msg, org.apache.catalina.tribes.Member sender)
if ( !coordMsgReceived.get() ) {
coordMsgReceived.set(true);
synchronized (electionMutex) { electionMutex.notifyAll();}
}
msg.timestamp = System.currentTimeMillis();
Membership merged = mergeOnArrive(msg, sender);
if (isViewConf(msg)) handleViewConf(msg, sender, merged);
else handleToken(msg, sender, merged);
ClassLoader loader;
| protected void | release(): Release lock for in/out messages once the election is completed
| protected void | sendElectionMsg(org.apache.catalina.tribes.membership.MemberImpl local, org.apache.catalina.tribes.membership.MemberImpl next, org.apache.catalina.tribes.group.interceptors.NonBlockingCoordinator$CoordinationMessage msg)
fireInterceptorEvent(new CoordinationEvent(CoordinationEvent.EVT_SEND_MSG,this,"Sending election message to("+next.getName()+")"));
super.sendMessage(new Member[] {next}, createData(msg, local), null);
| protected void | sendElectionMsgToNextInline(org.apache.catalina.tribes.membership.MemberImpl local, org.apache.catalina.tribes.group.interceptors.NonBlockingCoordinator$CoordinationMessage msg)
int next = Arrays.nextIndex(local,msg.getMembers());
int current = next;
msg.leader = msg.getMembers()[0];
boolean sent = false;
while ( !sent && current >= 0 ) {
try {
sendElectionMsg(local, (MemberImpl) msg.getMembers()[current], msg);
sent = true;
}catch ( ChannelException x ) {
log.warn("Unable to send election message to:"+msg.getMembers()[current]);
current = Arrays.nextIndex(msg.getMembers()[current],msg.getMembers());
if ( current == next ) throw x;
}
}
| public void | sendMessage(org.apache.catalina.tribes.Member[] destination, org.apache.catalina.tribes.ChannelMessage msg, org.apache.catalina.tribes.group.InterceptorPayload payload)
waitForRelease();
super.sendMessage(destination, msg, payload);
| protected synchronized void | setupMembership()
if ( membership == null ) {
membership = new Membership((MemberImpl)super.getLocalMember(true),AbsoluteOrder.comp,false);
}
| public void | start(int svc)
if (membership == null) setupMembership();
if (started)return;
fireInterceptorEvent(new CoordinationEvent(CoordinationEvent.EVT_START, this, "Before start"));
super.start(startsvc);
started = true;
if (view == null) view = new Membership( (MemberImpl)super.getLocalMember(true), AbsoluteOrder.comp, true);
fireInterceptorEvent(new CoordinationEvent(CoordinationEvent.EVT_START, this, "After start"));
startElection(false);
| public void | startElection(boolean force)
synchronized (electionMutex) {
MemberImpl local = (MemberImpl)getLocalMember(false);
MemberImpl[] others = (MemberImpl[])membership.getMembers();
fireInterceptorEvent(new CoordinationEvent(CoordinationEvent.EVT_START_ELECT,this,"Election initated"));
if ( others.length == 0 ) {
this.viewId = new UniqueId(UUIDGenerator.randomUUID(false));
this.view = new Membership(local,AbsoluteOrder.comp, true);
this.handleViewConf(this.createElectionMsg(local,others,local),local,view);
return; //the only member, no need for an election
}
if ( suggestedviewId != null ) {
if ( view != null && Arrays.diff(view,suggestedView,local).length == 0 && Arrays.diff(suggestedView,view,local).length == 0) {
suggestedviewId = null;
suggestedView = null;
fireInterceptorEvent(new CoordinationEvent(CoordinationEvent.EVT_ELECT_ABANDONED,this,"Election abandoned, running election matches view"));
} else {
fireInterceptorEvent(new CoordinationEvent(CoordinationEvent.EVT_ELECT_ABANDONED,this,"Election abandoned, election running"));
}
return; //election already running, I'm not allowed to have two of them
}
if ( view != null && Arrays.diff(view,membership,local).length == 0 && Arrays.diff(membership,view,local).length == 0) {
fireInterceptorEvent(new CoordinationEvent(CoordinationEvent.EVT_ELECT_ABANDONED,this,"Election abandoned, view matches membership"));
return; //already have this view installed
}
int prio = AbsoluteOrder.comp.compare(local,others[0]);
MemberImpl leader = ( prio < 0 )?local:others[0];//am I the leader in my view?
if ( local.equals(leader) || force ) {
CoordinationMessage msg = createElectionMsg(local, others, leader);
suggestedviewId = msg.getId();
suggestedView = new Membership(local,AbsoluteOrder.comp,true);
Arrays.fill(suggestedView,msg.getMembers());
fireInterceptorEvent(new CoordinationEvent(CoordinationEvent.EVT_PROCESS_ELECT,this,"Election, sending request"));
sendElectionMsg(local,others[0],msg);
} else {
try {
coordMsgReceived.set(false);
fireInterceptorEvent(new CoordinationEvent(CoordinationEvent.EVT_WAIT_FOR_MSG,this,"Election, waiting for request"));
electionMutex.wait(waitForCoordMsgTimeout);
}catch ( InterruptedException x ) {
Thread.interrupted();
}
if ( suggestedviewId == null && (!coordMsgReceived.get())) {
//no message arrived, send the coord msg
// fireInterceptorEvent(new CoordinationEvent(CoordinationEvent.EVT_WAIT_FOR_MSG,this,"Election, waiting timed out."));
// startElection(true);
fireInterceptorEvent(new CoordinationEvent(CoordinationEvent.EVT_ELECT_ABANDONED,this,"Election abandoned, waiting timed out."));
} else {
fireInterceptorEvent(new CoordinationEvent(CoordinationEvent.EVT_ELECT_ABANDONED,this,"Election abandoned, received a message"));
}
}//end if
}
| public void | stop(int svc)
try {
halt();
synchronized (electionMutex) {
if (!started)return;
started = false;
fireInterceptorEvent(new CoordinationEvent(CoordinationEvent.EVT_STOP, this, "Before stop"));
super.stop(startsvc);
this.view = null;
this.viewId = null;
this.suggestedView = null;
this.suggestedviewId = null;
this.membership.reset();
fireInterceptorEvent(new CoordinationEvent(CoordinationEvent.EVT_STOP, this, "After stop"));
}
}finally {
release();
}
| protected void | viewChange(org.apache.catalina.tribes.UniqueId viewId, org.apache.catalina.tribes.Member[] view)
//invoke any listeners
| protected void | waitForRelease(): Wait for an election to end
|
|