File | Doc | Category | Size | Date | Package
Queue.java | API Doc | Example | 17072 | Thu May 23 09:32:50 BST 2002 | sample.dynamic

Queue

public class Queue extends Basic implements DynamicMBean
A very simple queue. Implemented as a dynamic MBean.

Fields Summary
private static final int
DEFAULT_QUEUE_SIZE
private static final int
WAIT_FOREVER
private int
_head
private int
_tail
private Object[]
_backingStore
private boolean
_suspended
private boolean
_queueFull
private boolean
_queueEmpty
private ArrayList
_suppliers
private ArrayList
_consumers
private boolean
_endOfInput
private long
_addWaitTime
private long
_removeWaitTime
private long
_numberOfItemsProcessed
private MBeanInfo
_MBeanInfo
private String[]
_stateAsStringArray
Constructors Summary
public Queue()


        // Creates a queue with DEFAULT_QUEUE_SIZE slots by delegating to Queue(int).
        this(DEFAULT_QUEUE_SIZE);
    
public Queue(int queueSize)

        super();
        // Create the backing store. If the specified queue size is
        /// bogus, use the default queue size
        _backingStore = new Object[(queueSize > 0) ? queueSize : DEFAULT_QUEUE_SIZE];
        _queueEmpty = true;
        exposeManagementInterface();
    
Methods Summary
public synchronized voidadd(java.lang.Object item)

        long addWaitTime = 0;
        while (_suspended || _tail == -1) {
            long waitStart = System.currentTimeMillis();
            try {
                wait();
            } catch (InterruptedException e) {}
            addWaitTime += (System.currentTimeMillis() - waitStart);
        }
        _addWaitTime += addWaitTime;
        // add the item to the queue's backing store
        _backingStore[_tail] = item;
        _queueEmpty = false;
        _tail++;
        if (_tail >= _backingStore.length)
            _tail = 0;          // wrap
        if (_tail == _head) {
            _tail = -1;         // special case, we're full
        }
        _queueFull = (_tail == -1) ? true : false;
        notifyAll();
    
public synchronized voidaddConsumer()

        String consumerName = Thread.currentThread().getName();
        if (isTraceOn())
            System.out.println("Queue.addConsumer(): INFO: " + "Adding consumer \'"
                    + consumerName + "\'.");
        _consumers.add(consumerName);
    
public synchronized voidaddSupplier()

        String supplierName = Thread.currentThread().getName();
        if (isTraceOn())
            System.out.println("Queue.addSupplier(): INFO: " + "Adding supplier \'"
                    + supplierName + "\'.");
        _suppliers.add(supplierName);
    
