File | Doc | Category | Size | Date | Package
TcpFailureDetector.java | API Doc | Apache Tomcat 6.0.14 | 13513 | Fri Jul 20 04:20:34 BST 2007 | org.apache.catalina.tribes.group.interceptors

TcpFailureDetector

public class TcpFailureDetector extends org.apache.catalina.tribes.group.ChannelInterceptorBase

Title: A perfect failure detector

Description: The TcpFailureDetector is a useful interceptor that adds reliability to the membership layer.

If the network is busy, or the system is so busy that the membership receiver thread does not get enough time to update its table, members can be "timed out". This failure detector intercepts the memberDisappeared message (unless it is a true shutdown message) and connects to the member using TCP to verify that it is really gone.

The TcpFailureDetector works in two ways.
1. It intercepts memberDisappeared events.
2. It catches send errors.

author
Filip Hanik
version
1.0

Fields Summary
private static org.apache.juli.logging.Log
log
protected static byte[]
TCP_FAIL_DETECT
protected boolean
performConnectTest
protected long
connectTimeout
protected boolean
performSendTest
protected boolean
performReadTest
protected long
readTestTimeout
protected org.apache.catalina.tribes.membership.Membership
membership
protected HashMap
removeSuspects
protected HashMap
addSuspects
Constructors Summary
Methods Summary
public voidcheckMembers(boolean checkAll)

        
        try {
            if (membership == null) setupMembership();
            synchronized (membership) {
                if ( !checkAll ) performBasicCheck();
                else performForcedCheck();
            }
        }catch ( Exception x ) {
            log.warn("Unable to perform heartbeat on the TcpFailureDetector.",x);
        } finally {
            
        }
    
public org.apache.catalina.tribes.MembergetLocalMember(boolean incAlive)

        return super.getLocalMember(incAlive);
    
public org.apache.catalina.tribes.MembergetMember(org.apache.catalina.tribes.Member mbr)

        if ( membership == null ) setupMembership();
        return membership.getMember(mbr);
    
public org.apache.catalina.tribes.Member[]getMembers()

        if ( membership == null ) setupMembership();
        return membership.getMembers();
    
public booleanhasMembers()

        if ( membership == null ) setupMembership();
        return membership.hasMembers();
    
public voidheartbeat()

        super.heartbeat();
        checkMembers(false);
    
public voidmemberAdded(org.apache.catalina.tribes.Member member)

        if ( membership == null ) setupMembership();
        boolean notify = false;
        synchronized (membership) {
            if (removeSuspects.containsKey(member)) {
                //previously marked suspect, system below picked up the member again
                removeSuspects.remove(member);
            } else if (membership.getMember( (MemberImpl) member) == null){
                //if we add it here, then add it upwards too
                //check to see if it is alive
                if (memberAlive(member)) {
                    membership.memberAlive( (MemberImpl) member);
                    notify = true;
                } else {
                    addSuspects.put(member, new Long(System.currentTimeMillis()));
                }
            }
        }
        if ( notify ) super.memberAdded(member);
    
protected booleanmemberAlive(org.apache.catalina.tribes.Member mbr)

        return memberAlive(mbr,TCP_FAIL_DETECT,performSendTest,performReadTest,readTestTimeout,connectTimeout,getOptionFlag());
    
protected static booleanmemberAlive(org.apache.catalina.tribes.Member mbr, byte[] msgData, boolean sendTest, boolean readTest, long readTimeout, long conTimeout, int optionFlag)

        //could be a shutdown notification
        if ( Arrays.equals(mbr.getCommand(),Member.SHUTDOWN_PAYLOAD) ) return false;
        
        Socket socket = new Socket();        
        try {
            InetAddress ia = InetAddress.getByAddress(mbr.getHost());
            InetSocketAddress addr = new InetSocketAddress(ia, mbr.getPort());
            socket.setSoTimeout((int)readTimeout);
            socket.connect(addr, (int) conTimeout);
            if ( sendTest ) {
                ChannelData data = new ChannelData(true);
                data.setAddress(mbr);
                data.setMessage(new XByteBuffer(msgData,false));
                data.setTimestamp(System.currentTimeMillis());
                int options = optionFlag | Channel.SEND_OPTIONS_BYTE_MESSAGE;
                if ( readTest ) options = (options | Channel.SEND_OPTIONS_USE_ACK);
                else options = (options & (~Channel.SEND_OPTIONS_USE_ACK));
                data.setOptions(options);
                byte[] message = XByteBuffer.createDataPackage(data);
                socket.getOutputStream().write(message);
                if ( readTest ) {
                    int length = socket.getInputStream().read(message);
                    return length > 0;
                }
            }//end if
            return true;
        } catch ( SocketTimeoutException sx) {
            //do nothing, we couldn't connect
        } catch ( ConnectException cx) {
            //do nothing, we couldn't connect
        }catch (Exception x ) {
            log.error("Unable to perform failure detection check, assuming member down.",x);
        } finally {
            try {socket.close(); } catch ( Exception ignore ){}
        }
        return false;
    
