FileDocCategorySizeDatePackage
McastServiceImpl.javaAPI DocApache Tomcat 6.0.1415208Fri Jul 20 04:20:36 BST 2007org.apache.catalina.tribes.membership

McastServiceImpl

public class McastServiceImpl extends Object
A membership implementation using simple multicast. This is the representation of a multicast membership service. This class is responsible for maintaining a list of active nodes in the cluster. If a node fails to send out a heartbeat, the node will be dismissed. This is the low-level implementation that handles the multicasting sockets. TODO: this needs fixing; it could use java.nio and need only one thread to send and receive, or simply use a timeout on the receive.
author
Filip Hanik
version
$Revision: 467222 $, $Date: 2006-10-24 05:17:11 +0200 (mar., 24 oct. 2006) $

Fields Summary
private static org.apache.juli.logging.Log
log
protected static int
MAX_PACKET_SIZE
protected boolean
doRunSender
Internal flag used for the listen thread that listens to the multicasting socket.
protected boolean
doRunReceiver
protected int
startLevel
protected MulticastSocket
socket
Socket that we intend to listen to
protected MemberImpl
member
The local member that we intend to broadcast over and over again
protected InetAddress
address
The multicast address
protected int
port
The multicast port
protected long
timeToExpiration
The time it takes for a member to expire.
protected long
sendFrequency
How often we send out a broadcast saying we are alive; must be smaller than timeToExpiration
protected DatagramPacket
sendPacket
Reuse the sendPacket, no need to create a new one every time
protected DatagramPacket
receivePacket
Reuse the receivePacket, no need to create a new one every time
protected Membership
membership
The membership, used so that we calculate memberships when they arrive or don't arrive
protected org.apache.catalina.tribes.MembershipListener
service
The actual listener, called back when a member is added or disappears
protected ReceiverThread
receiver
Thread to listen for pings
protected SenderThread
sender
Thread to send pings
protected long
serviceStartTime
When was the service started
protected int
mcastTTL
Time to live for the multicast packets that are being sent out
protected int
mcastSoTimeout
Read timeout on the mcast socket
protected InetAddress
mcastBindAddress
bind address
protected Object
expiredMutex
Constructors Summary
public McastServiceImpl(MemberImpl member, long sendFrequency, long expireTime, int port, InetAddress bind, InetAddress mcastAddress, int ttl, int soTimeout, org.apache.catalina.tribes.MembershipListener service)
Create a new mcast service impl

param
member - the local member
param
sendFrequency - the time (ms) in between pings sent out
param
expireTime - the time (ms) for a member to expire
param
port - the mcast port
param
bind - the bind address; when non-null, the socket is bound to it and it is set as the multicast interface
param
mcastAddress - the mcast address
param
service - the callback service
throws
IOException

    
                                                                           
     
         
         
         
         
         
         
         
         
         
      
        this.member = member;
        this.address = mcastAddress;
        this.port = port;
        this.mcastSoTimeout = soTimeout;
        this.mcastTTL = ttl;
        this.mcastBindAddress = bind;
        this.timeToExpiration = expireTime;
        this.service = service;
        this.sendFrequency = sendFrequency;
        setupSocket();
        sendPacket = new DatagramPacket(new byte[MAX_PACKET_SIZE],MAX_PACKET_SIZE);
        sendPacket.setAddress(address);
        sendPacket.setPort(port);
        receivePacket = new DatagramPacket(new byte[MAX_PACKET_SIZE],MAX_PACKET_SIZE);
        receivePacket.setAddress(address);
        receivePacket.setPort(port);
        membership = new Membership(member);
    
Methods Summary
protected voidcheckExpired()

       
        // Only one expiry pass at a time; both receive() and send(true) end up here.
        synchronized (expiredMutex) {
            final MemberImpl[] gone = membership.expire(timeToExpiration);
            int idx = 0;
            while (idx < gone.length) {
                final MemberImpl lost = gone[idx];
                idx++;
                if (log.isDebugEnabled()) {
                    log.debug("Mcast exipre  member " + lost);
                }
                try {
                    // The listener callback runs on its own thread, not on the caller's thread.
                    final Thread notifier = new Thread() {
                        public void run() {
                            service.memberDisappeared(lost);
                        }
                    };
                    notifier.start();
                } catch (Exception x) {
                    log.error("Unable to process member disappeared message.", x);
                }
            }
        }
    
