FileDocCategorySizeDatePackage
GroupChannel.javaAPI DocApache Tomcat 6.0.1426373Fri Jul 20 04:20:32 BST 2007org.apache.catalina.tribes.group

GroupChannel

public class GroupChannel extends ChannelInterceptorBase implements org.apache.catalina.tribes.ManagedChannel
The default implementation of a Channel.
The GroupChannel manages the replication channel. It coordinates messages being sent and received with membership announcements. The channel has a chain of interceptors that can modify the message or perform other logic.
It manages a complete group, both membership and replication.
author
Filip Hanik
version
$Revision: 500684 $, $Date: 2007-01-28 00:27:18 +0100 (dim., 28 janv. 2007) $

Fields Summary
protected boolean
heartbeat
Flag to determine if the channel manages its own heartbeat. If set to true, the channel will start a local thread for the heartbeat.
protected long
heartbeatSleeptime
If heartbeat == true then how often do we want this heartbeat to run. default is one minute
protected HeartbeatThread
hbthread
Internal heartbeat thread
protected ChannelCoordinator
coordinator
The ChannelCoordinator coordinates the bottom layer components:
- MembershipService
- ChannelSender
- ChannelReceiver
protected org.apache.catalina.tribes.ChannelInterceptor
interceptors
The first interceptor in the interceptor stack. The interceptors are chained in a linked list, so we only need a reference to the first one.
protected ArrayList
membershipListeners
A list of membership listeners that subscribe to membership announcements
protected ArrayList
channelListeners
A list of channel listeners that subscribe to incoming messages
protected boolean
optionCheck
If set to true, the GroupChannel will check to make sure that no two interceptors in the stack use conflicting option flags.
Constructors Summary
public GroupChannel()
Creates a GroupChannel. This constructor will also add the first interceptor in the GroupChannel.
The first interceptor is always the channel itself.


                               
      
        // Register the channel itself as an interceptor; per the constructor contract it is
        // always the first element of the stack.
        addInterceptor(this);
    
Methods Summary
public voidaddChannelListener(org.apache.catalina.tribes.ChannelListener channelListener)
Adds a channel listener to the channel.
Channel listeners are uniquely identified using the equals(Object) method

param
channelListener ChannelListener

        // Duplicates (as decided by equals(Object)) are rejected rather than silently ignored.
        if (this.channelListeners.contains(channelListener)) {
            throw new IllegalArgumentException("Listener already exists:"+channelListener+"["+channelListener.getClass().getName()+"]");
        }
        this.channelListeners.add(channelListener);
    
public voidaddInterceptor(org.apache.catalina.tribes.ChannelInterceptor interceptor)
Adds an interceptor to the stack for message processing
Interceptors are ordered in the way they are added.
channel.addInterceptor(A);
channel.addInterceptor(C);
channel.addInterceptor(B);
Will result in an interceptor stack like this:
A -> C -> B
The complete stack will look like this:
Channel -> A -> C -> B -> ChannelCoordinator

param
interceptor ChannelInterceptorBase

        if ( interceptors == null ) {
            interceptors = interceptor;
            interceptors.setNext(coordinator);
            interceptors.setPrevious(null);
            coordinator.setPrevious(interceptors);
        } else {
            ChannelInterceptor last = interceptors;
            while ( last.getNext() != coordinator ) {
                last = last.getNext();
            }
            last.setNext(interceptor);
            interceptor.setNext(coordinator);
            interceptor.setPrevious(last);
            coordinator.setPrevious(interceptor);
        }
    
public voidaddMembershipListener(org.apache.catalina.tribes.MembershipListener membershipListener)
Adds a membership listener to the channel.
Membership listeners are uniquely identified using the equals(Object) method

param
membershipListener MembershipListener

        // Already registered (per equals(Object)): nothing to do.
        if (this.membershipListeners.contains(membershipListener)) {
            return;
        }
        this.membershipListeners.add(membershipListener);
    
protected voidcheckOptionFlags()
Validates the option flags that each interceptor is using and reports an error if two interceptors share the same flag.

