File | Doc | Category | Size | Date | Package
FragmentationInterceptor.java | API Doc | Apache Tomcat 6.0.14 | 8958 | Fri Jul 20 04:20:36 BST 2007 | org.apache.catalina.tribes.group.interceptors

FragmentationInterceptor

public class FragmentationInterceptor extends org.apache.catalina.tribes.group.ChannelInterceptorBase
The fragmentation interceptor splits up large messages into smaller messages and assembles them on the other end. This is very useful when you don't want large messages hogging the sending sockets and smaller messages can make it through.
Configuration Options
FragmentationInterceptor.expire=<milliseconds> - how long we keep the fragments in memory while waiting for the rest to arrive; default=60,000ms -> 60 seconds. This setting is useful to avoid OutOfMemoryErrors.
FragmentationInterceptor.maxSize=<bytes> - maximum message size in bytes before a message is split into fragments; default=1024*100 (around a tenth of a MB).
author
Filip Hanik
version
1.0

Fields Summary
private static org.apache.juli.logging.Log
log
protected HashMap
fragpieces
private int
maxSize
private long
expire
protected boolean
deepclone
Constructors Summary
Methods Summary
/**
 * Stores one received fragment and, once its collection reports itself
 * complete, hands the reassembled message to the parent interceptor.
 *
 * @param msg a fragment; the collection it belongs to is looked up by a
 *            FragKey built from the message's unique id
 */
public voiddefrag(org.apache.catalina.tribes.ChannelMessage msg)

        final FragKey fragKey = new FragKey(msg.getUniqueId());
        final FragCollection pieces = getFragCollection(fragKey, msg);
        // The collection keeps a deep clone of the fragment, not msg itself.
        final ChannelMessage copy = (ChannelMessage) msg.deepclone();
        pieces.addMessage(copy);

        // Still waiting for more fragments: nothing to deliver yet.
        if ( !pieces.complete() ) {
            return;
        }

        // Drop the collection from the map before delivering the assembled message.
        removeFragCollection(fragKey);
        final ChannelMessage assembled = pieces.assemble();
        super.messageReceived(assembled);
    
/**
 * Splits a message into fragments carrying at most maxSize payload bytes each
 * and sends every fragment through the parent interceptor.
 *
 * Each fragment is a clone of msg whose buffer is rebuilt as: the payload
 * slice, then the fragment number, then the total number of fragments, then
 * the boolean flag {@code true} marking it as a fragment.
 *
 * @param destination the members the fragments are sent to
 * @param msg         the message to split; its buffer is read, not modified here
 * @param payload     passed unchanged to every send
 */
public voidfrag(org.apache.catalina.tribes.Member[] destination, org.apache.catalina.tribes.ChannelMessage msg, org.apache.catalina.tribes.group.InterceptorPayload payload)

        final int total = msg.getMessage().getLength();

        // Number of fragments: total / maxSize, rounded up.
        int pieces = total / maxSize;
        if ( total % maxSize != 0 ) {
            pieces++;
        }

        final ChannelMessage[] fragments = new ChannelMessage[pieces];
        int offset = 0; // start of the next slice within the original payload
        for ( int nr = 0; nr < pieces; nr++ ) {
            final ChannelMessage fragment = (ChannelMessage) msg.clone();
            final int length = Math.min(total - offset, maxSize);
            // Replace the cloned buffer content with this fragment's slice.
            fragment.getMessage().clear();
            fragment.getMessage().append(msg.getMessage().getBytesDirect(), offset, length);
            // Trailer, in this exact order: fragment number, fragment count, frag flag.
            fragment.getMessage().append(nr);
            fragment.getMessage().append(pieces);
            fragment.getMessage().append(true);
            fragments[nr] = fragment;
            offset += length;
        }

        // Send only after every fragment has been built.
        int next = 0;
        while ( next < fragments.length ) {
            super.sendMessage(destination, fragments[next], payload);
            next++;
        }
    
/**
 * Returns the expire setting that heartbeat() passes to each fragment key's
 * expired check.
 *
 * @return the current expire value
 */
public longgetExpire()

        return this.expire;
    
/**
 * Returns the fragment collection registered under the given key, creating
 * and registering a new one from msg when none exists yet.
 *
 * @param key identifies the message the fragments belong to
 * @param msg used to construct a new FragCollection when the key is unknown
 * @return the existing or newly created collection, never null
 */
