Fields Summary |
---|
private static org.apache.juli.logging.Log | log |
private SingleRemoveSynchronizedAddLock | lockThis is the actual queue |
private LinkObject | firstFirst Object at queue (consumer message) |
private LinkObject | lastLast object in queue (producer Object) |
private int | sizeCurrent Queue elements size |
private boolean | checkLockcheck lock to detect strange threadings things |
private boolean | timeWaitprotocol the thread wait times |
private boolean | inAdd |
private boolean | inRemove |
private boolean | inMutex |
private int | maxQueueLengthlimit the queue legnth ( default is unlimited) |
private long | addWaitTimeoutaddWaitTimeout for producer |
private long | removeWaitTimeoutremoveWaitTimeout for consumer |
private boolean | enabledenabled the queue |
private int | maxSizemax queue size |
private int | sampleIntervalavg size sample interval |
Methods Summary |
---|
public boolean | add(org.apache.catalina.tribes.ChannelMessage msg, org.apache.catalina.tribes.Member[] destination, org.apache.catalina.tribes.group.InterceptorPayload payload)Add new data to the queue
boolean ok = true;
long time = 0;
if (!enabled) {
if (log.isInfoEnabled())
log.info("FastQueue.add: queue disabled, add aborted");
return false;
}
if (timeWait) {
time = System.currentTimeMillis();
}
lock.lockAdd();
try {
if (log.isTraceEnabled()) {
log.trace("FastQueue.add: starting with size " + size);
}
if (checkLock) {
if (inAdd)
log.warn("FastQueue.add: Detected other add");
inAdd = true;
if (inMutex)
log.warn("FastQueue.add: Detected other mutex in add");
inMutex = true;
}
if ((maxQueueLength > 0) && (size >= maxQueueLength)) {
ok = false;
if (log.isTraceEnabled()) {
log.trace("FastQueue.add: Could not add, since queue is full (" + size + ">=" + maxQueueLength + ")");
}
} else {
LinkObject element = new LinkObject(msg,destination, payload);
if (size == 0) {
first = last = element;
size = 1;
} else {
if (last == null) {
ok = false;
log.error("FastQueue.add: Could not add, since last is null although size is "+ size + " (>0)");
} else {
last.append(element);
last = element;
size++;
}
}
}
if (first == null) {
log.error("FastQueue.add: first is null, size is " + size + " at end of add");
}
if (last == null) {
log.error("FastQueue.add: last is null, size is " + size+ " at end of add");
}
if (checkLock) {
if (!inMutex) log.warn("FastQueue.add: Cancelled by other mutex in add");
inMutex = false;
if (!inAdd) log.warn("FastQueue.add: Cancelled by other add");
inAdd = false;
}
if (log.isTraceEnabled()) log.trace("FastQueue.add: add ending with size " + size);
} finally {
lock.unlockAdd(true);
}
return ok;
|
public long | getAddWaitTimeout()get current add wait timeout
addWaitTimeout = lock.getAddWaitTimeout();
return addWaitTimeout;
|
public SingleRemoveSynchronizedAddLock | getLock()
return lock;
|
public int | getMaxQueueLength()get Max Queue length
return maxQueueLength;
|
public int | getMaxSize()
return maxSize;
|
public long | getRemoveWaitTimeout()get current remove wait timeout
removeWaitTimeout = lock.getRemoveWaitTimeout();
return removeWaitTimeout;
|
public int | getSize()
return size;
|
public boolean | isCheckLock()
return checkLock;
|
public boolean | isEnabled()
return enabled;
|
public LinkObject | remove()remove the complete queued object list
LinkObject element;
boolean gotLock;
long time = 0;
if (!enabled) {
if (log.isInfoEnabled())
log.info("FastQueue.remove: queue disabled, remove aborted");
return null;
}
if (timeWait) {
time = System.currentTimeMillis();
}
gotLock = lock.lockRemove();
try {
if (!gotLock) {
if (enabled) {
if (log.isInfoEnabled())
log.info("FastQueue.remove: Remove aborted although queue enabled");
} else {
if (log.isInfoEnabled())
log.info("FastQueue.remove: queue disabled, remove aborted");
}
return null;
}
if (log.isTraceEnabled()) {
log.trace("FastQueue.remove: remove starting with size " + size);
}
if (checkLock) {
if (inRemove)
log.warn("FastQueue.remove: Detected other remove");
inRemove = true;
if (inMutex)
log.warn("FastQueue.remove: Detected other mutex in remove");
inMutex = true;
}
element = first;
first = last = null;
size = 0;
if (checkLock) {
if (!inMutex)
log.warn("FastQueue.remove: Cancelled by other mutex in remove");
inMutex = false;
if (!inRemove)
log.warn("FastQueue.remove: Cancelled by other remove");
inRemove = false;
}
if (log.isTraceEnabled()) {
log.trace("FastQueue.remove: remove ending with size " + size);
}
if (timeWait) {
time = System.currentTimeMillis();
}
} finally {
lock.unlockRemove();
}
return element;
|
public void | setAddWaitTimeout(long timeout)Set add wait timeout (default 10000 msec)
addWaitTimeout = timeout;
lock.setAddWaitTimeout(addWaitTimeout);
|
public void | setCheckLock(boolean checkLock)
this.checkLock = checkLock;
|
public void | setEnabled(boolean enable)
enabled = enable;
if (!enabled) {
lock.abortRemove();
last = first = null;
}
|
public void | setMaxQueueLength(int length)
maxQueueLength = length;
|
public void | setMaxSize(int size)
maxSize = size;
|
public void | setRemoveWaitTimeout(long timeout)set remove wait timeout ( default 30000 msec)
removeWaitTimeout = timeout;
lock.setRemoveWaitTimeout(removeWaitTimeout);
|
public void | start()start queuing
setEnabled(true);
|
public void | stop()start queuing
setEnabled(false);
|
public void | unlockAdd()unlock queue for next add
lock.unlockAdd(size > 0 ? true : false);
|
public void | unlockRemove()unlock queue for next remove
lock.unlockRemove();
|