throws
ChannelException

        // Compare every interceptor with each interceptor further down the chain and
        // collect all conflicting pairs so they can be reported in a single exception.
        StringBuffer report = new StringBuffer();
        for (ChannelInterceptor upper = interceptors; upper != null; upper = upper.getNext()) {
            int upperFlag = upper.getOptionFlag();
            if (upperFlag == 0) continue; // a flag of 0 never takes part in a conflict
            for (ChannelInterceptor lower = upper.getNext(); lower != null; lower = lower.getNext()) {
                int lowerFlag = lower.getOptionFlag();
                if (lowerFlag == 0) continue;
                // Conflict: the bits of one flag are entirely contained in the other.
                int shared = upperFlag & lowerFlag;
                if (shared == upperFlag || shared == lowerFlag) {
                    report.append("[")
                          .append(upper.getClass().getName()).append(":").append(upperFlag)
                          .append(" == ")
                          .append(lower.getClass().getName()).append(":").append(lowerFlag)
                          .append("] ");
                }
            }
        }
        if (report.length() > 0) {
            throw new ChannelException("Interceptor option flag conflict: "+report.toString());
        }

    
public org.apache.catalina.tribes.ChannelReceivergetChannelReceiver()
Returns the channel receiver component

return
ChannelReceiver

        // The receiver is held by the coordinator; the channel only forwards the request.
        return coordinator.getClusterReceiver();
    
public org.apache.catalina.tribes.ChannelSendergetChannelSender()
Returns the channel sender component

return
ChannelSender

        // The sender is held by the coordinator; the channel only forwards the request.
        return coordinator.getClusterSender();
    
public org.apache.catalina.tribes.ChannelInterceptorgetFirstInterceptor()
Returns the first interceptor of the stack. Useful for traversal.

return
ChannelInterceptor

        if (interceptors != null) return interceptors;
        else return coordinator;
    
public booleangetHeartbeat()

see
#setHeartbeat(boolean)
return
boolean

        // Plain accessor for the flag written by setHeartbeat(boolean).
        return heartbeat;
    
public longgetHeartbeatSleeptime()
Returns the sleep time in milliseconds that the internal heartbeat will sleep in between invocations of Channel.heartbeat()

return
long

        // Plain accessor for the value written by setHeartbeatSleeptime(long).
        return heartbeatSleeptime;
    
public java.util.IteratorgetInterceptors()
Returns an iterator of all the interceptors in this stack

return
Iterator

        // The iterator is seeded with the element after the channel itself (getNext()) and
        // with the coordinator, which sits at the bottom of the stack.
        return new InterceptorIterator(this.getNext(),this.coordinator);
    
public org.apache.catalina.tribes.MembershipServicegetMembershipService()
Returns the membership service component

return
MembershipService

        // The membership service is held by the coordinator; the channel only forwards the request.
        return coordinator.getMembershipService();
    
public booleangetOptionCheck()

see
#setOptionCheck(boolean)
return
boolean

        // Plain accessor for the flag written by setOptionCheck(boolean).
        return optionCheck;
    
public voidheartbeat()
Sends a heartbeat through the interceptor stack.
Invoke this method from the application on a periodic basis if you have turned off internal heartbeats channel.setHeartbeat(false)

        // First let the interceptor stack process the heartbeat.
        super.heartbeat();
        // Then forward it to every registered listener that also implements Heartbeat:
        // membership listeners first, channel listeners second.
        for (Iterator it = membershipListeners.iterator(); it.hasNext(); ) {
            Object listener = it.next();
            if (listener instanceof Heartbeat) {
                ((Heartbeat) listener).heartbeat();
            }
        }
        for (Iterator it = channelListeners.iterator(); it.hasNext(); ) {
            Object listener = it.next();
            if (listener instanceof Heartbeat) {
                ((Heartbeat) listener).heartbeat();
            }
        }

    
public voidmemberAdded(org.apache.catalina.tribes.Member member)
memberAdded gets invoked by the interceptor below the channel and the channel will broadcast it to the membership listeners

param
member Member - the new member

        //notify upwards
        for (int i=0; i<membershipListeners.size(); i++ ) {
            MembershipListener membershipListener = (MembershipListener)membershipListeners.get(i);
            if (membershipListener != null) membershipListener.memberAdded(member);
        }
    
public voidmemberDisappeared(org.apache.catalina.tribes.Member member)
memberDisappeared gets invoked by the interceptor below the channel and the channel will broadcast it to the membership listeners