public org.apache.catalina.tribes.group.interceptors.FragmentationInterceptor$FragCollectiongetFragCollection(org.apache.catalina.tribes.group.interceptors.FragmentationInterceptor$FragKey key, org.apache.catalina.tribes.ChannelMessage msg)

        // fragpieces is a plain HashMap, which is not safe to read while another
        // thread may be modifying it. The lookup therefore happens inside the
        // same lock as the insert instead of peeking at the map before locking.
        synchronized (fragpieces) {
            FragCollection coll = (FragCollection)fragpieces.get(key);
            if ( coll == null ) {
                coll = new FragCollection(msg);
                fragpieces.put(key, coll);
            }
            return coll;
        }
    
/**
 * Returns the size limit that sendMessage() compares against the message
 * length and that frag() uses as the payload size of each fragment.
 *
 * @return the current maxSize value
 */
public intgetMaxSize()

        return this.maxSize;
    
/**
 * Removes every fragment collection whose key reports itself expired for the
 * configured expire value, then invokes the parent heartbeat.
 *
 * A failure during the clean-up is logged and does not prevent the parent
 * heartbeat from running.
 */
public voidheartbeat()

        try {
            // fragpieces is a plain HashMap that getFragCollection modifies under
            // synchronized(fragpieces). Take the same lock for the sweep so the
            // key snapshot and the removals never overlap a concurrent insert.
            synchronized (fragpieces) {
                Object[] keys = fragpieces.keySet().toArray();
                for ( int i=0; i<keys.length; i++ ) {
                    FragKey key = (FragKey)keys[i];
                    if ( key != null && key.expired(getExpire()) ) {
                        removeFragCollection(key);
                    }
                }
            }
        }catch ( Exception x ) {
            if ( log.isErrorEnabled() ) {
                log.error("Unable to perform heartbeat clean up in the frag interceptor",x);
            }
        }
        super.heartbeat();
    
public voidmessageReceived(org.apache.catalina.tribes.ChannelMessage msg)

        boolean isFrag = XByteBuffer.toBoolean(msg.getMessage().getBytesDirect(),msg.getMessage().getLength()-1);
        msg.getMessage().trim(1);
        if ( isFrag ) {
            defrag(msg);
        } else {
            super.messageReceived(msg);
        }
    
/**
 * Removes the fragment collection registered under the given key, if any.
 *
 * @param key the key whose collection is discarded
 */
public voidremoveFragCollection(org.apache.catalina.tribes.group.interceptors.FragmentationInterceptor$FragKey key)

        // getFragCollection inserts into this plain HashMap under
        // synchronized(fragpieces); removal takes the same lock so the two
        // structural modifications cannot run concurrently.
        synchronized (fragpieces) {
            fragpieces.remove(key);
        }
    
/**
 * Sends a message, splitting it into fragments first when it is larger than
 * maxSize and okToProcess accepts the message options.
 *
 * A message that is not fragmented gets the boolean {@code false} appended to
 * its buffer before being sent; messageReceived() reads and strips that
 * trailing flag on the other end.
 *
 * @param destination the members to send to
 * @param msg         the message to send
 * @param payload     passed unchanged to the parent interceptor
 */
public voidsendMessage(org.apache.catalina.tribes.Member[] destination, org.apache.catalina.tribes.ChannelMessage msg, org.apache.catalina.tribes.group.InterceptorPayload payload)

        int size = msg.getMessage().getLength();
        // A non-positive maxSize cannot be used as a fragment size: frag() divides
        // by it (division by zero for 0) and uses it as the slice length. Such a
        // setting is treated as "do not fragment" instead of failing the send.
        boolean frag = (maxSize>0) && (size>maxSize) && okToProcess(msg.getOptions());
        if ( frag ) {
            frag(destination, msg, payload);
        } else {
            // frag is false here: mark the message as not fragmented.
            msg.getMessage().append(frag);
            super.sendMessage(destination, msg, payload);
        }
    
/**
 * Sets the expire value that heartbeat() passes to each fragment key's
 * expired check.
 *
 * @param expire the new expire value
 */
public voidsetExpire(long expire)

        this.expire = expire;
    
/**
 * Sets the size limit above which sendMessage() fragments a message; frag()
 * also uses it as the payload size of each fragment.
 *
 * @param maxSize the new size limit, compared against the message buffer length
 */
public voidsetMaxSize(int maxSize)

        this.maxSize = maxSize;