private voidexposeManagementInterface()

        // Builds the MBeanInfo describing this MBean and stores it in
        // _MBeanInfo: the parent's attributes plus the Queue attributes, the
        // (int) constructor, and the suspend/resume operations.
        MBeanInfo parentInfo = super.getMBeanInfo();
        //
        // Attributes: the parent's attributes come first, followed by the ten
        // attributes contributed by Queue.
        //
        MBeanAttributeInfo[] parentAttributes = parentInfo.getAttributes();
        int next = parentAttributes.length;
        MBeanAttributeInfo[] attributeInfo = new MBeanAttributeInfo[next + 10];
        System.arraycopy(parentAttributes, 0, attributeInfo, 0, next);
        // The three booleans of each entry are (isReadable, isWritable, isIs).
        attributeInfo[next++] = new MBeanAttributeInfo("QueueSize",
                Integer.TYPE.getName(), "Maximum number of items the queue may contain at one time.",
                true, true, false);
        attributeInfo[next++] = new MBeanAttributeInfo("NumberOfConsumers",
                Integer.TYPE.getName(), "The number of consumers pulling from this Queue.",
                true, false, false);
        attributeInfo[next++] = new MBeanAttributeInfo("NumberOfSuppliers",
                Integer.TYPE.getName(), "The number of suppliers supplying to this Queue.",
                true, false, false);
        attributeInfo[next++] = new MBeanAttributeInfo("QueueFull",
                Boolean.TYPE.getName(), "Indicates whether or not the Queue is full.",
                true, false, true);
        attributeInfo[next++] = new MBeanAttributeInfo("QueueEmpty",
                Boolean.TYPE.getName(), "Indicates whether or not the Queue is empty.",
                true, false, true);
        attributeInfo[next++] = new MBeanAttributeInfo("Suspended",
                Boolean.TYPE.getName(), "Indicates whether or not the Queue is currently suspended.",
                true, false, true);
        attributeInfo[next++] = new MBeanAttributeInfo("EndOfInput",
                Boolean.TYPE.getName(), "Indicates whether or not the end-of-input has been signalled by all suppliers.",
                true, false, true);
        attributeInfo[next++] = new MBeanAttributeInfo("NumberOfItemsProcessed",
                Long.TYPE.getName(), "The number of items that have been removed from the Queue.",
                true, false, false);
        attributeInfo[next++] = new MBeanAttributeInfo("AddWaitTime",
                Long.TYPE.getName(), "The number of milliseconds spent waiting to add because the Queue was full.",
                true, false, false);
        attributeInfo[next++] = new MBeanAttributeInfo("RemoveWaitTime",
                Long.TYPE.getName(), "The number of milliseconds spent waiting to remove because the Queue was empty.",
                true, false, false);
        //
        // Constructors: describe the public (int) constructor. If the lookup
        // fails (for example because the runtime class does not declare such a
        // constructor), publish an empty array rather than an array holding a
        // null element.
        //
        MBeanConstructorInfo[] constructorInfo;
        try {
            Class[] signature =  {
                Integer.TYPE
            };
            Constructor constructor = this.getClass().getConstructor(signature);
            constructorInfo = new MBeanConstructorInfo[] {
                new MBeanConstructorInfo("Custom constructor", constructor)
            };
        } catch (Exception e) {
            e.printStackTrace();
            constructorInfo = new MBeanConstructorInfo[0];
        }
        //
        // Operations: both take no parameters and return void.
        //
        MBeanParameterInfo[] parms = new MBeanParameterInfo[0];
        MBeanOperationInfo[] operationInfo = new MBeanOperationInfo[2];
        operationInfo[0] = new MBeanOperationInfo("suspend", "Suspends processing of the Queue.",
                parms, Void.TYPE.getName(), MBeanOperationInfo.ACTION);
        operationInfo[1] = new MBeanOperationInfo("resume", "Resumes processing of the Queue.",
                parms, Void.TYPE.getName(), MBeanOperationInfo.ACTION);
        //
        // Notifications: none are described.
        //
        MBeanNotificationInfo[] notificationInfo = new MBeanNotificationInfo[0];
        //
        // MBeanInfo
        //
        _MBeanInfo = new MBeanInfo("Queue", "Queue MBean", attributeInfo, constructorInfo,
                operationInfo, notificationInfo);
    
public longgetAddWaitTime()

        // Current value of the counter into which add() accumulates the
        // milliseconds it spent blocked.
        return this._addWaitTime;
    
public java.lang.ObjectgetAttribute(java.lang.String attributeName)

        // Attributes contributed by Queue are answered here, boxed into their
        // wrapper types; any other name is handed to the parent class.
        if (attributeName.equals("QueueSize")) {
            return new Integer(getQueueSize());
        }
        if (attributeName.equals("NumberOfSuppliers")) {
            return new Integer(getNumberOfSuppliers());
        }
        if (attributeName.equals("NumberOfConsumers")) {
            return new Integer(getNumberOfConsumers());
        }
        if (attributeName.equals("QueueFull")) {
            return new Boolean(isQueueFull());
        }
        if (attributeName.equals("QueueEmpty")) {
            return new Boolean(isQueueEmpty());
        }
        if (attributeName.equals("Suspended")) {
            return new Boolean(isSuspended());
        }
        if (attributeName.equals("EndOfInput")) {
            return new Boolean(isEndOfInput());
        }
        if (attributeName.equals("NumberOfItemsProcessed")) {
            return new Long(getNumberOfItemsProcessed());
        }
        if (attributeName.equals("AddWaitTime")) {
            return new Long(getAddWaitTime());
        }
        if (attributeName.equals("RemoveWaitTime")) {
            return new Long(getRemoveWaitTime());
        }
        // Not one of ours: let the parent resolve (or reject) the name.
        return super.getAttribute(attributeName);
    
public javax.management.AttributeListgetAttributes(java.lang.String[] attributeNames)
put your documentation comment here