param
member Member - the member that left or crashed

        //notify upwards
        for (int i=0; i<membershipListeners.size(); i++ ) {
            MembershipListener membershipListener = (MembershipListener)membershipListeners.get(i);
            if (membershipListener != null) membershipListener.memberDisappeared(member);
        }
    
public voidmessageReceived(org.apache.catalina.tribes.ChannelMessage msg)
Callback from the interceptor stack.
When a message is received from a remote node, this method will be invoked by the previous interceptor.
This method can also be used to send a message to other components within the same application, but it's an extreme case, and you're probably better off doing that logic within the application itself.

param
msg ChannelMessage

        // Nothing to dispatch.
        if ( msg == null ) return;
        try {
            if ( Logs.MESSAGES.isTraceEnabled() ) {
                Logs.MESSAGES.trace("GroupChannel - Received msg:" + new UniqueId(msg.getUniqueId()) + " at " +new java.sql.Timestamp(System.currentTimeMillis())+ " from "+msg.getAddress().getName());
            }

            // Rebuild the application-level payload: messages flagged as byte messages are
            // wrapped in a ByteMessage, everything else is deserialized from the buffer.
            Serializable fwd = null;
            if ( (msg.getOptions() & SEND_OPTIONS_BYTE_MESSAGE) == SEND_OPTIONS_BYTE_MESSAGE ) {
                fwd = new ByteMessage(msg.getMessage().getBytes());
            } else {
                fwd = XByteBuffer.deserialize(msg.getMessage().getBytesDirect(),0,msg.getMessage().getLength());
            }
            if ( Logs.MESSAGES.isTraceEnabled() ) {
                Logs.MESSAGES.trace("GroupChannel - Receive Message:" + new UniqueId(msg.getUniqueId()) + " is " +fwd);
            }

            //get the actual member with the correct alive time
            Member source = msg.getAddress();
            boolean rx = false;        // true once an RpcChannel listener accepted the message
            boolean delivered = false; // true once any listener accepted the message
            for ( int i=0; i<channelListeners.size(); i++ ) {
                ChannelListener channelListener = (ChannelListener)channelListeners.get(i);
                if (channelListener != null && channelListener.accept(fwd, source)) {
                    channelListener.messageReceived(fwd, source);
                    delivered = true;
                    //if the message was accepted by an RPC channel, that channel
                    //is responsible for returning the reply, otherwise we send an absence reply
                    if ( channelListener instanceof RpcChannel ) rx = true;
                }
            }//for
            if ((!rx) && (fwd instanceof RpcMessage)) {
                //if we have a message that requires a response,
                //but none was given, send back an immediate one
                sendNoRpcChannelReply((RpcMessage)fwd,source);
            }
            if ( Logs.MESSAGES.isTraceEnabled() ) {
                Logs.MESSAGES.trace("GroupChannel delivered["+delivered+"] id:"+new UniqueId(msg.getUniqueId()));
            }

        } catch ( Exception x ) {
            // The guard must test the level that is actually written. It used to test
            // isDebugEnabled() while logging at ERROR, so failures raised here (including
            // exceptions thrown by listeners) were not logged unless DEBUG was switched on.
            if ( log.isErrorEnabled() ) log.error("Unable to process channel:IOException.",x);
            // Any failure, not only an IOException, is rethrown with its cause preserved.
            throw new RemoteProcessException("IOException:"+x.getMessage(),x);
        }
    
public voidremoveChannelListener(org.apache.catalina.tribes.ChannelListener channelListener)
Removes a channel listener from the channel.
Channel listeners are uniquely identified using the equals(Object) method

param
channelListener ChannelListener

        // The result of remove() is ignored, so removing an unregistered listener is a no-op.
        channelListeners.remove(channelListener);
    
public voidremoveMembershipListener(org.apache.catalina.tribes.MembershipListener membershipListener)
Removes a membership listener from the channel.
Membership listeners are uniquely identified using the equals(Object) method

param
membershipListener MembershipListener

        // The result of remove() is ignored, so removing an unregistered listener is a no-op.
        membershipListeners.remove(membershipListener);
    
public org.apache.catalina.tribes.UniqueIdsend(org.apache.catalina.tribes.Member[] destination, java.io.Serializable msg, int options)
Send a message to the destinations specified

