File | Doc | Category | Size | Date | Package
OrderInterceptor.java | API Doc | Apache Tomcat 6.0.14 | 11796 | Fri Jul 20 04:20:34 BST 2007 | org.apache.catalina.tribes.group.interceptors

OrderInterceptor

public class OrderInterceptor extends org.apache.catalina.tribes.group.ChannelInterceptorBase
The order interceptor guarantees that messages are received in the same order they were sent. This interceptor works best with the ack=true setting.
There is no point in using this with the replicationMode="fastasynchqueue" as this mode guarantees ordering.
If you are using the mode ack=false replicationMode=pooled, and have a lot of concurrent threads, this interceptor can really slow you down, as many messages will be completely out of order and the queue might become rather large. If this is the case, then you might want to set the value OrderInterceptor.maxQueue = 25 (meaning that we will never keep more than 25 messages in our queue)
Configuration Options
OrderInterceptor.expire= - if a message arrives out of order, how long before we act on it. default=3000ms
OrderInterceptor.maxQueue= - how much can the queue grow to ensure ordering. This setting is useful to avoid OutOfMemoryErrors. default=Integer.MAX_VALUE
OrderInterceptor.forwardExpired= - this flag tells the interceptor what to do when a message has expired or the queue has grown larger than the maxQueue value. true means that the message is sent up the stack to the receiver, which will then receive an out-of-order message; false means forget the message and reset the message counter. default=true
author
Filip Hanik
version
1.1

Fields Summary
private HashMap
outcounter
private HashMap
incounter
private HashMap
incoming
private long
expire
private boolean
forwardExpired
private int
maxQueue
ReentrantReadWriteLock
inLock
ReentrantReadWriteLock
outLock
Constructors Summary
Methods Summary
/**
 * Returns the expiration setting applied to messages that arrive out of order.
 *
 * @return the current value of the {@code expire} field
 */
public longgetExpire()

        return this.expire;
    
/**
 * Reports whether expired or overflowing messages are passed up the stack
 * instead of being dropped.
 *
 * @return the current value of the {@code forwardExpired} field
 */
public booleangetForwardExpired()

        return this.forwardExpired;
    
/**
 * Looks up the incoming-message counter for a member, creating and
 * registering one in {@code incounter} the first time the member is seen.
 *
 * @param mbr the member whose incoming counter is wanted
 * @return the counter stored in {@code incounter} for {@code mbr}
 */
protected org.apache.catalina.tribes.group.interceptors.OrderInterceptor$CountergetInCounter(org.apache.catalina.tribes.Member mbr)

        Counter existing = (Counter)incounter.get(mbr);
        if ( existing != null ) {
            return existing;
        }
        //no counter yet for this member: incoming counters always start at 1
        Counter created = new Counter();
        created.inc();
        incounter.put(mbr,created);
        return created;
    
/**
 * Returns the limit on how many messages may be queued while waiting for
 * missing ones.
 *
 * @return the current value of the {@code maxQueue} field
 */
public intgetMaxQueue()

        return this.maxQueue;
    
/**
 * Looks up the outgoing-message counter for a member, creating and
 * registering one in {@code outcounter} the first time the member is used.
 * Unlike the incoming counter, a new outgoing counter is not pre-incremented.
 *
 * @param mbr the destination member whose outgoing counter is wanted
 * @return the counter stored in {@code outcounter} for {@code mbr}
 */
protected org.apache.catalina.tribes.group.interceptors.OrderInterceptor$CountergetOutCounter(org.apache.catalina.tribes.Member mbr)

        Counter existing = (Counter)outcounter.get(mbr);
        if ( existing != null ) {
            return existing;
        }
        Counter created = new Counter();
        outcounter.put(mbr,created);
        return created;
    
protected intincCounter(org.apache.catalina.tribes.Member mbr)

 
        Counter cnt = getOutCounter(mbr);
        return cnt.inc();
    
/**
 * Membership callback for a newly added member. This interceptor keeps no
 * state for the event and only passes it on to the superclass.
 *
 * @param member the member that was added
 */
public voidmemberAdded(org.apache.catalina.tribes.Member member)

        //notify upwards
        super.memberAdded(member);
    
/**
 * Membership callback for a member that has left. Flushes any messages
 * still queued for the member, discards its incoming and outgoing counters,
 * and then passes the event on to the superclass.
 *
 * @param member the member that disappeared
 */
public voidmemberDisappeared(org.apache.catalina.tribes.Member member)

        //clear the remaining queue first: the forced flush obtains the
        //incoming counter through getInCounter(), which would re-create and
        //re-register a counter set to Integer.MAX_VALUE if it ran after the
        //removal below, leaving a stale counter behind for a returning member
        processLeftOvers(member,true);
        //reset counters - lock free
        incounter.remove(member);
        outcounter.remove(member);
        //notify upwards
        super.memberDisappeared(member);
    
/**
 * Receives a message from the layer below. Messages whose options do not
 * select this interceptor are passed straight up. For all others the
 * trailing 4-byte sequence number is read and stripped, and the message is
 * handed to the ordering logic while holding the incoming write lock.
 *
 * @param msg the received message
 */
public voidmessageReceived(org.apache.catalina.tribes.ChannelMessage msg)

        if ( !okToProcess(msg.getOptions()) ) {
            super.messageReceived(msg);
            return;
        }
        //the sequence number occupies the last 4 bytes of the payload
        int msgnr = XByteBuffer.toInt(msg.getMessage().getBytesDirect(),msg.getMessage().getLength()-4);
        msg.getMessage().trim(4);
        MessageOrder order = new MessageOrder(msgnr,(ChannelMessage)msg.deepclone());
        //acquire before entering try so that the finally block can never
        //attempt to release a lock this thread does not hold
        inLock.writeLock().lock();
        try {
            if ( processIncoming(order) ) processLeftOvers(msg.getAddress(),false);
        } finally {
            inLock.writeLock().unlock();
        }
    
