FileDocCategorySizeDatePackage
ChannelCoordinator.javaAPI DocApache Tomcat 6.0.1412745Fri Jul 20 04:20:34 BST 2007org.apache.catalina.tribes.group

ChannelCoordinator

public class ChannelCoordinator extends ChannelInterceptorBase implements org.apache.catalina.tribes.MessageListener
The channel coordinator object coordinates the membership service, the sender and the receiver. This is the last interceptor in the chain.
author
Filip Hanik
version
$Revision: 532943 $, $Date: 2007-04-27 05:14:58 +0200 (ven., 27 avr. 2007) $

Fields Summary
private org.apache.catalina.tribes.ChannelReceiver
clusterReceiver
private org.apache.catalina.tribes.ChannelSender
clusterSender
private org.apache.catalina.tribes.MembershipService
membershipService
protected int
optionFlag
private int
startLevel
Constructors Summary
public ChannelCoordinator()


      
        
    
public ChannelCoordinator(org.apache.catalina.tribes.ChannelReceiver receiver, org.apache.catalina.tribes.ChannelSender sender, org.apache.catalina.tribes.MembershipService service)

        this();
        this.setClusterReceiver(receiver);
        this.setClusterSender(sender);
        this.setMembershipService(service);
    
Methods Summary
public org.apache.catalina.tribes.ChannelReceivergetClusterReceiver()

        return clusterReceiver;
    
public org.apache.catalina.tribes.ChannelSendergetClusterSender()

        return clusterSender;
    
public org.apache.catalina.tribes.MembergetLocalMember(boolean incAlive)
Return the member that represents this node.

return
Member

        return this.getMembershipService().getLocalMember(incAlive);
    
public org.apache.catalina.tribes.MembergetMember(org.apache.catalina.tribes.Member mbr)

param
mbr Member
return
Member

        return this.getMembershipService().getMember(mbr);
    
public org.apache.catalina.tribes.Member[]getMembers()
Get all current cluster members

return
all members or empty array

        return this.getMembershipService().getMembers();
    
public org.apache.catalina.tribes.MembershipServicegetMembershipService()

        return membershipService;
    
public intgetOptionFlag()

       return optionFlag;
public booleanhasMembers()
has members

        return this.getMembershipService().hasMembers();
    
public voidheartbeat()

        if ( clusterSender!=null ) clusterSender.heartbeat();
        super.heartbeat();
    
protected synchronized voidinternalStart(int svc)
Starts up the channel. This can be called multiple times for individual services to start The svc parameter can be the logical or value of any constants

param
svc int value of
DEFAULT - will start all services
MBR_RX_SEQ - starts the membership receiver
MBR_TX_SEQ - starts the membership broadcaster
SND_TX_SEQ - starts the replication transmitter
SND_RX_SEQ - starts the replication receiver
throws
ChannelException if a startup error occurs or the service is already started.

        try {
            boolean valid = false;
            //make sure we don't pass down any flags that are unrelated to the bottom layer
            svc = svc & Channel.DEFAULT;

            if (startLevel == Channel.DEFAULT) return; //we have already started up all components
            if (svc == 0 ) return;//nothing to start
            
            if (svc == (svc & startLevel)) throw new ChannelException("Channel already started for level:"+svc);

            //must start the receiver first so that we can coordinate the port it
            //listens to with the local membership settings
            if ( Channel.SND_RX_SEQ==(svc & Channel.SND_RX_SEQ) ) {
                clusterReceiver.setMessageListener(this);
                clusterReceiver.start();
                //synchronize, big time FIXME
                membershipService.setLocalMemberProperties(getClusterReceiver().getHost(), getClusterReceiver().getPort());
                valid = true;
            }
            if ( Channel.SND_TX_SEQ==(svc & Channel.SND_TX_SEQ) ) {
                clusterSender.start();
                valid = true;
            }
            
            if ( Channel.MBR_RX_SEQ==(svc & Channel.MBR_RX_SEQ) ) {
                membershipService.setMembershipListener(this);
                membershipService.start(MembershipService.MBR_RX);
                valid = true;
            }
            if ( Channel.MBR_TX_SEQ==(svc & Channel.MBR_TX_SEQ) ) {
                membershipService.start(MembershipService.MBR_TX);
                valid = true;
            }
            
            if ( !valid) {
                throw new IllegalArgumentException("Invalid start level, valid levels are:SND_RX_SEQ,SND_TX_SEQ,MBR_TX_SEQ,MBR_RX_SEQ");
            }
            startLevel = (startLevel | svc);
        }catch ( ChannelException cx ) {
            throw cx;
        }catch ( Exception x ) {
            throw new ChannelException(x);
        }
    
protected synchronized voidinternalStop(int svc)
Shuts down the channel. This can be called multiple times for individual services to shutdown The svc parameter can be the logical or value of any constants

