FileDocCategorySizeDatePackage
FastQueue.javaAPI DocApache Tomcat 6.0.1410853Fri Jul 20 04:20:36 BST 2007org.apache.catalina.tribes.transport.bio.util

FastQueue

public class FastQueue extends Object
A fast queue in which the remover thread locks out the adder threads.
Limit the queue length if you run into strange producer-thread problems. FIXME add i18n support to log messages
author
Rainer Jung
author
Peter Rossbach
version
$Revision: 500684 $ $Date: 2007-01-28 00:27:18 +0100 (dim., 28 janv. 2007) $

Fields Summary
private static org.apache.juli.logging.Log
log
private SingleRemoveSynchronizedAddLock
lock
This is the actual queue
private LinkObject
first
First Object at queue (consumer message)
private LinkObject
last
Last object in queue (producer Object)
private int
size
Current Queue elements size
private boolean
checkLock
check the lock to detect unexpected threading behaviour
private boolean
timeWait
record the thread wait times
private boolean
inAdd
private boolean
inRemove
private boolean
inMutex
private int
maxQueueLength
limit the queue length (default is unlimited)
private long
addWaitTimeout
addWaitTimeout for producer
private long
removeWaitTimeout
removeWaitTimeout for consumer
private boolean
enabled
whether the queue is enabled
private int
maxSize
max queue size
private int
sampleInterval
avg size sample interval
Constructors Summary
public FastQueue()
Create the queue's SingleRemoveSynchronizedAddLock and set its add and remove wait timeouts


                  
      
        lock = new SingleRemoveSynchronizedAddLock();
        lock.setAddWaitTimeout(addWaitTimeout);
        lock.setRemoveWaitTimeout(removeWaitTimeout);
    
Methods Summary
public booleanadd(org.apache.catalina.tribes.ChannelMessage msg, org.apache.catalina.tribes.Member[] destination, org.apache.catalina.tribes.group.InterceptorPayload payload)
Add new data to the queue

see
org.apache.catalina.tribes.util.IQueue#add(java.lang.String, java.lang.Object) FIXME extract some method

        // Appends one message to the tail of the linked list.
        // Returns true when the element was queued, false when the queue is
        // disabled, full, or found in an inconsistent state.

        // A disabled queue rejects the message without touching the lock.
        if (!enabled) {
            if (log.isInfoEnabled()) {
                log.info("FastQueue.add: queue disabled, add aborted");
            }
            return false;
        }

        boolean ok = true;
        lock.lockAdd();
        try {
            if (log.isTraceEnabled()) {
                log.trace("FastQueue.add: starting with size " + size);
            }
            // Optional diagnostics: the flags should never already be set
            // when the add lock has just been obtained.
            if (checkLock) {
                if (inAdd) {
                    log.warn("FastQueue.add: Detected other add");
                }
                inAdd = true;
                if (inMutex) {
                    log.warn("FastQueue.add: Detected other mutex in add");
                }
                inMutex = true;
            }

            if ((maxQueueLength > 0) && (size >= maxQueueLength)) {
                // A positive maxQueueLength caps the queue; 0 or less means no cap.
                ok = false;
                if (log.isTraceEnabled()) {
                    log.trace("FastQueue.add: Could not add, since queue is full (" + size + ">=" + maxQueueLength + ")");
                }
            } else {
                LinkObject element = new LinkObject(msg,destination, payload);
                if (size == 0) {
                    // Empty queue: the new element is both head and tail.
                    first = last = element;
                    size = 1;
                } else if (last == null) {
                    // size and the list pointers disagree; refuse to add
                    // rather than dereference a null tail.
                    ok = false;
                    log.error("FastQueue.add: Could not add, since last is null although size is "+ size + " (>0)");
                } else {
                    last.append(element);
                    last = element;
                    size++;
                }
            }

            // Sanity checks on the list pointers after the add attempt.
            if (first == null) {
                log.error("FastQueue.add: first is null, size is " + size + " at end of add");
            }
            if (last == null) {
                log.error("FastQueue.add: last is null, size is " + size+ " at end of add");
            }

            if (checkLock) {
                if (!inMutex) {
                    log.warn("FastQueue.add: Cancelled by other mutex in add");
                }
                inMutex = false;
                if (!inAdd) {
                    log.warn("FastQueue.add: Cancelled by other add");
                }
                inAdd = false;
            }
            if (log.isTraceEnabled()) {
                log.trace("FastQueue.add: add ending with size " + size);
            }

        } finally {
            // Always release the add lock, even if the add failed.
            // NOTE(review): the flag presumably signals "data available" to
            // the remover (unlockAdd() passes size > 0) - confirm against
            // SingleRemoveSynchronizedAddLock.
            lock.unlockAdd(true);
        }
        return ok;
    
public longgetAddWaitTimeout()
get current add wait timeout

return
current wait timeout

        // Refresh the cached field from the lock, then report the cached value.
        this.addWaitTimeout = this.lock.getAddWaitTimeout();
        return this.addWaitTimeout;
    
public SingleRemoveSynchronizedAddLockgetLock()

        // Hands out the lock object itself, not a copy.
        return this.lock;
    
public intgetMaxQueueLength()
get Max Queue length