/**
 * Merges a message into the queue kept for its sender, delivers every queued
 * message whose number is at or below the expected counter, and then walks
 * the remainder to flush entries that have expired or, when the queue has
 * reached {@code maxQueue}, all of them.
 *
 * @param order MessageOrder
 * @return boolean - true if a message expired (or was flushed because the
 *         queue reached maxQueue) and was processed
 */
protected booleanprocessIncoming(org.apache.catalina.tribes.group.interceptors.OrderInterceptor$MessageOrder order)

        boolean result = false;
        Member member = order.getMessage().getAddress();
        Counter cnt = getInCounter(member);
        
        //merge the new entry with whatever is already queued for this sender
        MessageOrder tmp = (MessageOrder)incoming.get(member);
        if ( tmp != null ) {
            order = MessageOrder.add(tmp,order);
        }
        
        
        while ( (order!=null) && (order.getMsgNr() <= cnt.getCounter())  ) {
            //we are right on target. process orders
            //only an exact match advances the counter; the loop condition
            //already excludes msgNr > counter, so no other adjustment applies
            if ( order.getMsgNr() == cnt.getCounter() ) cnt.inc();
            super.messageReceived(order.getMessage());
            order.setMessage(null);
            order = order.next;
        }
        MessageOrder head = order;
        MessageOrder prev = null;
        tmp = order;
        //flag to empty out the queue when it larger than maxQueue
        boolean empty = order!=null?order.getCount()>=maxQueue:false;
        while ( tmp != null ) {
            //process expired messages or empty out the queue
            if ( tmp.isExpired(expire) || empty ) {
                //reset the head
                if ( tmp == head ) head = tmp.next;
                //skip past the flushed message so later ones are in sequence
                cnt.setCounter(tmp.getMsgNr()+1);
                if ( getForwardExpired() ) 
                    super.messageReceived(tmp.getMessage());
                tmp.setMessage(null);
                tmp = tmp.next;
                //unlink the flushed entry from the surviving chain
                if ( prev != null ) prev.next = tmp;  
                result = true;
            } else {
                prev = tmp;
                tmp = tmp.next;
            }
        }
        //store what is left, or drop the map entry when nothing remains
        if ( head == null ) incoming.remove(member);
        else incoming.put(member, head);
        return result;
    
/**
 * Re-runs the ordering logic over the messages still queued for a member.
 *
 * @param member the member whose queue is re-examined
 * @param force  when true, the member's incoming counter is first set to
 *               {@code Integer.MAX_VALUE}, so every queued message number is
 *               at or below the counter
 */
protected voidprocessLeftOvers(org.apache.catalina.tribes.Member member, boolean force)

        if ( force ) {
            getInCounter(member).setCounter(Integer.MAX_VALUE);
        }
        MessageOrder queued = (MessageOrder)incoming.get(member);
        if ( queued == null ) {
            return;
        }
        processIncoming(queued);
    
/**
 * Sends a message to each destination individually, appending that
 * destination's next outgoing sequence number to the payload for the
 * duration of the send. Messages whose options do not select this
 * interceptor are passed down unchanged.
 *
 * @param destination the members to send to
 * @param msg         the message; its buffer is extended by 4 bytes per send
 *                    and trimmed back afterwards
 * @param payload     passed through unchanged to the next interceptor
 * @throws ChannelException the first failure encountered, with the faulty
 *                          members of any later failures added to it
 */
public voidsendMessage(org.apache.catalina.tribes.Member[] destination, org.apache.catalina.tribes.ChannelMessage msg, org.apache.catalina.tribes.group.InterceptorPayload payload)


              
        if ( !okToProcess(msg.getOptions()) ) {
            super.sendMessage(destination, msg, payload);
            return;
        }
        ChannelException cx = null;
        for (int i=0; i<destination.length; i++ ) {
            try {
                int nr = 0;
                //acquire before entering try so that the finally block can
                //never attempt to release a lock this thread does not hold
                outLock.writeLock().lock();
                try {
                    nr = incCounter(destination[i]);
                } finally {
                    outLock.writeLock().unlock();
                }
                //reduce byte copy
                msg.getMessage().append(nr);
                try {
                    getNext().sendMessage(new Member[] {destination[i]}, msg, payload);
                } finally {
                    //strip the sequence number again so the next destination
                    //gets its own number appended to the original payload
                    msg.getMessage().trim(4);
                }
            }catch ( ChannelException x ) {
                //keep the first failure and merge later ones into it; the
                //first exception already carries its own faulty members
                if ( cx == null ) cx = x;
                else cx.addFaultyMember(x.getFaultyMembers());
            }
        }//for
        if ( cx != null ) throw cx;
    
/**
 * Sets the expiration setting applied to messages that arrive out of order.
 *
 * @param expire the new value for the {@code expire} field
 */
public voidsetExpire(long expire)

        this.expire = expire;
    
/**
 * Sets whether expired or overflowing messages are passed up the stack
 * instead of being dropped.
 *
 * @param forwardExpired the new value for the {@code forwardExpired} field
 */
public voidsetForwardExpired(boolean forwardExpired)

        this.forwardExpired = forwardExpired;
    
/**
 * Sets the limit on how many messages may be queued while waiting for
 * missing ones.
 *
 * @param maxQueue the new value for the {@code maxQueue} field
 */
public voidsetMaxQueue(int maxQueue)

        this.maxQueue = maxQueue;