Methods Summary |
---|
public synchronized void | add(java.lang.Object item)
long addWaitTime = 0;
while (_suspended || _tail == -1) {
long waitStart = System.currentTimeMillis();
try {
wait();
} catch (InterruptedException e) {}
addWaitTime += (System.currentTimeMillis() - waitStart);
}
_addWaitTime += addWaitTime;
// add the item to the queue's backing store
_backingStore[_tail] = item;
_queueWatcher.reset();
_queueEmpty = false;
_tail++;
if (_tail >= _backingStore.length)
_tail = 0; // wrap
if (_tail == _head) {
_tail = -1; // special case, we're full
}
_queueFull = (_tail == -1) ? true : false;
// if (_queueFull)
// trace("Queue is now full.");
notifyAll();
|
public synchronized void | addConsumer()
String consumerName = Thread.currentThread().getName();
trace("Queue.addConsumer(): INFO: " + "Adding consumer \'" + consumerName
+ "\'.");
_consumers.add(consumerName);
|
public void | addNotificationListener(javax.management.NotificationListener listener, javax.management.NotificationFilter filter, java.lang.Object handback)Adds a listener to a registered MBean.
//
// Implement NotificationBroadcaster interface via containment
//
_broadcaster.addNotificationListener(listener, filter, handback);
|
public synchronized void | addSupplier()
String supplierName = Thread.currentThread().getName();
trace("Queue.addSupplier(): INFO: " + "Adding supplier \'" + supplierName
+ "\'.");
_suppliers.add(supplierName);
|
public long | getAddWaitTime()
return _addWaitTime;
|
public java.lang.Object[] | getBackingStore()
return _backingStore;
|
public javax.management.MBeanNotificationInfo[] | getNotificationInfo()Returns an MBeanNotificationInfo object contaning the name of the Java
class of the notification and the notification types sent.
return _broadcaster.getNotificationInfo();
|
public int | getNumberOfConsumers()
return _consumers.size();
|
public long | getNumberOfItemsProcessed()
return _numberOfItemsProcessed;
|
public int | getNumberOfSuppliers()
return _suppliers.size();
|
public int | getQueueSize()
return _backingStore.length;
|
public long | getRemoveWaitTime()
return _removeWaitTime;
|
public boolean | isEndOfInput()
return _endOfInput;
|
public boolean | isQueueEmpty()
// trace("Queue.isQueueEmpty(): INFO: returning " + _queueEmpty);
return _queueEmpty;
|
public boolean | isQueueFull()
// trace("Queue.isQueueFull(): INFO: returning " + _queueFull);
return _queueFull;
|
public boolean | isSuspended()
return _suspended;
|
public void | postDeregister()
trace("Queue.postDeregister(): INFO: Invocation complete");
|
public void | postRegister(java.lang.Boolean registrationDone)
trace("Queue.postRegister(): INFO: Invocation complete");
|
public void | preDeregister()
trace("Queue.preDeregister(): INFO: Invocation complete");
|
public javax.management.ObjectName | preRegister(javax.management.MBeanServer server, javax.management.ObjectName name)
ObjectName ret = name;
if (name == null)
ret = new ObjectName(server.getDefaultDomain() + ":" + "name=Queue,selfRegister=true");
trace("Queue.preRegister(): INFO: Invocation complete");
return ret;
|
public synchronized java.lang.Object | remove()
Object ret = null;
long removeWaitTime = 0;
if (!(_queueEmpty && _endOfInput)) {
while (_suspended || _queueEmpty) {
long waitStart = System.currentTimeMillis();
try {
wait();
} catch (InterruptedException e) {}
if (_endOfInput) {
break;
}
removeWaitTime += System.currentTimeMillis() - waitStart;
}
if (!_queueEmpty) {
_removeWaitTime += removeWaitTime;
ret = _backingStore[_head];
_queueWatcher.reset();
_queueFull = false;
_numberOfItemsProcessed++;
_backingStore[_head] = null;
if (_tail == -1)
_tail = _head; // point tail to new free spot
_head++;
if (_head >= _backingStore.length)
_head = 0; // wrap head pointer
_queueEmpty = (_head == _tail) ? true : false;
// if (_queueEmpty)
// trace("Queue is now empty.");
}
}
notifyAll();
return ret;
|
public synchronized void | removeConsumer()
String consumer = Thread.currentThread().getName();
trace("Queue.removeConsumer(): INFO: " + "Looking for consumer \'" +
consumer + "\' to remove it.");
for (int aa = 0; aa < _consumers.size(); aa++) {
String s = (String)_consumers.get(aa);
if (consumer.equals(s)) {
trace("Queue.removeConsumer(): INFO: " + "Removing consumer \'"
+ s + "\'.");
_consumers.remove(aa);
break;
}
}
notifyAll();
|
public void | removeNotificationListener(javax.management.NotificationListener listener)Removes a listener from a registered MBean.
_broadcaster.removeNotificationListener(listener);
|
public synchronized void | removeSupplier()
String supplier = Thread.currentThread().getName();
trace("Queue.removeSupplier(): INFO: " + "Looking for supplier \'" +
supplier + "\' to remove it.");
for (int aa = 0; aa < _suppliers.size(); aa++) {
String s = (String)_suppliers.get(aa);
if (supplier.equals(s)) {
trace("Queue.removeSupplier(): INFO: " + "Removing supplier \'"
+ s + "\'.");
_suppliers.remove(aa);
break;
}
}
if (_suppliers.size() == 0)
_endOfInput = true;
notifyAll();
|
public void | reset()
setNumberOfItemsProcessed(0);
setAddWaitTime(0);
setRemoveWaitTime(0);
setNumberOfResets(getNumberOfResets() + 1);
|
public synchronized void | resume()
_suspended = false;
notifyAll();
|
public void | setAddWaitTime(long value)
_addWaitTime = value;
|
public void | setNumberOfItemsProcessed(long value)
_numberOfItemsProcessed = value;
|
public synchronized void | setQueueSize(int queueSize)
// Can't set queue size on a suspended queue
if (!_suspended) {
// Only allow the queue to grow, not shrink...
if (queueSize > _backingStore.length) {
// allocate new array
Object[] newStore = new Object[queueSize];
System.arraycopy(_backingStore, 0, newStore, 0, _backingStore.length);
}
}
notifyAll();
|
public void | setRemoveWaitTime(long value)
_removeWaitTime = value;
|
public synchronized void | suspend()
_suspended = true;
|
private void | trace(java.lang.String message)
if (isTraceOn()) {
System.out.println(Thread.currentThread().getName() + ":" + message);
}
|