param
destination Member[] - destination.length > 0
param
msg Serializable - the message to send
param
options int - sender options, options can trigger guarantee levels and different interceptors to react to the message see class documentation for the Channel object.
return
UniqueId - the unique Id that was assigned to this message
throws
ChannelException - if an error occurs processing the message
see
org.apache.catalina.tribes.Channel

        // Convenience overload: same as the four-argument send with no error handler.
        return send(destination,msg,options,null);
    
public org.apache.catalina.tribes.UniqueIdsend(org.apache.catalina.tribes.Member[] destination, java.io.Serializable msg, int options, org.apache.catalina.tribes.ErrorHandler handler)

param
destination Member[] - destination.length > 0
param
msg Serializable - the message to send
param
options int - sender options, options can trigger guarantee levels and different interceptors to react to the message see class documentation for the Channel object.
param
handler - callback object for error handling and completion notification, used when a message is sent asynchronously using the Channel.SEND_OPTIONS_ASYNCHRONOUS flag enabled.
return
UniqueId - the unique Id that was assigned to this message
throws
ChannelException - if an error occurs processing the message
see
org.apache.catalina.tribes.Channel

        if (msg == null) {
            throw new ChannelException("Cant send a NULL message");
        }
        XByteBuffer pooled = null;
        try {
            if (destination == null || destination.length == 0) {
                throw new ChannelException("No destination given");
            }

            // Build the envelope: new unique id, address taken from getLocalMember(false),
            // timestamp taken from the current system time.
            ChannelData envelope = new ChannelData(true);
            envelope.setAddress(getLocalMember(false));
            envelope.setTimestamp(System.currentTimeMillis());

            // A ByteMessage is sent as its raw bytes with the byte-message option set;
            // anything else is serialized and the option is cleared.
            final byte[] bytes;
            if (msg instanceof ByteMessage) {
                bytes = ((ByteMessage)msg).getMessage();
                options |= SEND_OPTIONS_BYTE_MESSAGE;
            } else {
                bytes = XByteBuffer.serialize(msg);
                options &= ~SEND_OPTIONS_BYTE_MESSAGE;
            }
            envelope.setOptions(options);

            // Borrow a buffer from the pool; it is handed back in the finally block.
            pooled = BufferPool.getBufferPool().getBuffer(bytes.length+128, false);
            pooled.append(bytes,0,bytes.length);
            envelope.setMessage(pooled);

            // A payload is only created when the caller supplied an error handler.
            InterceptorPayload payload = (handler == null) ? null : new InterceptorPayload();
            if (payload != null) {
                payload.setErrorHandler(handler);
            }

            getFirstInterceptor().sendMessage(destination, envelope, payload);
            if (Logs.MESSAGES.isTraceEnabled()) {
                Logs.MESSAGES.trace("GroupChannel - Sent msg:" + new UniqueId(envelope.getUniqueId()) + " at " +new java.sql.Timestamp(System.currentTimeMillis())+ " to "+Arrays.toNameString(destination));
                Logs.MESSAGES.trace("GroupChannel - Send Message:" + new UniqueId(envelope.getUniqueId()) + " is " +msg);
            }

            return new UniqueId(envelope.getUniqueId());
        } catch (ChannelException cx) {
            // Already the declared exception type: pass it through untouched.
            throw cx;
        } catch (Exception x) {
            throw new ChannelException(x);
        } finally {
            if (pooled != null) {
                BufferPool.getBufferPool().returnBuffer(pooled);
            }
        }
    
protected voidsendNoRpcChannelReply(RpcMessage msg, org.apache.catalina.tribes.Member destination)
Sends a NoRpcChannelReply message to a member
This method gets invoked by the channel if an RPC message comes in and no channel listener accepts the message. This avoids a timeout on the sending side.

param
msg RpcMessage
param
destination Member - the destination for the reply

        // Never answer a NoRpcChannelReply with another one; that would loop.
        if (msg instanceof RpcMessage.NoRpcChannelReply) {
            return;
        }
        try {
            Member[] target = new Member[] {destination};
            send(target, new RpcMessage.NoRpcChannelReply(msg.rpcId,msg.uuid), Channel.SEND_OPTIONS_ASYNCHRONOUS);
        } catch (Exception x) {
            // Best effort only: a failed reply is logged, not propagated.
            log.error("Unable to find rpc channel, failed to send NoRpcChannelReply.",x);
        }
    