public longgetServiceStartTime()

       // Timestamp recorded by start() when the sender is started.
       return serviceStartTime;
    
public voidreceive()
Receive a datagram packet, blocking until one arrives or the socket read timeout elapses

throws
IOException

        try {
            socket.receive(receivePacket);
            final int received = receivePacket.getLength();
            if (received <= MAX_PACKET_SIZE) {
                // Copy the payload out of the shared receive buffer before decoding it.
                final byte[] payload = new byte[received];
                System.arraycopy(receivePacket.getData(), receivePacket.getOffset(), payload, 0, received);
                final MemberImpl m = MemberImpl.getMember(payload);
                if (log.isTraceEnabled()) {
                    log.trace("Mcast receive ping from member " + m);
                }
                final boolean shutdownPing = Arrays.equals(m.getCommand(), Member.SHUTDOWN_PAYLOAD);
                Thread notifier = null;
                if (shutdownPing) {
                    // The remote member announced its own shutdown: drop it right away.
                    if (log.isDebugEnabled()) {
                        log.debug("Member has shutdown:" + m);
                    }
                    membership.removeMember(m);
                    notifier = new Thread() {
                        public void run() {
                            service.memberDisappeared(m);
                        }
                    };
                } else if (membership.memberAlive(m)) {
                    // memberAlive() returned true, which this code treats as "member added".
                    if (log.isDebugEnabled()) {
                        log.debug("Mcast add member " + m);
                    }
                    notifier = new Thread() {
                        public void run() {
                            service.memberAdded(m);
                        }
                    };
                }
                // Listener callbacks are dispatched on their own thread.
                if (notifier != null) {
                    notifier.start();
                }
            } else {
                log.error("Multicast packet received was too long, dropping package:"+received);
            }
        } catch (SocketTimeoutException x) {
            // Expected: the read timeout keeps this call from blocking forever, which
            // matters because the expiration check below runs on this same thread.
        }
        checkExpired();
    
public voidsend(boolean checkexpired)
Send a ping

throws
Exception

        // No start-level check is performed here; start() relies on that to push out
        // one ping before the sender thread exists.
        member.inc();
        if (log.isTraceEnabled()) {
            log.trace("Mcast send ping from member " + member);
        }
        final byte[] payload = member.getData();
        // A fresh datagram per ping, addressed to the multicast group.
        final DatagramPacket ping = new DatagramPacket(payload, payload.length, address, port);
        socket.send(ping);
        if (checkexpired) {
            checkExpired();
        }
    
protected voidsetupSocket()

        if (mcastBindAddress != null) socket = new MulticastSocket(new InetSocketAddress(mcastBindAddress, port));
        else socket = new MulticastSocket(port);
        socket.setLoopbackMode(false); //hint that we don't need loop back messages
        if (mcastBindAddress != null) {
			if(log.isInfoEnabled())
                log.info("Setting multihome multicast interface to:" +mcastBindAddress);
            socket.setInterface(mcastBindAddress);
        } //end if
        //force a so timeout so that we don't block forever
        if ( mcastSoTimeout <= 0 ) mcastSoTimeout = (int)sendFrequency;
        if(log.isInfoEnabled())
            log.info("Setting cluster mcast soTimeout to "+mcastSoTimeout);
        socket.setSoTimeout(mcastSoTimeout);

        if ( mcastTTL >= 0 ) {
			if(log.isInfoEnabled())
                log.info("Setting cluster mcast TTL to " + mcastTTL);
            socket.setTimeToLive(mcastTTL);
        }
    
public synchronized voidstart(int level)
Start the service