param
attributeNames
return

        //
        // Make sure the attribute names String array is not null, or all of
        /// the .equals() calls will toss a NullPointerException
        //
        AttributeList resultList = new AttributeList();
        for (int aa = 0; aa < attributeNames.length; aa++) {
            try {
                Object value = getAttribute((String)attributeNames[aa]);
                resultList.add(new Attribute(attributeNames[aa], value));
            } catch (Exception e) {
                e.printStackTrace();
            }
        }
        return  (resultList);
    
public java.lang.Object[]getBackingStore()

        // Hands out the internal array itself, not a copy; writes made by the
        // caller are therefore visible to the queue.
        return this._backingStore;
    
public javax.management.MBeanInfogetMBeanInfo()
Returns the MBeanInfo that exposeManagementInterface() stored in _MBeanInfo.

return

        return this._MBeanInfo;
    
public intgetNumberOfConsumers()

        // Number of consumer names currently held in _consumers.
        return this._consumers.size();
    
public longgetNumberOfItemsProcessed()

        // Current value of the counter that remove(long) increments for each
        // item it takes off the queue.
        return this._numberOfItemsProcessed;
    
public intgetNumberOfSuppliers()

        // Number of supplier names currently held in _suppliers.
        return this._suppliers.size();
    
public intgetQueueSize()

        // The queue's capacity is the length of its backing array.
        return this._backingStore.length;
    
public longgetRemoveWaitTime()

        // Current value of the counter into which remove(long) accumulates
        // the milliseconds it spent blocked.
        return this._removeWaitTime;
    
public java.lang.Objectinvoke(java.lang.String operationName, java.lang.Object[] params, java.lang.String[] signature)
put your documentation comment here

param
operationName
param
params[]
param
signature[]
return
exception
MBeanException, ReflectionException

        Object ret = Void.TYPE;
        //
        // Make sure the operation name String is not null, or all of
        /// the .equals() calls will toss a NullPointerException
        //
        ret = Void.TYPE;
        if (operationName.equals("suspend")) {
            suspend();
        } 
        else if (operationName.equals("resume")) {
            resume();
        } 
        else {
            super.invoke(operationName, params, signature);
        }
        return  ret;
    
public booleanisEndOfInput()

        // Flag raised by removeSupplier()/removeConsumer() once the
        // corresponding list has become empty.
        return this._endOfInput;
    
public booleanisQueueEmpty()

        // Flag cleared by add() and recomputed by remove(long) after each
        // dequeue.
        return this._queueEmpty;
    
public booleanisQueueFull()

        // Flag recomputed by add() after each insert and cleared by
        // remove(long) after each dequeue.
        return this._queueFull;
    
public booleanisSuspended()

        // Flag set by suspend() and cleared by resume().
        return this._suspended;
    
public synchronized java.lang.Objectremove()

        // Delegates to remove(long), passing the WAIT_FOREVER sentinel as the
        // timeout.
        Object item = remove(WAIT_FOREVER);
        return item;
    
public synchronized java.lang.Objectremove(long timeout)

        // Removes and returns the item at the head of the queue, blocking
        // while the queue is suspended or empty. timeout is in milliseconds;
        // WAIT_FOREVER disables it. Returns null when end-of-input has been
        // signalled and nothing is available, or when the timeout expires.
        Object ret = null;
        boolean interrupted = false;
        if (!_endOfInput) {
            boolean waitForever = (timeout == WAIT_FOREVER);
            boolean timedOut = false;
            boolean waited = false;
            long waitStart = System.currentTimeMillis();
            while (_suspended || _tail == _head) {
                waited = true;
                try {
                    if (waitForever) {
                        wait();
                    } else {
                        // Wait only for what is left of the overall timeout,
                        // so repeated wake-ups cannot extend it. Never pass 0
                        // to wait(long): that would mean "wait forever".
                        long remaining = timeout - (System.currentTimeMillis() - waitStart);
                        if (remaining <= 0) {
                            timedOut = true;
                            break;
                        }
                        wait(remaining);
                    }
                } catch (InterruptedException e) {
                    // Keep waiting, but remember the interrupt so the
                    // thread's interrupt status can be restored on exit.
                    interrupted = true;
                }
                if (_endOfInput) {
                    break;
                }
            }
            if (waited && !timedOut) {
                _removeWaitTime += System.currentTimeMillis() - waitStart;
            }
            // Dequeue only when an item is really there: the loop can also be
            // left because end-of-input was signalled on an empty queue.
            if (!timedOut && _tail != _head) {
                ret = _backingStore[_head];
                _backingStore[_head] = null;
                _queueFull = false;
                _numberOfItemsProcessed++;
                if (_tail == -1) {
                    _tail = _head;              // point tail to new free spot
                }
                _head++;
                if (_head >= _backingStore.length) {
                    _head = 0;                  // wrap head pointer
                }
                _queueEmpty = (_head == _tail);
            }
        }
        // Wake threads blocked in wait() on this queue so they re-check state.
        notifyAll();
        if (interrupted) {
            Thread.currentThread().interrupt();
        }
        return  ret;
    