see
org.apache.catalina.tribes.util.IQueue#getMaxQueueLength()

        // Limit consulted by add(); add() only enforces it when it is positive.
        return this.maxQueueLength;
    
public intgetMaxSize()

return
The max size

        // Plain accessor for the maxSize field.
        return this.maxSize;
    
public longgetRemoveWaitTimeout()
get current remove wait timeout

return
The timeout

        // Refresh the cached field from the lock, then report the cached value.
        this.removeWaitTimeout = this.lock.getRemoveWaitTimeout();
        return this.removeWaitTimeout;
    
public intgetSize()

        // Read without taking the lock, so the value is only a snapshot.
        return this.size;
    
public booleanisCheckLock()

return
Returns the checkLock.

        // True when add() and remove() run their extra flag diagnostics.
        return this.checkLock;
    
public booleanisEnabled()

        // While false, add() returns false and remove() returns null.
        return this.enabled;
    
public LinkObjectremove()
remove the complete queued object list

see
org.apache.catalina.tribes.util.IQueue#remove() FIXME extract some method

        // Detaches and returns the whole linked list of queued elements.
        // Returns the former head (possibly null), or null when the queue is
        // disabled or the remove lock could not be obtained.

        // A disabled queue hands out nothing and never touches the lock.
        if (!enabled) {
            if (log.isInfoEnabled()) {
                log.info("FastQueue.remove: queue disabled, remove aborted");
            }
            return null;
        }

        LinkObject element;
        boolean gotLock = lock.lockRemove();
        try {

            if (!gotLock) {
                // enabled may have changed while waiting for the lock, so
                // report which of the two situations applies.
                if (log.isInfoEnabled()) {
                    if (enabled) {
                        log.info("FastQueue.remove: Remove aborted although queue enabled");
                    } else {
                        log.info("FastQueue.remove: queue disabled, remove aborted");
                    }
                }
                return null;
            }

            if (log.isTraceEnabled()) {
                log.trace("FastQueue.remove: remove starting with size " + size);
            }
            // Optional diagnostics: the flags should never already be set
            // when the remove lock has just been obtained.
            if (checkLock) {
                if (inRemove) {
                    log.warn("FastQueue.remove: Detected other remove");
                }
                inRemove = true;
                if (inMutex) {
                    log.warn("FastQueue.remove: Detected other mutex in remove");
                }
                inMutex = true;
            }

            // Hand the entire chain to the caller and leave the queue empty.
            element = first;

            first = last = null;
            size = 0;

            if (checkLock) {
                if (!inMutex) {
                    log.warn("FastQueue.remove: Cancelled by other mutex in remove");
                }
                inMutex = false;
                if (!inRemove) {
                    log.warn("FastQueue.remove: Cancelled by other remove");
                }
                inRemove = false;
            }
            if (log.isTraceEnabled()) {
                log.trace("FastQueue.remove: remove ending with size " + size);
            }
        } finally {
            // NOTE(review): this also runs when lockRemove() returned false;
            // confirm that unlockRemove() is safe without a held remove lock.
            lock.unlockRemove();
        }
        return element;
    
public voidsetAddWaitTimeout(long timeout)
Set add wait timeout (default 10000 msec)

param
timeout

        // Keep the cached field and the lock's own setting in step.
        this.addWaitTimeout = timeout;
        this.lock.setAddWaitTimeout(this.addWaitTimeout);
    
public voidsetCheckLock(boolean checkLock)

param
checkLock The checkLock to set.

        // add() and remove() consult this flag to decide whether to run
        // their inAdd / inRemove / inMutex diagnostics.
        this.checkLock = checkLock;
    
public voidsetEnabled(boolean enable)

        // Enables or disables the queue. Disabling calls abortRemove() on the
        // lock and discards everything currently queued.
        enabled = enable;
        if (!enabled) {
            lock.abortRemove();
            last = first = null;
            // Reset the counter together with the list pointers. A stale
            // size > 0 with last == null would make the next add() after
            // re-enabling fail with "last is null although size is ... (>0)".
            size = 0;
        }
    
public voidsetMaxQueueLength(int length)

        // add() enforces the limit only when the value is positive.
        this.maxQueueLength = length;
    
public voidsetMaxSize(int size)

param
size

        // The parameter shadows the size field; only maxSize is written here.
        this.maxSize = size;
    
public voidsetRemoveWaitTimeout(long timeout)
set remove wait timeout ( default 30000 msec)

param
timeout

        // Keep the cached field and the lock's own setting in step.
        this.removeWaitTimeout = timeout;
        this.lock.setRemoveWaitTimeout(this.removeWaitTimeout);
    
public voidstart()
start queuing

        // Starting the queue simply enables it.
        this.setEnabled(true);
    
public voidstop()
stop queuing

        // Stopping disables the queue; setEnabled(false) also calls
        // lock.abortRemove() and clears the first/last pointers.
        this.setEnabled(false);
    
public voidunlockAdd()
unlock queue for next add

        lock.unlockAdd(size > 0 ? true : false);
    
public voidunlockRemove()
unlock queue for next remove

        // Pure delegation to the underlying lock.
        this.lock.unlockRemove();