Methods Summary |
---|
public synchronized void | add(java.lang.Object item)
long addWaitTime = 0;
while (_suspended || _tail == -1) {
long waitStart = System.currentTimeMillis();
try {
wait();
} catch (InterruptedException e) {}
addWaitTime += (System.currentTimeMillis() - waitStart);
}
_addWaitTime += addWaitTime;
// add the item to the queue's backing store
_backingStore[_tail] = item;
_queueEmpty = false;
_tail++;
if (_tail >= _backingStore.length)
_tail = 0; // wrap
if (_tail == _head) {
_tail = -1; // special case, we're full
}
_queueFull = (_tail == -1) ? true : false;
notifyAll();
|
public synchronized void | addConsumer()
String consumerName = Thread.currentThread().getName();
if (isTraceOn())
System.out.println("Queue.addConsumer(): INFO: " + "Adding consumer \'"
+ consumerName + "\'.");
_consumers.add(consumerName);
|
public synchronized void | addSupplier()
String supplierName = Thread.currentThread().getName();
if (isTraceOn())
System.out.println("Queue.addSupplier(): INFO: " + "Adding supplier \'"
+ supplierName + "\'.");
_suppliers.add(supplierName);
|
public long | getAddWaitTime()
return _addWaitTime;
|
public java.lang.Object[] | getBackingStore()
return _backingStore;
|
public int | getNumberOfConsumers()
return _consumers.size();
|
public long | getNumberOfItemsProcessed()
return _numberOfItemsProcessed;
|
public int | getNumberOfSuppliers()
return _suppliers.size();
|
public int | getQueueSize()
return _backingStore.length;
|
public long | getRemoveWaitTime()
return _removeWaitTime;
|
public boolean | isEndOfInput()
return _endOfInput;
|
public boolean | isQueueEmpty()
return _queueEmpty;
|
public boolean | isQueueFull()
return _queueFull;
|
public boolean | isSuspended()
return _suspended;
|
public synchronized java.lang.Object | remove()
return (remove(WAIT_FOREVER));
|
public synchronized java.lang.Object | remove(long timeout)
boolean timedOut = false;
Object ret = null;
long removeWaitTime = 0;
if (!(_queueEmpty && _endOfInput)) {
while (_suspended || _queueEmpty) {
long waitStart = System.currentTimeMillis();
try {
if (timeout == WAIT_FOREVER)
wait();
else
wait(timeout/1000);
} catch (InterruptedException e) {}
if (_endOfInput) {
break;
}
if (timeout != WAIT_FOREVER && (System.currentTimeMillis() -
waitStart) > timeout) {
timedOut = true;
break;
}
removeWaitTime += System.currentTimeMillis() - waitStart;
}
if (!timedOut && !_queueEmpty) {
_removeWaitTime += removeWaitTime;
ret = _backingStore[_head];
_queueFull = false;
_numberOfItemsProcessed++;
_backingStore[_head] = null;
if (_tail == -1)
_tail = _head; // point tail to new free spot
_head++;
if (_head >= _backingStore.length)
_head = 0; // wrap head pointer
_queueEmpty = (_head == _tail) ? true : false;
}
}
notifyAll();
return ret;
|
public synchronized void | removeConsumer()
String consumer = Thread.currentThread().getName();
if (isTraceOn())
System.out.println("Queue.removeConsumer(): INFO: " + "Looking for consumer \'"
+ consumer + "\' to remove it.");
for (int aa = 0; aa < _consumers.size(); aa++) {
String s = (String)_consumers.get(aa);
if (consumer.equals(s)) {
if (isTraceOn())
System.out.println("Queue.removeConsumer(): INFO: " + "Removing consumer \'"
+ s + "\'.");
_consumers.remove(aa);
break;
}
}
notifyAll();
|
public synchronized void | removeSupplier()
String supplier = Thread.currentThread().getName();
if (isTraceOn())
System.out.println("Queue.removeSupplier(): INFO: " + "Looking for supplier \'"
+ supplier + "\' to remove it.");
for (int aa = 0; aa < _suppliers.size(); aa++) {
String s = (String)_suppliers.get(aa);
if (supplier.equals(s)) {
if (isTraceOn())
System.out.println("Queue.removeSupplier(): INFO: " + "Removing supplier \'"
+ s + "\'.");
_suppliers.remove(aa);
break;
}
}
if (_suppliers.size() == 0)
_endOfInput = true;
notifyAll();
|
public void | reset()
setNumberOfItemsProcessed(0);
setAddWaitTime(0);
setRemoveWaitTime(0);
setNumberOfResets(getNumberOfResets() + 1);
|
public synchronized void | resume()
_suspended = false;
notifyAll();
|
public void | setAddWaitTime(long value)
_addWaitTime = value;
|
public void | setNumberOfItemsProcessed(long value)
_numberOfItemsProcessed = value;
|
public synchronized void | setQueueSize(int queueSize)
// Can't set queue size on a suspended queue
if (!_suspended) {
// Only allow the queue to grow, not shrink...
if (queueSize > _backingStore.length) {
// allocate new array
Object[] newStore = new Object[queueSize];
System.arraycopy(_backingStore, 0, newStore, 0, _backingStore.length);
}
}
notifyAll();
|
public void | setRemoveWaitTime(long value)
_removeWaitTime = value;
|
public synchronized void | suspend()
_suspended = true;
|