public synchronized voidremoveConsumer()

        String consumer = Thread.currentThread().getName();
        if (isTraceOn())
            System.out.println("Queue.removeConsumer(): INFO: " + "Looking for consumer \'"
                    + consumer + "\' to remove it.");
        for (int aa = 0; aa < _consumers.size(); aa++) {
            String s = (String)_consumers.get(aa);
            if (consumer.equals(s)) {
                if (isTraceOn())
                    System.out.println("Queue.removeConsumer(): INFO: " + "Removing consumer \'"
                            + s + "\'.");
                _consumers.remove(aa);
                notifyAll();
                break;
            }
        }
        if (_consumers.size() == 0)
            _endOfInput = true;
    
public synchronized voidremoveSupplier()

        String supplier = Thread.currentThread().getName();
        if (isTraceOn())
            System.out.println("Queue.removeSupplier(): INFO: " + "Looking for supplier \'"
                    + supplier + "\' to remove it.");
        for (int aa = 0; aa < _suppliers.size(); aa++) {
            String s = (String)_suppliers.get(aa);
            if (supplier.equals(s)) {
                if (isTraceOn())
                    System.out.println("Queue.removeSupplier(): INFO: " + "Removing supplier \'"
                            + s + "\'.");
                _suppliers.remove(aa);
                notifyAll();
                break;
            }
        }
        if (_suppliers.size() == 0)
            _endOfInput = true;
    
public voidreset()

        // Zero the three statistics counters kept by the queue ...
        this.setNumberOfItemsProcessed(0);
        this.setAddWaitTime(0);
        this.setRemoveWaitTime(0);
        // ... and count this reset via the inherited accessor pair.
        this.setNumberOfResets(this.getNumberOfResets() + 1);
    
public synchronized voidresume()

        // Clear the suspended flag, then wake every thread blocked in wait()
        // on this queue so that it re-checks its loop condition.
        this._suspended = false;
        this.notifyAll();
    
public voidsetAddWaitTime(long value)

        // Overwrites the add-wait-time counter (reset() passes 0).
        this._addWaitTime = value;
    
public voidsetAttribute(javax.management.Attribute attribute)
put your documentation comment here

param
attribute
exception
AttributeNotFoundException, InvalidAttributeValueException, MBeanException, ReflectionException

        String name = attribute.getName();
        Object value = attribute.getValue();
        if (name.equals("QueueSize")) {
            setQueueSize(((Integer)value).intValue());
        }
        // parent will throw exception if attribute not found
        super.setAttribute(attribute);
    
public javax.management.AttributeListsetAttributes(javax.management.AttributeList attributes)
put your documentation comment here

param
attributes
return

        for (int aa = 0; aa < attributes.size(); aa++) {
            try {
                setAttribute((Attribute)attributes.get(aa));
            } catch (Exception e) {
                e.printStackTrace();
            }
        }
        String[] attributeNames = new String[attributes.size()];
        for (int aa = 0; aa < attributeNames.length; aa++) {
            attributeNames[aa] = ((Attribute)attributes.get(aa)).getName();
        }
        return  getAttributes(attributeNames);
    
public voidsetNumberOfItemsProcessed(long value)

        // Overwrites the processed-items counter (reset() passes 0).
        this._numberOfItemsProcessed = value;
    
public synchronized voidsetQueueSize(int queueSize)

        // Can't set queue size on a suspended queue
        if (!_suspended) {
            // Only allow the queue to grow, not shrink...
            if (queueSize > _backingStore.length) {
                // allocate new array
                Object[] newStore = new Object[queueSize];
                System.arraycopy(_backingStore, 0, newStore, 0, _backingStore.length);
            }
        }
        notifyAll();
    
public voidsetRemoveWaitTime(long value)

        // Overwrites the remove-wait-time counter (reset() passes 0).
        this._removeWaitTime = value;
    
public synchronized voidsuspend()

        _suspended = true;