param
level 1 starts the receiver, level 2 starts the sender
throws
IOException if the service fails to start
throws
IllegalStateException if the service is already started

        final boolean wantReceiver = (level & Channel.MBR_RX_SEQ) == Channel.MBR_RX_SEQ;
        final boolean wantSender = (level & Channel.MBR_TX_SEQ) == Channel.MBR_TX_SEQ;
        // Reject levels that select neither the receiver nor the sender.
        if (!wantReceiver && !wantSender) {
            throw new IllegalArgumentException("Invalid start level. Only acceptable levels are Channel.MBR_RX_SEQ and Channel.MBR_TX_SEQ");
        }

        if (wantReceiver) {
            if (receiver != null) {
                throw new IllegalStateException("McastService.receive already running.");
            }
            // Join the group only if the sender side has not been started yet.
            if (sender == null) {
                socket.joinGroup(address);
            }
            doRunReceiver = true;
            receiver = new ReceiverThread();
            receiver.setDaemon(true);
            receiver.start();
        }

        if (wantSender) {
            if (sender != null) {
                throw new IllegalStateException("McastService.send already running.");
            }
            // Join the group only if the receiver side has not been started yet.
            if (receiver == null) {
                socket.joinGroup(address);
            }
            // Push one ping out immediately, before the periodic sender takes over.
            send(false);
            doRunSender = true;
            serviceStartTime = System.currentTimeMillis();
            sender = new SenderThread(sendFrequency);
            sender.setDaemon(true);
            sender.start();
        }

        // Pause so that pings can be exchanged before the new level is recorded.
        waitForMembers(level);
        startLevel |= level;
    
public synchronized booleanstop(int level)
Stops the service

throws
IOException if the service fails to disconnect from the sockets

        final boolean stopReceiver = (level & Channel.MBR_RX_SEQ) == Channel.MBR_RX_SEQ;
        final boolean stopSender = (level & Channel.MBR_TX_SEQ) == Channel.MBR_TX_SEQ;
        // Reject levels that select neither the receiver nor the sender.
        if (!stopReceiver && !stopSender) {
            throw new IllegalArgumentException("Invalid stop level. Only acceptable levels are Channel.MBR_RX_SEQ and Channel.MBR_TX_SEQ");
        }

        if (stopReceiver) {
            // Clear the run flag first, then interrupt the thread.
            doRunReceiver = false;
            if (receiver != null) {
                receiver.interrupt();
            }
            receiver = null;
        }
        if (stopSender) {
            doRunSender = false;
            if (sender != null) {
                sender.interrupt();
            }
            sender = null;
        }

        startLevel = (startLevel & (~level));
        final boolean fullyStopped = (startLevel == 0);
        if (fullyStopped) {
            // Nothing is running any more: announce the shutdown, then release the socket.
            try {
                member.setCommand(Member.SHUTDOWN_PAYLOAD);
                member.getData(true, true);
                send(false);
            } finally {
                // Clean up even when the shutdown ping could not be sent; previously a
                // failing send skipped leaving the group and the socket was never closed.
                try {
                    socket.leaveGroup(address);
                } catch (Exception ignored) {
                    // best effort: we are shutting down and the socket is closed next
                }
                // NOTE(review): a thread still blocked in socket.receive() gets a
                // SocketException once the socket is closed - confirm ReceiverThread
                // tolerates that during shutdown. After this point the instance cannot
                // be started again, because the socket is only created in the constructor.
                socket.close();
                serviceStartTime = Long.MAX_VALUE;
            }
        }
        // true when neither the receiver nor the sender level remains started
        return fullyStopped;
    
private voidwaitForMembers(int level)

        // Wait for two send intervals before reporting membership as established.
        final long memberwait = sendFrequency * 2;
        if (log.isInfoEnabled()) {
            log.info("Sleeping for "+memberwait+" milliseconds to establish cluster membership, start level:"+level);
        }
        try {
            Thread.sleep(memberwait);
        } catch (InterruptedException interrupted) {
            // Cut the wait short, but restore the interrupt flag so the caller can
            // still observe that an interrupt was requested (it used to be lost here).
            Thread.currentThread().interrupt();
        }
        if (log.isInfoEnabled()) {
            log.info("Done sleeping, membership established, start level:"+level);
        }