FileDocCategorySizeDatePackage
MessageDispatchInterceptor.javaAPI DocApache Tomcat 6.0.147672Fri Jul 20 04:20:36 BST 2007org.apache.catalina.tribes.group.interceptors

MessageDispatchInterceptor

public class MessageDispatchInterceptor extends org.apache.catalina.tribes.group.ChannelInterceptorBase implements Runnable
The message dispatcher is a way to enable asynchronous communication through a channel. The dispatcher will look for the Channel.SEND_OPTIONS_ASYNCHRONOUS flag to be set, if it is, it will queue the message for delivery and immediately return to the sender.
author
Filip Hanik
version
1.0

Fields Summary
protected static org.apache.juli.logging.Log
log
protected long
maxQueueSize
protected org.apache.catalina.tribes.transport.bio.util.FastQueue
queue
protected boolean
run
protected Thread
msgDispatchThread
protected long
currentSize
protected boolean
useDeepClone
protected boolean
alwaysSend
Constructors Summary
public MessageDispatchInterceptor()


      
        setOptionFlag(Channel.SEND_OPTIONS_ASYNCHRONOUS);
    
Methods Summary
public synchronized longaddAndGetCurrentSize(long inc)

        currentSize += inc;
        return currentSize;
    
public booleanaddToQueue(org.apache.catalina.tribes.ChannelMessage msg, org.apache.catalina.tribes.Member[] destination, org.apache.catalina.tribes.group.InterceptorPayload payload)

        return queue.add(msg,destination,payload);
    
public longgetCurrentSize()

        return currentSize;
    
public longgetMaxQueueSize()

        return maxQueueSize;
    
public booleangetUseDeepClone()

        return useDeepClone;
    
public org.apache.catalina.tribes.transport.bio.util.LinkObjectremoveFromQueue()

        return queue.remove();
    
public voidrun()

        while ( run ) {
            LinkObject link = removeFromQueue();
            if ( link == null ) continue; //should not happen unless we exceed wait time
            while ( link != null && run ) {
                link = sendAsyncData(link);
            }//while
        }//while
    
protected org.apache.catalina.tribes.transport.bio.util.LinkObjectsendAsyncData(org.apache.catalina.tribes.transport.bio.util.LinkObject link)

        ChannelMessage msg = link.data();
        Member[] destination = link.getDestination();
        try {
            super.sendMessage(destination,msg,null);
            try {
                if ( link.getHandler() != null ) link.getHandler().handleCompletion(new UniqueId(msg.getUniqueId())); 
            } catch ( Exception ex ) {
                log.error("Unable to report back completed message.",ex);
            }
        } catch ( Exception x ) {
            ChannelException cx = null;
            if ( x instanceof ChannelException ) cx = (ChannelException)x;
            else cx = new ChannelException(x);
            if ( log.isDebugEnabled() ) log.debug("Error while processing async message.",x);
            try {
                if (link.getHandler() != null) link.getHandler().handleError(cx, new UniqueId(msg.getUniqueId()));
            } catch ( Exception ex ) {
                log.error("Unable to report back error message.",ex);
            }
        } finally {
            addAndGetCurrentSize(-msg.getMessage().getLength());
            link = link.next();
        }//try
        return link;
    
public voidsendMessage(org.apache.catalina.tribes.Member[] destination, org.apache.catalina.tribes.ChannelMessage msg, org.apache.catalina.tribes.group.InterceptorPayload payload)

        boolean async = (msg.getOptions() & Channel.SEND_OPTIONS_ASYNCHRONOUS) == Channel.SEND_OPTIONS_ASYNCHRONOUS;
        if ( async && run ) {
            if ( (getCurrentSize()+msg.getMessage().getLength()) > maxQueueSize ) {
                if ( alwaysSend ) {
                    super.sendMessage(destination,msg,payload);
                    return;
                } else {
                    throw new ChannelException("Asynchronous queue is full, reached its limit of " + maxQueueSize +" bytes, current:" + getCurrentSize() + " bytes.");
                }//end if
            }//end if
            //add to queue
            if ( useDeepClone ) msg = (ChannelMessage)msg.deepclone();
            if (!addToQueue(msg, destination, payload) ) {
                throw new ChannelException("Unable to add the message to the async queue, queue bug?");
            }
            addAndGetCurrentSize(msg.getMessage().getLength());
        } else {
            super.sendMessage(destination, msg, payload);
        }
    
public synchronized longsetAndGetCurrentSize(long value)

        currentSize = value;
        return value;
    
public voidsetMaxQueueSize(long maxQueueSize)

        this.maxQueueSize = maxQueueSize;
    
public voidsetOptionFlag(int flag)

        if ( flag != Channel.SEND_OPTIONS_ASYNCHRONOUS ) log.warn("Warning, you are overriding the asynchronous option flag, this will disable the Channel.SEND_OPTIONS_ASYNCHRONOUS that other apps might use.");
        super.setOptionFlag(flag);
    
public voidsetUseDeepClone(boolean useDeepClone)

        this.useDeepClone = useDeepClone;
    
public voidstart(int svc)

        //start the thread
        if (!run ) {
            synchronized (this) {
                if ( !run && ((svc & Channel.SND_TX_SEQ)==Channel.SND_TX_SEQ) ) {//only start with the sender
                    startQueue();
                }//end if
            }//sync
        }//end if
        super.start(svc);
    
public voidstartQueue()

        msgDispatchThread = new Thread(this);
        msgDispatchThread.setName("MessageDispatchInterceptor.MessageDispatchThread");
        msgDispatchThread.setDaemon(true);
        msgDispatchThread.setPriority(Thread.MAX_PRIORITY);
        queue.setEnabled(true);
        run = true;
        msgDispatchThread.start();
    
public voidstop(int svc)

        //stop the thread
        if ( run ) {
            synchronized (this) {
                if ( run && ((svc & Channel.SND_TX_SEQ)==Channel.SND_TX_SEQ)) {
                    stopQueue();
                }//end if
            }//sync
        }//end if

        super.stop(svc);
    
public voidstopQueue()

        run = false;
        msgDispatchThread.interrupt();
        queue.setEnabled(false);
        setAndGetCurrentSize(0);