public voidsetChannelReceiver(org.apache.catalina.tribes.ChannelReceiver clusterReceiver)
Sets the channel receiver component

param
clusterReceiver ChannelReceiver

        // The receiver is stored on the coordinator, not on the channel itself.
        coordinator.setClusterReceiver(clusterReceiver);
    
public voidsetChannelSender(org.apache.catalina.tribes.ChannelSender clusterSender)
Sets the channel sender component

param
clusterSender ChannelSender

        // The sender is stored on the coordinator, not on the channel itself.
        coordinator.setClusterSender(clusterSender);
    
public voidsetHeartbeat(boolean heartbeat)
Enables or disables local heartbeat. if setHeartbeat(true) is invoked then the channel will start an internal thread to invoke Channel.heartbeat() every getHeartbeatSleeptime milliseconds

param
heartbeat boolean

        // Only records the flag; it is read by start(int) when deciding whether to
        // create the heartbeat thread.
        this.heartbeat = heartbeat;
    
public voidsetHeartbeatSleeptime(long heartbeatSleeptime)
Configure local heartbeat sleep time
Only used when getHeartbeat()==true

param
heartbeatSleeptime long - time in milliseconds to sleep between heartbeats

        // Only records the value; it is passed to the HeartbeatThread created in start(int).
        this.heartbeatSleeptime = heartbeatSleeptime;
    
public voidsetMembershipService(org.apache.catalina.tribes.MembershipService membershipService)
Sets the membership component

param
membershipService MembershipService

        // The membership service is stored on the coordinator, not on the channel itself.
        coordinator.setMembershipService(membershipService);
    
public voidsetOptionCheck(boolean optionCheck)
Enables/disables the option check
Setting this to true will make the GroupChannel perform a conflict check on the interceptors. If two interceptors are using the same option flag, an error is thrown upon start.

param
optionCheck boolean

        // Only records the flag; the check itself is run from start(int).
        this.optionCheck = optionCheck;
    
protected synchronized voidsetupDefaultStack()
Sets up the default implementation interceptor stack if no interceptors have been added

throws
ChannelException


        // Only act when the stack is still bare, i.e. the first interceptor is
        // immediately followed by the ChannelCoordinator.
        if ( getFirstInterceptor() != null &&
             ((getFirstInterceptor().getNext() instanceof ChannelCoordinator))) {
            ChannelInterceptor interceptor = null;
            Class clazz = null;
            try {
                // Look up MessageDispatch15Interceptor by name through this class's loader.
                clazz = Class.forName("org.apache.catalina.tribes.group.interceptors.MessageDispatch15Interceptor",
                                      true,GroupChannel.class.getClassLoader());
                // Trial instantiation: the instance is discarded; it only proves the class is usable.
                clazz.newInstance();
            } catch ( Throwable x ) {
                // Loading or instantiating failed for any reason: fall back to MessageDispatchInterceptor.
                clazz = MessageDispatchInterceptor.class;
            }//catch
            try {
                // Create the instance that is actually added to the stack.
                interceptor = (ChannelInterceptor) clazz.newInstance();
            } catch (Exception x) {
                throw new ChannelException("Unable to add MessageDispatchInterceptor to interceptor chain.",x);
            }
            this.addInterceptor(interceptor);
        }
    
public synchronized voidstart(int svc)
Starts the channel

param
svc int - what service to start
throws
ChannelException
see
org.apache.catalina.tribes.Channel#start(int)

        setupDefaultStack();
        if (optionCheck) checkOptionFlags();
        super.start(svc);
        if ( hbthread == null && heartbeat ) {
            hbthread = new HeartbeatThread(this,heartbeatSleeptime);
            hbthread.start();
        }
    
public synchronized voidstop(int svc)
Stops the channel

param
svc int
throws
ChannelException
see
org.apache.catalina.tribes.Channel#stop(int)

        // Stop and drop the internal heartbeat thread, if one was created, before the
        // stack itself is stopped. Clearing the reference lets start(int) create a new one.
        if (hbthread != null) {
            hbthread.stopHeartbeat();
            hbthread = null;
        }
        super.stop(svc);