public voidmemberDisappeared(org.apache.catalina.tribes.Member member)

        if ( membership == null ) setupMembership();
        boolean notify = false;
        boolean shutdown = Arrays.equals(member.getCommand(),Member.SHUTDOWN_PAYLOAD);
        if ( !shutdown ) 
            if(log.isInfoEnabled())
                log.info("Received memberDisappeared["+member+"] message. Will verify.");
        synchronized (membership) {
            //check to see if the member really is gone
            //if the payload is not a shutdown message
            if (shutdown || !memberAlive(member)) {
                //not correct, we need to maintain the map
                membership.removeMember( (MemberImpl) member);
                removeSuspects.remove(member);
                notify = true;
            } else {
                //add the member as suspect
                removeSuspects.put(member, new Long(System.currentTimeMillis()));
            }
        }
        if ( notify ) {
            if(log.isInfoEnabled())
                log.info("Verification complete. Member disappeared["+member+"]");
            super.memberDisappeared(member);
        } else {
            if(log.isInfoEnabled())
                log.info("Verification complete. Member still alive["+member+"]");

        }
    
public voidmessageReceived(org.apache.catalina.tribes.ChannelMessage msg)

        //catch incoming 
        boolean process = true;
        if ( okToProcess(msg.getOptions()) ) {
            //check to see if it is a testMessage, if so, process = false
            process = ( (msg.getMessage().getLength() != TCP_FAIL_DETECT.length) ||
                        (!Arrays.equals(TCP_FAIL_DETECT,msg.getMessage().getBytes()) ) );
        }//end if
            
        //ignore the message, it doesnt have the flag set
        if ( process ) super.messageReceived(msg);
        else if ( log.isDebugEnabled() ) log.debug("Received a failure detector packet:"+msg);
    
protected voidperformBasicCheck()

        //update all alive times
        Member[] members = super.getMembers();
        for (int i = 0; members != null && i < members.length; i++) {
            if (membership.memberAlive( (MemberImpl) members[i])) {
                //we don't have this one in our membership, check to see if he/she is alive
                if (memberAlive(members[i])) {
                    log.warn("Member added, even though we werent notified:" + members[i]);
                    super.memberAdded(members[i]);
                } else {
                    membership.removeMember( (MemberImpl) members[i]);
                } //end if
            } //end if
        } //for

        //check suspect members if they are still alive,
        //if not, simply issue the memberDisappeared message
        MemberImpl[] keys = (MemberImpl[]) removeSuspects.keySet().toArray(new MemberImpl[removeSuspects.size()]);
        for (int i = 0; i < keys.length; i++) {
            MemberImpl m = (MemberImpl) keys[i];
            if (membership.getMember(m) != null && (!memberAlive(m))) {
                membership.removeMember(m);
                super.memberDisappeared(m);
                removeSuspects.remove(m);
                if(log.isInfoEnabled())
                    log.info("Suspect member, confirmed dead.["+m+"]");
            } //end if
        }

        //check add suspects members if they are alive now,
        //if they are, simply issue the memberAdded message
        keys = (MemberImpl[]) addSuspects.keySet().toArray(new MemberImpl[addSuspects.size()]);
        for (int i = 0; i < keys.length; i++) {
            MemberImpl m = (MemberImpl) keys[i];
            if ( membership.getMember(m) == null && (memberAlive(m))) {
                membership.memberAlive(m);
                super.memberAdded(m);
                addSuspects.remove(m);
                if(log.isInfoEnabled())
                    log.info("Suspect member, confirmed alive.["+m+"]");
            } //end if
        }
    
protected voidperformForcedCheck()

        //update all alive times
        Member[] members = super.getMembers();
        for (int i = 0; members != null && i < members.length; i++) {
            if (memberAlive(members[i])) {
                if (membership.memberAlive((MemberImpl)members[i])) super.memberAdded(members[i]);
                addSuspects.remove(members[i]);
            } else {
                if (membership.getMember(members[i])!=null) {
                    membership.removeMember((MemberImpl)members[i]);
                    removeSuspects.remove(members[i]);
                    super.memberDisappeared((MemberImpl)members[i]);
                }
            } //end if
        } //for

    
public voidsendMessage(org.apache.catalina.tribes.Member[] destination, org.apache.catalina.tribes.ChannelMessage msg, org.apache.catalina.tribes.group.InterceptorPayload payload)

    
              
        try {
            super.sendMessage(destination, msg, payload);
        }catch ( ChannelException cx ) {
            FaultyMember[] mbrs = cx.getFaultyMembers();
            for ( int i=0; i<mbrs.length; i++ ) {
                if ( mbrs[i].getCause()!=null &&  
                     (!(mbrs[i].getCause() instanceof RemoteProcessException)) ) {//RemoteProcessException's are ok
                    this.memberDisappeared(mbrs[i].getMember());
                }//end if
            }//for
            throw cx;
        }
    
protected synchronized voidsetupMembership()

        if ( membership == null ) {
            membership = new Membership((MemberImpl)super.getLocalMember(true));
        }