param
svc int value of
DEFAULT - will shutdown all services
MBR_RX_SEQ - starts the membership receiver
MBR_TX_SEQ - starts the membership broadcaster
SND_TX_SEQ - starts the replication transmitter
SND_RX_SEQ - starts the replication receiver
throws
ChannelException if a startup error occurs or the service is already started.

        try {
            //make sure we don't pass down any flags that are unrelated to the bottom layer
            svc = svc & Channel.DEFAULT;

            if (startLevel == 0) return; //we have already stopped up all components
            if (svc == 0 ) return;//nothing to stop

            boolean valid = false;
            if ( Channel.SND_RX_SEQ==(svc & Channel.SND_RX_SEQ) ) {
                clusterReceiver.stop();
                clusterReceiver.setMessageListener(null);
                valid = true;
            }
            if ( Channel.SND_TX_SEQ==(svc & Channel.SND_TX_SEQ) ) {
                clusterSender.stop();
                valid = true;
            }

            if ( Channel.MBR_RX_SEQ==(svc & Channel.MBR_RX_SEQ) ) {
                membershipService.stop(MembershipService.MBR_RX);
                membershipService.setMembershipListener(null);
                valid = true;
                
            }
            if ( Channel.MBR_TX_SEQ==(svc & Channel.MBR_TX_SEQ) ) {
                valid = true;
                membershipService.stop(MembershipService.MBR_TX);
            }            
            if ( !valid) {
                throw new IllegalArgumentException("Invalid start level, valid levels are:SND_RX_SEQ,SND_TX_SEQ,MBR_TX_SEQ,MBR_RX_SEQ");
            }

            startLevel = (startLevel & (~svc));
            
        }catch ( Exception x ) {
            throw new ChannelException(x);
        } finally {
            
        }

    
public voidmemberAdded(org.apache.catalina.tribes.Member member)

        SenderState.getSenderState(member);
        super.memberAdded(member);
    
public voidmemberDisappeared(org.apache.catalina.tribes.Member member)

        SenderState.removeSenderState(member);
        super.memberDisappeared(member);
    
public voidmessageReceived(org.apache.catalina.tribes.ChannelMessage msg)

        if ( Logs.MESSAGES.isTraceEnabled() ) {
            Logs.MESSAGES.trace("ChannelCoordinator - Received msg:" + new UniqueId(msg.getUniqueId()) + " at " +new java.sql.Timestamp(System.currentTimeMillis())+ " from "+msg.getAddress().getName());
        }
        super.messageReceived(msg);
    
public voidsendMessage(org.apache.catalina.tribes.Member[] destination, org.apache.catalina.tribes.ChannelMessage msg, InterceptorPayload payload)
Send a message to one or more members in the cluster

param
destination Member[] - the destinations, null or zero length means all
param
msg ClusterMessage - the message to send
param
options int - sender options, see class documentation
return
ClusterMessage[] - the replies from the members, if any.

        if ( destination == null ) destination = membershipService.getMembers();
        clusterSender.sendMessage(msg,destination);
        if ( Logs.MESSAGES.isTraceEnabled() ) {
            Logs.MESSAGES.trace("ChannelCoordinator - Sent msg:" + new UniqueId(msg.getUniqueId()) + " at " +new java.sql.Timestamp(System.currentTimeMillis())+ " to "+Arrays.toNameString(destination));
        }
    
public voidsetClusterReceiver(org.apache.catalina.tribes.ChannelReceiver clusterReceiver)

        if ( clusterReceiver != null ) {
            this.clusterReceiver = clusterReceiver;
            this.clusterReceiver.setMessageListener(this);
        } else {
            if  (this.clusterReceiver!=null ) this.clusterReceiver.setMessageListener(null);
            this.clusterReceiver = null;
        }
    
public voidsetClusterSender(org.apache.catalina.tribes.ChannelSender clusterSender)

        this.clusterSender = clusterSender;
    
public voidsetMembershipService(org.apache.catalina.tribes.MembershipService membershipService)

        this.membershipService = membershipService;
        this.membershipService.setMembershipListener(this);
    
public voidsetOptionFlag(int flag)

optionFlag=flag;
public voidstart(int svc)
Starts up the channel. This can be called multiple times for individual services to start The svc parameter can be the logical or value of any constants

param
svc int value of
DEFAULT - will start all services
MBR_RX_SEQ - starts the membership receiver
MBR_TX_SEQ - starts the membership broadcaster
SND_TX_SEQ - starts the replication transmitter
SND_RX_SEQ - starts the replication receiver
throws
ChannelException if a startup error occurs or the service is already started.

        this.internalStart(svc);
    
public voidstop(int svc)
Shuts down the channel. This can be called multiple times for individual services to shutdown The svc parameter can be the logical or value of any constants

param
svc int value of
DEFAULT - will shutdown all services
MBR_RX_SEQ - stops the membership receiver
MBR_TX_SEQ - stops the membership broadcaster
SND_TX_SEQ - stops the replication transmitter
SND_RX_SEQ - stops the replication receiver
throws
ChannelException if a startup error occurs or the service is already started.

        this.internalStop(svc);