File: NonBlockingCoordinator.java
Doc: API Doc
Category: Apache Tomcat 6.0.14
Size: 36782
Date: Fri Jul 20 04:20:32 BST 2007
Package: org.apache.catalina.tribes.group.interceptors

NonBlockingCoordinator

public class NonBlockingCoordinator extends org.apache.catalina.tribes.group.ChannelInterceptorBase

Title: Auto merging leader election algorithm

Description: Implementation of a simple coordinator algorithm that not only selects a coordinator but also merges groups automatically when members are discovered that weren't part of the

This algorithm is non-blocking, meaning it allows transactions while the coordination phase is going on.

This implementation is based on a home-brewed algorithm that uses the AbsoluteOrder of a membership to pass a token around a ring formed by the current membership.
This is not the same as just using AbsoluteOrder! Consider the following scenario:
Nodes A,B,C,D,E are on a network, in that priority. AbsoluteOrder alone will only work if all nodes are receiving pings from all the other nodes, meaning that node{i} receives pings from node{all}-node{i}.
But the following could happen if a multicast problem occurs:
A has members {B,C,D}
B has members {A,C}
C has members {D,E}
D has members {A,B,C,E}
E has members {A,C,D}
The default Tribes membership implementation relies on multicast packets arriving at all nodes correctly, and nothing guarantees that they will, so partial views like these can occur.
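To make the problem concrete, here is a minimal sketch of what each node would conclude if it simply picked the highest-priority member of its own view. The class and method names are hypothetical, and plain string ordering stands in for AbsoluteOrder, which is an assumption made only for illustration.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

final class LocalLeaderEval {

    /** Lowest name wins; a stand-in for AbsoluteOrder (assumption). */
    static String leaderFor(String self, List<String> seen) {
        List<String> all = new ArrayList<>(seen);
        all.add(self);
        return Collections.min(all);
    }

    /** Evaluates the leader each node would pick from its own partial view. */
    static Map<String, String> evaluate(Map<String, List<String>> views) {
        Map<String, String> leaders = new LinkedHashMap<>();
        for (Map.Entry<String, List<String>> e : views.entrySet()) {
            leaders.put(e.getKey(), leaderFor(e.getKey(), e.getValue()));
        }
        return leaders;
    }

    /** The partial views from the multicast-problem scenario. */
    static Map<String, List<String>> partialViews() {
        Map<String, List<String>> views = new LinkedHashMap<>();
        views.put("A", Arrays.asList("B", "C", "D"));
        views.put("B", Arrays.asList("A", "C"));
        views.put("C", Arrays.asList("D", "E"));
        views.put("D", Arrays.asList("A", "B", "C", "E"));
        views.put("E", Arrays.asList("A", "C", "D"));
        return views;
    }

    public static void main(String[] args) {
        // prints {A=A, B=A, C=C, D=A, E=A}
        System.out.println(evaluate(partialViews()));
    }
}
```

C cannot see A or B, so it elects itself while everyone else elects A. This is the disagreement the token phase is meant to resolve.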

To best explain how this algorithm works, let's take the above example. For simplicity we assume that a send operation is O(1) for all nodes, although this algorithm will also work where messages overlap, as they all depend on absolute order.
Scenario 1: A,B,C,D,E all come online at the same time.
Eval phase: A thinks of itself as leader, B thinks of A as leader, C thinks of itself as leader, D and E think of A as leader.
Token phase:
(1) A sends out a message X{A-ldr, A-src, mbrs-A,B,C,D} to B where X is the id for the message(and the view)
(1) C sends out a message Y{C-ldr, C-src, mbrs-C,D,E} to D where Y is the id for the message(and the view)
(2) B receives X{A-ldr, A-src, mbrs-A,B,C,D}, sends X{A-ldr, A-src, mbrs-A,B,C,D} to C
(2) D receives Y{C-ldr, C-src, mbrs-C,D,E} D is aware of A,B, sends Y{A-ldr, C-src, mbrs-A,B,C,D,E} to E
(3) C receives X{A-ldr, A-src, mbrs-A,B,C,D}, sends X{A-ldr, A-src, mbrs-A,B,C,D,E} to D
(3) E receives Y{A-ldr, C-src, mbrs-A,B,C,D,E} sends Y{A-ldr, C-src, mbrs-A,B,C,D,E} to A
(4) D receives X{A-ldr, A-src, mbrs-A,B,C,D,E}, sends X{A-ldr, A-src, mbrs-A,B,C,D,E} to A
(4) A receives Y{A-ldr, C-src, mbrs-A,B,C,D,E}, holds the message, adds E to its list of members
(5) A receives X{A-ldr, A-src, mbrs-A,B,C,D,E}
At this point, the state looks like
A - {A-ldr, mbrs-A,B,C,D,E, id=X}
B - {A-ldr, mbrs-A,B,C,D, id=X}
C - {A-ldr, mbrs-A,B,C,D,E, id=X}
D - {A-ldr, mbrs-A,B,C,D,E, id=X}
E - {A-ldr, mbrs-A,B,C,D,E, id=Y}

A message doesn't stop until it reaches its original sender, unless it's dropped by a higher leader. As you can see, E still thinks the viewId=Y, which is not correct. But at this point we have arrived at the same membership and all nodes are informed of each other.
To synchronize the rest we simply perform the following check at A when A receives X:
Original X{A-ldr, A-src, mbrs-A,B,C,D} == Arrived X{A-ldr, A-src, mbrs-A,B,C,D,E}
Since the condition is false, A will resend the token: A sends X{A-ldr, A-src, mbrs-A,B,C,D,E} to B. When A receives X again, the token is complete.
Optionally, A can send a message X{A-ldr, A-src, mbrs-A,B,C,D,E confirmed} to A,B,C,D,E, who then install and accept the view.
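The "original == arrived" check can be sketched as a small simulation. Everything below is hypothetical: string ordering stands in for AbsoluteOrder, each node merges a fixed view into the token, and the token is routed through the sorted member list it carries, so the exact hop order may differ from the walk-through above. It only illustrates why A needs a second lap.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.TreeSet;

final class TokenRingSketch {

    /** Next member after 'current' in sorted order, wrapping to the first. */
    static String next(String current, TreeSet<String> members) {
        String higher = members.higher(current);
        return higher != null ? higher : members.first();
    }

    /** One lap of the token; returns the member list that arrives back at the source. */
    static Set<String> lap(String source, Set<String> sent, Map<String, Set<String>> views) {
        TreeSet<String> members = new TreeSet<>(sent);
        members.add(source);
        String node = next(source, members);
        while (!node.equals(source)) {
            // each hop merges what the receiving node can see into the token
            members.addAll(views.getOrDefault(node, Collections.<String>emptySet()));
            node = next(node, members);
        }
        return members;
    }

    /** Resends the token until it comes back unchanged; one entry per lap. */
    static List<Set<String>> circulate(String source, Collection<String> initial,
                                       Map<String, Set<String>> views) {
        List<Set<String>> arrivals = new ArrayList<>();
        Set<String> sent = new TreeSet<>(initial);
        while (true) {
            Set<String> arrived = lap(source, sent, views);
            arrivals.add(arrived);
            if (arrived.equals(sent)) return arrivals; // original == arrived: token complete
            sent = arrived;                            // otherwise resend the merged list
        }
    }

    static Map<String, Set<String>> partialViews() {
        Map<String, Set<String>> views = new HashMap<>();
        views.put("A", new TreeSet<>(Arrays.asList("B", "C", "D")));
        views.put("B", new TreeSet<>(Arrays.asList("A", "C")));
        views.put("C", new TreeSet<>(Arrays.asList("D", "E")));
        views.put("D", new TreeSet<>(Arrays.asList("A", "B", "C", "E")));
        views.put("E", new TreeSet<>(Arrays.asList("A", "C", "D")));
        return views;
    }

    public static void main(String[] args) {
        List<Set<String>> laps = circulate("A", Arrays.asList("A", "B", "C", "D"), partialViews());
        // prints laps=2 final=[A, B, C, D, E]
        System.out.println("laps=" + laps.size() + " final=" + laps.get(laps.size() - 1));
    }
}
```

The first lap returns with E added, so the comparison fails and A resends. The second lap returns unchanged, which is the point at which a confirmation could be sent.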

Let's assume that C1 arrives. C1 has lower priority than C, but higher priority than D.
Let's also assume that C1 sees the following view {B,D,E}
C1 waits for a token to arrive. When the token arrives, the same scenario as above will happen.
In the scenario where C1 sees {D,E} and A,B,C cannot see C1, no token will ever arrive.
In this case, C1 sends a Z{C1-ldr, C1-src, mbrs-C1,D,E} to D
D receives Z{C1-ldr, C1-src, mbrs-C1,D,E} and sends Z{A-ldr, C1-src, mbrs-A,B,C,C1,D,E} to E
E receives Z{A-ldr, C1-src, mbrs-A,B,C,C1,D,E} and sends it to A
A sends Z{A-ldr, A-src, mbrs-A,B,C,C1,D,E} to B and the chain continues until A receives the token again. At that time A optionally sends out Z{A-ldr, A-src, mbrs-A,B,C,C1,D,E, confirmed} to A,B,C,C1,D,E
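The step where D rewrites the leader of Z from C1 to A can be sketched as a merge followed by "first member wins". The names below are hypothetical and string ordering again stands in for AbsoluteOrder (an assumption that happens to place C1 between C and D).

```java
import java.util.Arrays;
import java.util.Collection;
import java.util.TreeSet;

final class MergeLeaderSketch {

    /** Union of the token's members, the receiver, and the receiver's own view. */
    static TreeSet<String> merge(String local, Collection<String> tokenMembers,
                                 Collection<String> localView) {
        TreeSet<String> merged = new TreeSet<>(tokenMembers);
        merged.addAll(localView);
        merged.add(local);
        return merged;
    }

    /** The leader is simply the first member of the ordered, merged list. */
    static String leaderOf(TreeSet<String> merged) {
        return merged.first();
    }

    public static void main(String[] args) {
        // D receives Z{C1-ldr, C1-src, mbrs-C1,D,E} and already knows A,B,C,E
        TreeSet<String> merged = merge("D",
                Arrays.asList("C1", "D", "E"),
                Arrays.asList("A", "B", "C", "E"));
        // prints leader=A members=[A, B, C, C1, D, E]
        System.out.println("leader=" + leaderOf(merged) + " members=" + merged);
    }
}
```

C1 proposed itself only because it could not see A, B or C. As soon as the token passes through a node that can, the leader field is corrected.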

To ensure that the view gets implemented at all nodes at the same time, A will send out a VIEW_CONF message. This is the 'confirmed' message that is optional above.

Ideally, the interceptor below this one would be the TcpFailureDetector to ensure correct memberships.

The example above can, of course, be simplified with a finite state machine:
But I suck at writing state machines, my head gets all confused. One day I will document this algorithm though.
Maybe I'll do a state diagram :)

State Diagrams

Initiate an election

Receive an election message

author
Filip Hanik
version
1.0

Fields Summary
protected static final byte[]
COORD_HEADER
header for a coordination message
protected static final byte[]
COORD_REQUEST
Coordination request
protected static final byte[]
COORD_CONF
Coordination confirmation, for blocking installations
protected static final byte[]
COORD_ALIVE
Alive message
protected long
waitForCoordMsgTimeout
Time to wait for coordination timeout
protected org.apache.catalina.tribes.membership.Membership
view
Our current view
protected org.apache.catalina.tribes.UniqueId
viewId
Our current viewId
protected org.apache.catalina.tribes.membership.Membership
membership
Our nonblocking membership
protected org.apache.catalina.tribes.UniqueId
suggestedviewId
indicates that we are running an election and this is the one we are running
protected org.apache.catalina.tribes.membership.Membership
suggestedView
protected boolean
started
protected final int
startsvc
protected Object
electionMutex
protected AtomicBoolean
coordMsgReceived
Constructors Summary
public NonBlockingCoordinator()

    
      
        super();
    
Methods Summary
public boolean accept(org.apache.catalina.tribes.ChannelMessage msg)

        return super.accept(msg);
    
protected boolean alive(org.apache.catalina.tribes.Member mbr)

        return TcpFailureDetector.memberAlive(mbr,
                                              COORD_ALIVE,
                                              false,
                                              false,
                                              waitForCoordMsgTimeout,
                                              waitForCoordMsgTimeout,
                                              getOptionFlag());
    
public org.apache.catalina.tribes.io.ChannelData createData(org.apache.catalina.tribes.group.interceptors.NonBlockingCoordinator$CoordinationMessage msg, org.apache.catalina.tribes.membership.MemberImpl local)

        msg.write();
        ChannelData data = new ChannelData(true);
        data.setAddress(local);
        data.setMessage(msg.getBuffer());
        data.setOptions(Channel.SEND_OPTIONS_USE_ACK);
        data.setTimestamp(System.currentTimeMillis());
        return data;
    
private org.apache.catalina.tribes.group.interceptors.NonBlockingCoordinator$CoordinationMessage createElectionMsg(org.apache.catalina.tribes.membership.MemberImpl local, org.apache.catalina.tribes.membership.MemberImpl[] others, org.apache.catalina.tribes.membership.MemberImpl leader)

        Membership m = new Membership(local,AbsoluteOrder.comp,true);
        Arrays.fill(m,others);
        MemberImpl[] mbrs = m.getMembers();
        m.reset(); 
        CoordinationMessage msg = new CoordinationMessage(leader, local, mbrs,new UniqueId(UUIDGenerator.randomUUID(true)), this.COORD_REQUEST);
        return msg;
    
public void fireInterceptorEvent(InterceptorEvent event)

        if (event instanceof CoordinationEvent &&
            ((CoordinationEvent)event).type == CoordinationEvent.EVT_CONF_RX) 
            log.info(event);
    
public org.apache.catalina.tribes.Member getCoordinator()
Returns coordinator if one is available

return
Member

        return (view != null && view.hasMembers()) ? view.getMembers()[0] : null;
    
public org.apache.catalina.tribes.Member getLocalMember(boolean incAlive)
Return the member that represents this node.

return
Member

        Member local = super.getLocalMember(incAlive);
        if ( view == null && (local != null)) setupMembership();
        return local;
    
public org.apache.catalina.tribes.Member getMember(org.apache.catalina.tribes.Member mbr)

param
mbr Member
return
Member

        
        return membership.getMember(mbr);
    
public org.apache.catalina.tribes.Member[] getMembers()
Get all current cluster members

return
all members or empty array

        
        return membership.getMembers();
    
public org.apache.catalina.tribes.Member getNextInLine(org.apache.catalina.tribes.membership.MemberImpl local, org.apache.catalina.tribes.membership.MemberImpl[] others)

        MemberImpl result = null;
        for ( int i=0; i<others.length; i++ ) {
            
        }
        return result;
    
public org.apache.catalina.tribes.Member[] getView()

        return (view != null && view.hasMembers()) ? view.getMembers() : new Member[0];
    
public org.apache.catalina.tribes.UniqueId getViewId()

        return viewId;
    
protected void halt()
Block in/out messages while an election is going on


   
protected void handleMyToken(org.apache.catalina.tribes.membership.MemberImpl local, org.apache.catalina.tribes.group.interceptors.NonBlockingCoordinator$CoordinationMessage msg, org.apache.catalina.tribes.Member sender, org.apache.catalina.tribes.membership.Membership merged)

        if ( local.equals(msg.getLeader()) ) {
            //no leadership change
            if ( Arrays.sameMembers(msg.getMembers(),merged.getMembers()) ) {
                msg.type = COORD_CONF;
                super.sendMessage(Arrays.remove(msg.getMembers(),local),createData(msg,local),null);
                handleViewConf(msg,local,merged);
            } else {
                //membership change
                suggestedView = new Membership(local,AbsoluteOrder.comp,true);
                suggestedviewId = msg.getId();
                Arrays.fill(suggestedView,merged.getMembers());
                msg.view = (MemberImpl[])merged.getMembers();
                sendElectionMsgToNextInline(local,msg);
            }
        } else {
            //leadership change
            suggestedView = null;
            suggestedviewId = null;
            msg.view = (MemberImpl[])merged.getMembers();
            sendElectionMsgToNextInline(local,msg);
        }
    
protected void handleOtherToken(org.apache.catalina.tribes.membership.MemberImpl local, org.apache.catalina.tribes.group.interceptors.NonBlockingCoordinator$CoordinationMessage msg, org.apache.catalina.tribes.Member sender, org.apache.catalina.tribes.membership.Membership merged)

        if ( local.equals(msg.getLeader()) ) {
            //I am the new leader
            //startElection(false);
        } else {
            msg.view = (MemberImpl[])merged.getMembers();
            sendElectionMsgToNextInline(local,msg);
        }
    
protected void handleToken(org.apache.catalina.tribes.group.interceptors.NonBlockingCoordinator$CoordinationMessage msg, org.apache.catalina.tribes.Member sender, org.apache.catalina.tribes.membership.Membership merged)

        MemberImpl local = (MemberImpl)getLocalMember(false);
        if ( local.equals(msg.getSource()) ) {
            //my message msg.src=local
            handleMyToken(local, msg, sender,merged);
        } else {
            handleOtherToken(local, msg, sender,merged);
        }
    
protected void handleViewConf(org.apache.catalina.tribes.group.interceptors.NonBlockingCoordinator$CoordinationMessage msg, org.apache.catalina.tribes.Member sender, org.apache.catalina.tribes.membership.Membership merged)

        if ( viewId != null && msg.getId().equals(viewId) ) return;//we already have this view
        view = new Membership((MemberImpl)getLocalMember(false),AbsoluteOrder.comp,true);
        Arrays.fill(view,msg.getMembers());
        viewId = msg.getId();
        
        if ( viewId.equals(suggestedviewId) ) {
            suggestedView = null;
            suggestedviewId = null;
        }
        
        if (suggestedView != null && AbsoluteOrder.comp.compare(suggestedView.getMembers()[0],merged.getMembers()[0])<0 ) {
            suggestedView = null;
            suggestedviewId = null;
        }
        
        viewChange(viewId,view.getMembers());
        fireInterceptorEvent(new CoordinationEvent(CoordinationEvent.EVT_CONF_RX,this,"Accepted View"));
        
        if ( suggestedviewId == null && hasHigherPriority(merged.getMembers(),membership.getMembers()) ) {
            startElection(false);
        }
    
protected boolean hasHigherPriority(org.apache.catalina.tribes.Member[] complete, org.apache.catalina.tribes.Member[] local)

        if ( local == null || local.length == 0 ) return false;
        if ( complete == null || complete.length == 0 ) return true;
        AbsoluteOrder.absoluteOrder(complete);
        AbsoluteOrder.absoluteOrder(local);
        return (AbsoluteOrder.comp.compare(complete[0],local[0]) > 0);
        
    
public boolean hasMembers()
has members

        
        return membership.hasMembers();
    
public void heartbeat()

        try {
            MemberImpl local = (MemberImpl)getLocalMember(false);
            if ( view != null && (Arrays.diff(view,membership,local).length != 0 ||  Arrays.diff(membership,view,local).length != 0) ) {
                if ( isHighest() ) {
                    fireInterceptorEvent(new CoordinationEvent(CoordinationEvent.EVT_START_ELECT, this,
                                                               "Heartbeat found inconsistency, restart election"));
                    startElection(true);
                }            
            }
        } catch ( Exception x  ){
            log.error("Unable to perform heartbeat.",x);
        } finally {
            super.heartbeat();
        }
    
public boolean isCoordinator()

        Member coord = getCoordinator();
        return coord != null && getLocalMember(false).equals(coord);
    
public boolean isHighest()

        Member local = getLocalMember(false);
        if ( membership.getMembers().length == 0 ) return true;
        else return AbsoluteOrder.comp.compare(local,membership.getMembers()[0])<=0;
    
protected boolean isViewConf(org.apache.catalina.tribes.group.interceptors.NonBlockingCoordinator$CoordinationMessage msg)

        return Arrays.contains(msg.getType(),0,COORD_CONF,0,COORD_CONF.length);
    
public void memberAdded(org.apache.catalina.tribes.Member member)

        memberAdded(member,true);
    
public void memberAdded(org.apache.catalina.tribes.Member member, boolean elect)

        try {
            if ( membership == null ) setupMembership();
            if ( membership.memberAlive((MemberImpl)member) ) super.memberAdded(member);
            try {
                fireInterceptorEvent(new CoordinationEvent(CoordinationEvent.EVT_MBR_ADD,this,"Member add("+member.getName()+")"));
                if (started && elect) startElection(false);
            }catch ( ChannelException x ) {
                log.error("Unable to start election when member was added.",x);
            }
        }finally {
        }
        
    
public void memberDisappeared(org.apache.catalina.tribes.Member member)

        try {
            
            membership.removeMember((MemberImpl)member);
            super.memberDisappeared(member);
            try {
                fireInterceptorEvent(new CoordinationEvent(CoordinationEvent.EVT_MBR_DEL,this,"Member remove("+member.getName()+")"));
                if ( started && (isCoordinator() || isHighest()) ) 
                    startElection(true); //to do, if a member disappears, only the coordinator can start
            }catch ( ChannelException x ) {
                log.error("Unable to start election when member was removed.",x);
            }
        }finally {
        }
    
protected org.apache.catalina.tribes.membership.Membership mergeOnArrive(org.apache.catalina.tribes.group.interceptors.NonBlockingCoordinator$CoordinationMessage msg, org.apache.catalina.tribes.Member sender)

        fireInterceptorEvent(new CoordinationEvent(CoordinationEvent.EVT_PRE_MERGE,this,"Pre merge"));
        MemberImpl local = (MemberImpl)getLocalMember(false);
        Membership merged = new Membership(local,AbsoluteOrder.comp,true);
        Arrays.fill(merged,msg.getMembers());
        Arrays.fill(merged,getMembers());
        Member[] diff = Arrays.diff(merged,membership,local);
        for ( int i=0; i<diff.length; i++ ) {
            if (!alive(diff[i])) merged.removeMember((MemberImpl)diff[i]);
            else memberAdded(diff[i],false);
        }
        fireInterceptorEvent(new CoordinationEvent(CoordinationEvent.EVT_POST_MERGE,this,"Post merge"));
        return merged;
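
The merge-then-verify idea in mergeOnArrive can be sketched with plain collections. This is a simplified illustration under stated assumptions, not the Tribes implementation: members are strings, string ordering replaces AbsoluteOrder, and the hypothetical `alive` predicate stands in for the TcpFailureDetector-based liveness check.

```java
import java.util.Arrays;
import java.util.Collection;
import java.util.Set;
import java.util.TreeSet;
import java.util.function.Predicate;

final class MergeOnArriveSketch {

    /**
     * Merges the token's members with the locally known members. Members that
     * were unknown locally are kept only if they answer the liveness check;
     * those that do are also added to the local membership.
     */
    static TreeSet<String> mergeOnArrive(String local, Collection<String> tokenMembers,
                                         Set<String> known, Predicate<String> alive) {
        TreeSet<String> merged = new TreeSet<>(tokenMembers);
        merged.addAll(known);
        merged.add(local);

        TreeSet<String> unknown = new TreeSet<>(merged);
        unknown.removeAll(known);
        unknown.remove(local);

        for (String member : unknown) {
            if (alive.test(member)) {
                known.add(member);     // newly discovered and reachable
            } else {
                merged.remove(member); // advertised by the token but not reachable
            }
        }
        return merged;
    }

    public static void main(String[] args) {
        Set<String> known = new TreeSet<>(Arrays.asList("A", "C"));
        TreeSet<String> merged = mergeOnArrive("B",
                Arrays.asList("A", "B", "D", "E"), known, m -> !m.equals("E"));
        // prints merged=[A, B, C, D] known=[A, C, D]
        System.out.println("merged=" + merged + " known=" + known);
    }
}
```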
    
public void messageReceived(org.apache.catalina.tribes.ChannelMessage msg)

        if ( Arrays.contains(msg.getMessage().getBytesDirect(),0,COORD_ALIVE,0,COORD_ALIVE.length) ) {
            //ignore message, its an alive message
            fireInterceptorEvent(new CoordinationEvent(CoordinationEvent.EVT_MSG_ARRIVE,this,"Alive Message"));

        } else if ( Arrays.contains(msg.getMessage().getBytesDirect(),0,COORD_HEADER,0,COORD_HEADER.length) ) {
            try {
                CoordinationMessage cmsg = new CoordinationMessage(msg.getMessage());
                Member[] cmbr = cmsg.getMembers();
                fireInterceptorEvent(new CoordinationEvent(CoordinationEvent.EVT_MSG_ARRIVE,this,"Coord Msg Arrived("+Arrays.toNameString(cmbr)+")"));
                processCoordMessage(cmsg, msg.getAddress());
            }catch ( ChannelException x ) {
                log.error("Error processing coordination message. Could be fatal.",x);
            }
        } else {
            super.messageReceived(msg);
        }
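
The dispatch in messageReceived appears to rely on comparing the first bytes of the payload against fixed headers. A minimal sketch of that idea follows. The header values are made up for illustration and are not the real COORD_HEADER or COORD_ALIVE constants, and the helper names are hypothetical.

```java
import java.nio.charset.StandardCharsets;

final class HeaderDispatchSketch {

    // Hypothetical header bytes, invented for this sketch only.
    static final byte[] COORD_HEADER = {-86, 38, -34, -29};
    static final byte[] COORD_ALIVE = {79, -89, 115, 72};

    /** True if 'data' begins with every byte of 'header'. */
    static boolean startsWith(byte[] data, byte[] header) {
        if (data.length < header.length) return false;
        for (int i = 0; i < header.length; i++) {
            if (data[i] != header[i]) return false;
        }
        return true;
    }

    /** Mirrors the three branches: alive ping, coordination message, application data. */
    static String classify(byte[] payload) {
        if (startsWith(payload, COORD_ALIVE)) return "ALIVE";
        if (startsWith(payload, COORD_HEADER)) return "COORDINATION";
        return "APPLICATION";
    }

    static byte[] concat(byte[] a, byte[] b) {
        byte[] out = new byte[a.length + b.length];
        System.arraycopy(a, 0, out, 0, a.length);
        System.arraycopy(b, 0, out, a.length, b.length);
        return out;
    }

    public static void main(String[] args) {
        byte[] body = "payload".getBytes(StandardCharsets.UTF_8);
        System.out.println(classify(concat(COORD_ALIVE, body)));  // prints ALIVE
        System.out.println(classify(concat(COORD_HEADER, body))); // prints COORDINATION
        System.out.println(classify(body));                       // prints APPLICATION
    }
}
```

Only messages that match neither header are handed to the next interceptor, so application code never sees coordination traffic.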
    
protected void processCoordMessage(org.apache.catalina.tribes.group.interceptors.NonBlockingCoordinator$CoordinationMessage msg, org.apache.catalina.tribes.Member sender)

        if ( !coordMsgReceived.get() ) {
            coordMsgReceived.set(true);
            synchronized (electionMutex) { electionMutex.notifyAll();}
        } 
        msg.timestamp = System.currentTimeMillis();
        Membership merged = mergeOnArrive(msg, sender);
        if (isViewConf(msg)) handleViewConf(msg, sender, merged);
        else handleToken(msg, sender, merged);
        ClassLoader loader;

    
protected void release()
Release the lock for in/out messages once the election is completed


   
protected void sendElectionMsg(org.apache.catalina.tribes.membership.MemberImpl local, org.apache.catalina.tribes.membership.MemberImpl next, org.apache.catalina.tribes.group.interceptors.NonBlockingCoordinator$CoordinationMessage msg)

        fireInterceptorEvent(new CoordinationEvent(CoordinationEvent.EVT_SEND_MSG,this,"Sending election message to("+next.getName()+")"));
        super.sendMessage(new Member[] {next}, createData(msg, local), null);
    
protected void sendElectionMsgToNextInline(org.apache.catalina.tribes.membership.MemberImpl local, org.apache.catalina.tribes.group.interceptors.NonBlockingCoordinator$CoordinationMessage msg)

 
        int next = Arrays.nextIndex(local,msg.getMembers());
        int current = next;
        msg.leader = msg.getMembers()[0];
        boolean sent =  false;
        while ( !sent && current >= 0 ) {
            try {
                sendElectionMsg(local, (MemberImpl) msg.getMembers()[current], msg);
                sent = true;
            }catch ( ChannelException x  ) {
                log.warn("Unable to send election message to:"+msg.getMembers()[current]);
                current = Arrays.nextIndex(msg.getMembers()[current],msg.getMembers());
                if ( current == next ) throw x;
            }
        }
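
The skip-on-failure behaviour of this method can be sketched as a walk around the ring that starts just after the local member. This is a simplified variant rather than a transcription: members are strings, the hypothetical `trySend` predicate replaces the real send call, and the local member is never tried as a target.

```java
import java.util.Arrays;
import java.util.List;
import java.util.function.Predicate;

final class NextInLineSketch {

    /**
     * Tries each member after 'local' in ring order and returns the first one
     * that accepts the message. Fails once every other member has been tried.
     */
    static String sendToNextReachable(String local, List<String> ring,
                                      Predicate<String> trySend) {
        int start = ring.indexOf(local);
        if (start < 0) throw new IllegalArgumentException("local member is not in the ring");
        for (int step = 1; step < ring.size(); step++) {
            String candidate = ring.get((start + step) % ring.size());
            if (trySend.test(candidate)) return candidate;
        }
        throw new IllegalStateException("no reachable member in the ring");
    }

    public static void main(String[] args) {
        List<String> ring = Arrays.asList("A", "B", "C", "D", "E");
        // C is unreachable, so B's token skips ahead to D
        System.out.println(sendToNextReachable("B", ring, m -> !m.equals("C"))); // prints D
        // the last member wraps around to the first
        System.out.println(sendToNextReachable("E", ring, m -> true));           // prints A
    }
}
```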
    
public void sendMessage(org.apache.catalina.tribes.Member[] destination, org.apache.catalina.tribes.ChannelMessage msg, org.apache.catalina.tribes.group.InterceptorPayload payload)

        waitForRelease();
        super.sendMessage(destination, msg, payload);
    
protected synchronized void setupMembership()

        if ( membership == null ) {
            membership  = new Membership((MemberImpl)super.getLocalMember(true),AbsoluteOrder.comp,false);
        }
    
public void start(int svc)

            if (membership == null) setupMembership();
            if (started)return;
            fireInterceptorEvent(new CoordinationEvent(CoordinationEvent.EVT_START, this, "Before start"));
            super.start(startsvc);
            started = true;
            if (view == null) view = new Membership( (MemberImpl)super.getLocalMember(true), AbsoluteOrder.comp, true);
            fireInterceptorEvent(new CoordinationEvent(CoordinationEvent.EVT_START, this, "After start"));
            startElection(false);
    
public void startElection(boolean force)

        synchronized (electionMutex) {
            MemberImpl local = (MemberImpl)getLocalMember(false);
            MemberImpl[] others = (MemberImpl[])membership.getMembers();
            fireInterceptorEvent(new CoordinationEvent(CoordinationEvent.EVT_START_ELECT,this,"Election initated"));
            if ( others.length == 0 ) {
                this.viewId = new UniqueId(UUIDGenerator.randomUUID(false));
                this.view = new Membership(local,AbsoluteOrder.comp, true);
                this.handleViewConf(this.createElectionMsg(local,others,local),local,view);
                return; //the only member, no need for an election
            }
            if ( suggestedviewId != null ) {
                
                if ( view != null && Arrays.diff(view,suggestedView,local).length == 0 &&  Arrays.diff(suggestedView,view,local).length == 0) {
                    suggestedviewId = null;
                    suggestedView = null;
                    fireInterceptorEvent(new CoordinationEvent(CoordinationEvent.EVT_ELECT_ABANDONED,this,"Election abandoned, running election matches view"));
                } else {
                    fireInterceptorEvent(new CoordinationEvent(CoordinationEvent.EVT_ELECT_ABANDONED,this,"Election abandoned, election running"));
                }
                return; //election already running, I'm not allowed to have two of them
            }
            if ( view != null && Arrays.diff(view,membership,local).length == 0 &&  Arrays.diff(membership,view,local).length == 0) {
                fireInterceptorEvent(new CoordinationEvent(CoordinationEvent.EVT_ELECT_ABANDONED,this,"Election abandoned, view matches membership"));
                return; //already have this view installed
            }            
            int prio = AbsoluteOrder.comp.compare(local,others[0]);
            MemberImpl leader = ( prio < 0 )?local:others[0];//am I the leader in my view?
            if ( local.equals(leader) || force ) {
                CoordinationMessage msg = createElectionMsg(local, others, leader);
                suggestedviewId = msg.getId();
                suggestedView = new Membership(local,AbsoluteOrder.comp,true);
                Arrays.fill(suggestedView,msg.getMembers());
                fireInterceptorEvent(new CoordinationEvent(CoordinationEvent.EVT_PROCESS_ELECT,this,"Election, sending request"));
                sendElectionMsg(local,others[0],msg);
            } else {
                try {
                    coordMsgReceived.set(false);
                    fireInterceptorEvent(new CoordinationEvent(CoordinationEvent.EVT_WAIT_FOR_MSG,this,"Election, waiting for request"));
                    electionMutex.wait(waitForCoordMsgTimeout);
                }catch ( InterruptedException x ) {
                    Thread.interrupted(); // static method: tests and clears the current thread's interrupt flag
                }
                if ( suggestedviewId == null && (!coordMsgReceived.get())) {
                    //no message arrived, send the coord msg
//                    fireInterceptorEvent(new CoordinationEvent(CoordinationEvent.EVT_WAIT_FOR_MSG,this,"Election, waiting timed out."));
//                    startElection(true);
                    fireInterceptorEvent(new CoordinationEvent(CoordinationEvent.EVT_ELECT_ABANDONED,this,"Election abandoned, waiting timed out."));
                } else {
                    fireInterceptorEvent(new CoordinationEvent(CoordinationEvent.EVT_ELECT_ABANDONED,this,"Election abandoned, received a message"));
                }
            }//end if
            
        }
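
The "wait for a coordination message or time out" branch can be sketched in isolation. The class below is hypothetical and differs from the listing in two deliberate ways, both assumptions of this sketch: it waits in a loop against a deadline to tolerate spurious wake-ups, and it restores the interrupt flag with `interrupt()` instead of clearing it.

```java
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;

final class CoordWaitSketch {

    private final Object mutex = new Object();
    private final AtomicBoolean received = new AtomicBoolean(false);

    /** Waits up to timeoutMs for messageArrived(); returns true if it was signalled. */
    boolean awaitMessage(long timeoutMs) {
        synchronized (mutex) {
            received.set(false);
            long deadline = System.nanoTime() + TimeUnit.MILLISECONDS.toNanos(timeoutMs);
            while (!received.get()) {
                long remainingMs = TimeUnit.NANOSECONDS.toMillis(deadline - System.nanoTime());
                if (remainingMs <= 0) break; // timed out
                try {
                    mutex.wait(remainingMs);
                } catch (InterruptedException x) {
                    Thread.currentThread().interrupt(); // keep the interrupt visible to callers
                    break;
                }
            }
            return received.get();
        }
    }

    /** Called by the receiving thread when a coordination message shows up. */
    void messageArrived() {
        if (received.compareAndSet(false, true)) {
            synchronized (mutex) {
                mutex.notifyAll();
            }
        }
    }

    public static void main(String[] args) {
        CoordWaitSketch waiter = new CoordWaitSketch();
        System.out.println("signalled=" + waiter.awaitMessage(50)); // nobody signals: false
    }
}
```

As in the listing, a message that arrives before the wait begins is not remembered, because the flag is reset on entry.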
    
public void stop(int svc)

        try {
            halt();
            synchronized (electionMutex) {
                if (!started)return;
                started = false;
                fireInterceptorEvent(new CoordinationEvent(CoordinationEvent.EVT_STOP, this, "Before stop"));
                super.stop(startsvc);
                this.view = null;
                this.viewId = null;
                this.suggestedView = null;
                this.suggestedviewId = null;
                this.membership.reset();
                fireInterceptorEvent(new CoordinationEvent(CoordinationEvent.EVT_STOP, this, "After stop"));
            }
        }finally {
            release();
        }
    
protected void viewChange(org.apache.catalina.tribes.UniqueId viewId, org.apache.catalina.tribes.Member[] view)

        //invoke any listeners
    
protected void waitForRelease()
Wait for an election to end