FileDocCategorySizeDatePackage
TcpPingInterceptor.javaAPI DocApache Tomcat 6.0.146457Fri Jul 20 04:20:32 BST 2007org.apache.catalina.tribes.group.interceptors

TcpPingInterceptor.java

/*
 * Licensed to the Apache Software Foundation (ASF) under one or more
 * contributor license agreements.  See the NOTICE file distributed with
 * this work for additional information regarding copyright ownership.
 * The ASF licenses this file to You under the Apache License, Version 2.0
 * (the "License"); you may not use this file except in compliance with
 * the License.  You may obtain a copy of the License at
 * 
 *      http://www.apache.org/licenses/LICENSE-2.0
 * 
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package org.apache.catalina.tribes.group.interceptors;

import java.lang.ref.WeakReference;
import java.util.Arrays;
import java.util.concurrent.atomic.AtomicInteger;

import org.apache.catalina.tribes.ChannelException;
import org.apache.catalina.tribes.ChannelInterceptor;
import org.apache.catalina.tribes.ChannelMessage;
import org.apache.catalina.tribes.Member;
import org.apache.catalina.tribes.group.ChannelInterceptorBase;
import org.apache.catalina.tribes.io.ChannelData;

/**
 * 
 * Sends a ping to all members.
 * Configure this interceptor with the TcpFailureDetector below it,
 * and the TcpFailureDetector will act as the membership guide.
 * @author Filip Hanik
 * @version 1.0
 */

public class TcpPingInterceptor extends ChannelInterceptorBase {
    
    protected static org.apache.juli.logging.Log log = 
        org.apache.juli.logging.LogFactory.getLog(TcpPingInterceptor.class);
    
    protected static byte[] TCP_PING_DATA = new byte[] {
        79, -89, 115, 72, 121, -33, 67, -55, -97, 111, -119, -128, -95, 91, 7, 20,
        125, -39, 82, 91, -21, -33, 67, -102, -73, 126, -66, -113, -127, 103, 30, -74,
        55, 21, -66, -121, 69, 33, 76, -88, -65, 10, 77, 19, 83, 56, 21, 50,
        85, -10, -108, -73, 58, -33, 33, 120, -111, 4, 125, -41, 114, -124, -64, -43};  

    protected long interval = 1000; //1 second

    protected boolean useThread = false;
    protected boolean staticOnly = false;
    protected boolean running = true;
    protected PingThread thread = null;
    protected static AtomicInteger cnt = new AtomicInteger(0);
    
    WeakReference<TcpFailureDetector> failureDetector = null;
    WeakReference<StaticMembershipInterceptor> staticMembers = null;
    
    public synchronized void start(int svc) throws ChannelException {
        super.start(svc);
        running = true;
        if ( thread == null ) {
            thread = new PingThread();
            thread.setDaemon(true);
            thread.setName("TcpPingInterceptor.PingThread-"+cnt.addAndGet(1));
            thread.start();
        }
        
        //acquire the interceptors to invoke on send ping events
        ChannelInterceptor next = getNext();
        while ( next != null ) {
            if ( next instanceof TcpFailureDetector ) 
                failureDetector = new WeakReference<TcpFailureDetector>((TcpFailureDetector)next);
            if ( next instanceof StaticMembershipInterceptor ) 
                staticMembers = new WeakReference<StaticMembershipInterceptor>((StaticMembershipInterceptor)next);
            next = next.getNext();
        }
        
    }
    
    public void stop(int svc) throws ChannelException {
        running = false;
        if ( thread != null ) thread.interrupt();
        thread = null;
        super.stop(svc);
    }
    
    public void heartbeat() {
        super.heartbeat();
        if (!getUseThread()) sendPing();
    }

    public long getInterval() {
        return interval;
    }

    public void setInterval(long interval) {
        this.interval = interval;
    }

    public void setUseThread(boolean useThread) {
        this.useThread = useThread;
    }

    public void setStaticOnly(boolean staticOnly) {
        this.staticOnly = staticOnly;
    }

    public boolean getUseThread() {
        return useThread;
    }

    public boolean getStaticOnly() {
        return staticOnly;
    }

    protected void sendPing() {
        if (failureDetector.get()!=null) {
            //we have a reference to the failure detector
            //piggy back on that dude
            failureDetector.get().checkMembers(true);
        }else {
            if (staticOnly && staticMembers.get()!=null) {
                sendPingMessage(staticMembers.get().getMembers());
            } else {
                sendPingMessage(getMembers());
            }
        }
    }

    protected void sendPingMessage(Member[] members) {
        if ( members == null || members.length == 0 ) return;
        ChannelData data = new ChannelData(true);//generates a unique Id
        data.setAddress(getLocalMember(false));
        data.setTimestamp(System.currentTimeMillis());
        data.setOptions(getOptionFlag());
        try {
            super.sendMessage(members, data, null);
        }catch (ChannelException x) {
            log.warn("Unable to send TCP ping.",x);
        }
    }
    
    public void messageReceived(ChannelMessage msg) {
        //catch incoming 
        boolean process = true;
        if ( okToProcess(msg.getOptions()) ) {
            //check to see if it is a ping message, if so, process = false
            process = ( (msg.getMessage().getLength() != TCP_PING_DATA.length) ||
                        (!Arrays.equals(TCP_PING_DATA,msg.getMessage().getBytes()) ) );
        }//end if

        //ignore the message, it doesnt have the flag set
        if ( process ) super.messageReceived(msg);
        else if ( log.isDebugEnabled() ) log.debug("Received a TCP ping packet:"+msg);
    }//messageReceived
    
    protected class PingThread extends Thread {
        public void run() {
            while (running) {
                try {
                    sleep(interval);
                    sendPing();
                }catch ( InterruptedException ix ) {
                    interrupted();
                }catch ( Exception x )  {
                    log.warn("Unable to send ping from TCP ping thread.",x);
                }
            }
        }
    }

    
    

}