// File: Queue.java
// Doc: API Doc | Category: Example | Size: 17072 | Date: Thu May 23 09:32:50 BST 2002 | Package: sample.dynamic

package  sample.dynamic;

import  java.util.ArrayList;
import  javax.management.*;
import  java.lang.reflect.*;


/**
 * A very simple bounded FIFO queue, implemented as a dynamic MBean.
 *
 * <p>Items are kept in a fixed-size ring buffer. Supplier threads call
 * {@link #add(Object)} and block while the queue is full or suspended;
 * consumer threads call {@link #remove()} / {@link #remove(long)} and block
 * while the queue is empty or suspended. All queue state is guarded by this
 * object's monitor.
 *
 * <p>Ring buffer conventions: <code>_head</code> is the index of the next
 * item to remove, <code>_tail</code> is the index of the next free slot.
 * <code>_head == _tail</code> means the queue is empty, and
 * <code>_tail == TAIL_FULL</code> (-1) means the queue is full.
 */
public class Queue extends Basic
        implements DynamicMBean {
    private static final int DEFAULT_QUEUE_SIZE = 10;
    private static final int WAIT_FOREVER = -1;
    // Sentinel stored in _tail while every slot of the backing store is in use
    private static final int TAIL_FULL = -1;
    // Number of attributes this class adds to those exposed by the parent
    private static final int NUMBER_OF_QUEUE_ATTRIBUTES = 10;
    // Queue guts
    private int _head;
    private int _tail;
    private Object[] _backingStore;
    private boolean _suspended;
    private boolean _queueFull;
    private boolean _queueEmpty;
    private ArrayList _suppliers = new ArrayList();
    private ArrayList _consumers = new ArrayList();
    private boolean _endOfInput;
    // Metrics
    private long _addWaitTime;
    private long _removeWaitTime;
    private long _numberOfItemsProcessed;
    private MBeanInfo _MBeanInfo;

    /**
     * Creates a queue with the default capacity.
     */
    public Queue () {
        this(DEFAULT_QUEUE_SIZE);
    }

    /**
     * Creates a queue with the given capacity.
     *
     * @param queueSize maximum number of items the queue may hold; a value
     *        that is not positive selects the default capacity instead
     */
    public Queue (int queueSize) {
        super();
        // Create the backing store. If the specified queue size is
        // bogus, use the default queue size
        _backingStore = new Object[(queueSize > 0) ? queueSize : DEFAULT_QUEUE_SIZE];
        _queueEmpty = true;
        exposeManagementInterface();
    }

    /**
     * Registers the calling thread, by thread name, as a supplier.
     */
    public synchronized void addSupplier () {
        String supplierName = Thread.currentThread().getName();
        if (isTraceOn()) {
            System.out.println("Queue.addSupplier(): INFO: " + "Adding supplier \'"
                    + supplierName + "\'.");
        }
        _suppliers.add(supplierName);
    }

    /**
     * Unregisters the calling thread (first entry matching its thread name)
     * as a supplier. When no suppliers remain, end-of-input is flagged.
     * Waiting threads are always woken so that they can observe the change.
     */
    public synchronized void removeSupplier () {
        String supplier = Thread.currentThread().getName();
        if (isTraceOn()) {
            System.out.println("Queue.removeSupplier(): INFO: " + "Looking for supplier \'"
                    + supplier + "\' to remove it.");
        }
        // ArrayList.remove(Object) removes the first equal entry, if any
        if (_suppliers.remove(supplier)) {
            if (isTraceOn()) {
                System.out.println("Queue.removeSupplier(): INFO: " + "Removing supplier \'"
                        + supplier + "\'.");
            }
        }
        if (_suppliers.size() == 0) {
            _endOfInput = true;
        }
        notifyAll();
    }

    /**
     * Registers the calling thread, by thread name, as a consumer.
     */
    public synchronized void addConsumer () {
        String consumerName = Thread.currentThread().getName();
        if (isTraceOn()) {
            System.out.println("Queue.addConsumer(): INFO: " + "Adding consumer \'"
                    + consumerName + "\'.");
        }
        _consumers.add(consumerName);
    }

    /**
     * Unregisters the calling thread (first entry matching its thread name)
     * as a consumer. When no consumers remain, end-of-input is flagged.
     * Waiting threads are always woken so that they can observe the change.
     */
    public synchronized void removeConsumer () {
        String consumer = Thread.currentThread().getName();
        if (isTraceOn()) {
            System.out.println("Queue.removeConsumer(): INFO: " + "Looking for consumer \'"
                    + consumer + "\' to remove it.");
        }
        if (_consumers.remove(consumer)) {
            if (isTraceOn()) {
                System.out.println("Queue.removeConsumer(): INFO: " + "Removing consumer \'"
                        + consumer + "\'.");
            }
        }
        // NOTE(review): flagging end-of-input when the last *consumer* leaves
        // does not match the EndOfInput attribute description ("signalled by
        // all suppliers"); kept as-is in case callers depend on it - confirm.
        if (_consumers.size() == 0) {
            _endOfInput = true;
        }
        notifyAll();
    }

    /**
     * Appends an item to the queue, blocking while the queue is suspended or
     * full. Time spent blocked is added to the AddWaitTime metric.
     *
     * <p>If the calling thread is interrupted while blocked, the method keeps
     * waiting and restores the thread's interrupt status before returning.
     *
     * @param item the item to enqueue
     */
    public synchronized void add (Object item) {
        boolean interrupted = false;
        long addWaitTime = 0;
        while (_suspended || _tail == TAIL_FULL) {
            long waitStart = System.currentTimeMillis();
            try {
                wait();
            } catch (InterruptedException e) {
                // Remember the interrupt; re-asserting it here would make
                // every subsequent wait() throw immediately.
                interrupted = true;
            }
            addWaitTime += (System.currentTimeMillis() - waitStart);
        }
        _addWaitTime += addWaitTime;
        // add the item to the queue's backing store
        _backingStore[_tail] = item;
        _queueEmpty = false;
        _tail++;
        if (_tail >= _backingStore.length) {
            _tail = 0;                  // wrap
        }
        if (_tail == _head) {
            _tail = TAIL_FULL;          // special case, we're full
        }
        _queueFull = (_tail == TAIL_FULL);
        notifyAll();
        if (interrupted) {
            Thread.currentThread().interrupt();
        }
    }

    /**
     * Removes the item at the head of the queue, waiting indefinitely for
     * one to become available.
     *
     * @return the removed item, or <code>null</code> if end-of-input has been
     *         flagged and the queue is empty
     */
    public synchronized Object remove () {
        return  (remove(WAIT_FOREVER));
    }

    /**
     * Removes the item at the head of the queue, blocking while the queue is
     * suspended or empty until an item is available, end-of-input is flagged,
     * or the timeout elapses. Time spent blocked is added to the
     * RemoveWaitTime metric.
     *
     * <p>Items still in the queue when end-of-input is flagged are returned
     * by subsequent calls; <code>null</code> is returned only once the queue
     * is empty. If the calling thread is interrupted while blocked, the
     * method keeps waiting and restores the interrupt status on return.
     *
     * @param timeout maximum total time to wait, in milliseconds; a negative
     *        value waits indefinitely, zero does not wait at all
     * @return the removed item, or <code>null</code> if the wait timed out or
     *         end-of-input was flagged with nothing left to remove
     */
    public synchronized Object remove (long timeout) {
        boolean waitForever = (timeout < 0);
        boolean timedOut = false;
        boolean interrupted = false;
        long callStart = System.currentTimeMillis();
        while ((_suspended || _tail == _head) && !_endOfInput) {
            long waitStart = System.currentTimeMillis();
            long remaining = 0;
            if (!waitForever) {
                // Measure against the start of the call so that wake-ups
                // caused by unrelated notifications do not reset the clock.
                remaining = timeout - (waitStart - callStart);
                if (remaining <= 0) {
                    timedOut = true;
                    break;
                }
            }
            try {
                if (waitForever) {
                    wait();
                } else {
                    wait(remaining);    // milliseconds; always > 0 here
                }
            } catch (InterruptedException e) {
                interrupted = true;
            }
            _removeWaitTime += (System.currentTimeMillis() - waitStart);
        }
        Object ret = null;
        // Only touch the ring buffer when there really is an item to take
        if (!timedOut && _tail != _head) {
            ret = _backingStore[_head];
            _backingStore[_head] = null;
            _numberOfItemsProcessed++;
            if (_tail == TAIL_FULL) {
                _tail = _head;          // point tail to new free spot
            }
            _head++;
            if (_head >= _backingStore.length) {
                _head = 0;              // wrap head pointer
            }
            _queueFull = false;
            _queueEmpty = (_head == _tail);
        }
        notifyAll();
        if (interrupted) {
            Thread.currentThread().interrupt();
        }
        return  ret;
    }

    /**
     * @return the capacity of the queue (length of the backing store)
     */
    public synchronized int getQueueSize () {
        return  _backingStore.length;
    }

    /**
     * Grows the capacity of the queue. The request is ignored if the queue
     * is suspended or if the new size is not larger than the current one.
     * Queued items keep their order; waiting threads are woken afterwards.
     *
     * @param queueSize the requested capacity
     */
    public synchronized void setQueueSize (int queueSize) {
        // Can't set queue size on a suspended queue, and the queue is
        // only allowed to grow, not shrink...
        if (!_suspended && queueSize > _backingStore.length) {
            int oldLength = _backingStore.length;
            int count = (_tail == TAIL_FULL)
                    ? oldLength
                    : (_tail - _head + oldLength) % oldLength;
            Object[] newStore = new Object[queueSize];
            // Unwrap the ring: copy head..end first, then the wrapped part,
            // so that the oldest item lands at index 0 of the new store.
            int firstRun = Math.min(count, oldLength - _head);
            System.arraycopy(_backingStore, _head, newStore, 0, firstRun);
            System.arraycopy(_backingStore, 0, newStore, firstRun, count - firstRun);
            _backingStore = newStore;
            _head = 0;
            _tail = count;              // count < queueSize, so this is a free slot
            _queueFull = false;
        }
        notifyAll();
    }

    /**
     * @return the number of items removed from the queue so far
     */
    public synchronized long getNumberOfItemsProcessed () {
        return  _numberOfItemsProcessed;
    }

    /**
     * @param value new value for the items-processed counter
     */
    public synchronized void setNumberOfItemsProcessed (long value) {
        _numberOfItemsProcessed = value;
    }

    /**
     * @return milliseconds spent blocked in {@link #add(Object)}
     */
    public synchronized long getAddWaitTime () {
        return  _addWaitTime;
    }

    /**
     * @param value new value for the add-wait-time metric, in milliseconds
     */
    public synchronized void setAddWaitTime (long value) {
        _addWaitTime = value;
    }

    /**
     * @return milliseconds spent blocked in {@link #remove(long)}
     */
    public synchronized long getRemoveWaitTime () {
        return  _removeWaitTime;
    }

    /**
     * @param value new value for the remove-wait-time metric, in milliseconds
     */
    public synchronized void setRemoveWaitTime (long value) {
        _removeWaitTime = value;
    }

    /**
     * Returns the array currently backing the queue. This is the internal
     * array itself, not a copy; it is replaced when the queue grows.
     *
     * @return the backing store
     */
    public synchronized Object[] getBackingStore () {
        return  _backingStore;
    }

    /**
     * @return <code>true</code> if every slot of the queue is in use
     */
    public synchronized boolean isQueueFull () {
        return  _queueFull;
    }

    /**
     * @return <code>true</code> if the queue holds no items
     */
    public synchronized boolean isQueueEmpty () {
        return  _queueEmpty;
    }

    /**
     * @return <code>true</code> if the queue is currently suspended
     */
    public synchronized boolean isSuspended () {
        return  _suspended;
    }

    /**
     * @return <code>true</code> once end-of-input has been flagged
     */
    public synchronized boolean isEndOfInput () {
        return  _endOfInput;
    }

    /**
     * @return the number of registered suppliers
     */
    public synchronized int getNumberOfSuppliers () {
        return  _suppliers.size();
    }

    /**
     * @return the number of registered consumers
     */
    public synchronized int getNumberOfConsumers () {
        return  _consumers.size();
    }

    /**
     * Suspends the queue: subsequent add and remove calls block until
     * {@link #resume()} is called.
     */
    public synchronized void suspend () {
        _suspended = true;
    }

    /**
     * Resumes a suspended queue and wakes all waiting threads.
     */
    public synchronized void resume () {
        _suspended = false;
        notifyAll();
    }

    /**
     * Zeroes the queue metrics and increments the parent's reset counter.
     */
    public synchronized void reset () {
        setNumberOfItemsProcessed(0);
        setAddWaitTime(0);
        setRemoveWaitTime(0);
        setNumberOfResets(getNumberOfResets() + 1);
    }

    /**
     * Builds the MBeanInfo describing this queue: the parent's attributes
     * followed by the queue's own, the int-argument constructor, and the
     * suspend/resume operations.
     */
    private void exposeManagementInterface () {
        MBeanInfo parentInfo = super.getMBeanInfo();
        //
        // Attributes
        //
        MBeanAttributeInfo[] parentAttributes = parentInfo.getAttributes();
        int numberOfParentAttributes = parentAttributes.length;
        MBeanAttributeInfo[] attributeInfo = new MBeanAttributeInfo[numberOfParentAttributes
                + NUMBER_OF_QUEUE_ATTRIBUTES];
        System.arraycopy(parentAttributes, 0, attributeInfo, 0, numberOfParentAttributes);
        int next = numberOfParentAttributes;
        attributeInfo[next++] = new MBeanAttributeInfo("QueueSize",
                Integer.TYPE.getName(), "Maximum number of items the queue may contain at one time.",
                true, true, false);
        attributeInfo[next++] = new MBeanAttributeInfo("NumberOfConsumers",
                Integer.TYPE.getName(), "The number of consumers pulling from this Queue.",
                true, false, false);
        attributeInfo[next++] = new MBeanAttributeInfo("NumberOfSuppliers",
                Integer.TYPE.getName(), "The number of suppliers supplying to this Queue.",
                true, false, false);
        attributeInfo[next++] = new MBeanAttributeInfo("QueueFull",
                Boolean.TYPE.getName(), "Indicates whether or not the Queue is full.",
                true, false, true);
        attributeInfo[next++] = new MBeanAttributeInfo("QueueEmpty",
                Boolean.TYPE.getName(), "Indicates whether or not the Queue is empty.",
                true, false, true);
        attributeInfo[next++] = new MBeanAttributeInfo("Suspended",
                Boolean.TYPE.getName(), "Indicates whether or not the Queue is currently suspended.",
                true, false, true);
        attributeInfo[next++] = new MBeanAttributeInfo("EndOfInput",
                Boolean.TYPE.getName(), "Indicates whether or not the end-of-input has been signalled by all suppliers.",
                true, false, true);
        attributeInfo[next++] = new MBeanAttributeInfo("NumberOfItemsProcessed",
                Long.TYPE.getName(), "The number of items that have been removed from the Queue.",
                true, false, false);
        attributeInfo[next++] = new MBeanAttributeInfo("AddWaitTime",
                Long.TYPE.getName(), "The number of milliseconds spent waiting to add because the Queue was full.",
                true, false, false);
        attributeInfo[next++] = new MBeanAttributeInfo("RemoveWaitTime",
                Long.TYPE.getName(), "The number of milliseconds spent waiting to remove because the Queue was empty.",
                true, false, false);
        //
        // Constructors
        //
        Class[] signature =  {
            Integer.TYPE
        };
        MBeanConstructorInfo[] constructorInfo;
        try {
            Constructor constructor = this.getClass().getConstructor(signature);
            constructorInfo = new MBeanConstructorInfo[] {
                new MBeanConstructorInfo("Custom constructor", constructor)
            };
        } catch (NoSuchMethodException e) {
            // Expose no constructors rather than an array holding null
            e.printStackTrace();
            constructorInfo = new MBeanConstructorInfo[0];
        } catch (SecurityException e) {
            e.printStackTrace();
            constructorInfo = new MBeanConstructorInfo[0];
        }
        //
        // Operations
        //
        MBeanParameterInfo[] parms = new MBeanParameterInfo[0];
        MBeanOperationInfo[] operationInfo = new MBeanOperationInfo[] {
            new MBeanOperationInfo("suspend", "Suspends processing of the Queue.",
                    parms, Void.TYPE.getName(), MBeanOperationInfo.ACTION),
            new MBeanOperationInfo("resume", "Resumes processing of the Queue.",
                    parms, Void.TYPE.getName(), MBeanOperationInfo.ACTION)
        };
        //
        // Notifications
        //
        MBeanNotificationInfo[] notificationInfo = new MBeanNotificationInfo[0];
        //
        // MBeanInfo
        //
        _MBeanInfo = new MBeanInfo("Queue", "Queue MBean", attributeInfo, constructorInfo,
                operationInfo, notificationInfo);
    }

    // DynamicMBean Implementation

    /**
     * Returns the value of the named attribute. Attributes defined by this
     * class are answered here; any other name is delegated to the parent.
     *
     * @param attributeName name of the attribute to read
     * @return the attribute value, wrapped in the matching wrapper type
     * @exception AttributeNotFoundException propagated from the parent
     * @exception MBeanException propagated from the parent
     * @exception ReflectionException propagated from the parent
     * @exception RuntimeOperationsException if attributeName is null
     */
    public Object getAttribute (String attributeName) throws AttributeNotFoundException,
            MBeanException, ReflectionException {
        if (attributeName == null) {
            throw new RuntimeOperationsException(
                    new IllegalArgumentException("Attribute name cannot be null"),
                    "Cannot call getAttribute with a null attribute name");
        }
        if (attributeName.equals("QueueSize")) {
            return  new Integer(getQueueSize());
        }
        if (attributeName.equals("NumberOfSuppliers")) {
            return  new Integer(getNumberOfSuppliers());
        }
        if (attributeName.equals("NumberOfConsumers")) {
            return  new Integer(getNumberOfConsumers());
        }
        if (attributeName.equals("QueueFull")) {
            return  new Boolean(isQueueFull());
        }
        if (attributeName.equals("QueueEmpty")) {
            return  new Boolean(isQueueEmpty());
        }
        if (attributeName.equals("Suspended")) {
            return  new Boolean(isSuspended());
        }
        if (attributeName.equals("EndOfInput")) {
            return  new Boolean(isEndOfInput());
        }
        if (attributeName.equals("NumberOfItemsProcessed")) {
            return  new Long(getNumberOfItemsProcessed());
        }
        if (attributeName.equals("AddWaitTime")) {
            return  new Long(getAddWaitTime());
        }
        if (attributeName.equals("RemoveWaitTime")) {
            return  new Long(getRemoveWaitTime());
        }
        // Not one of ours, let the parent class answer
        return  super.getAttribute(attributeName);
    }

    /**
     * Sets the value of an attribute. <code>QueueSize</code> is handled
     * here; any other attribute is delegated to the parent.
     *
     * @param attribute name and new value of the attribute to set
     * @exception AttributeNotFoundException propagated from the parent
     * @exception InvalidAttributeValueException if the QueueSize value is
     *            not an Integer, or propagated from the parent
     * @exception MBeanException propagated from the parent
     * @exception ReflectionException propagated from the parent
     * @exception RuntimeOperationsException if attribute is null
     */
    public void setAttribute (Attribute attribute) throws AttributeNotFoundException,
            InvalidAttributeValueException, MBeanException, ReflectionException {
        if (attribute == null) {
            throw new RuntimeOperationsException(
                    new IllegalArgumentException("Attribute cannot be null"),
                    "Cannot call setAttribute with a null attribute");
        }
        String name = attribute.getName();
        Object value = attribute.getValue();
        if ("QueueSize".equals(name)) {
            if (!(value instanceof Integer)) {
                throw new InvalidAttributeValueException(
                        "QueueSize requires an Integer value, got: " + value);
            }
            setQueueSize(((Integer)value).intValue());
        }
        else {
            // Only forward attributes this class does not handle itself;
            // forwarding QueueSize as well would hand the parent a name
            // it is not expected to know.
            super.setAttribute(attribute);
        }
    }

    /**
     * Reads several attributes at once. Attributes that cannot be read are
     * reported on standard error and left out of the result.
     *
     * @param attributeNames names of the attributes to read
     * @return the attributes that could be read, in request order
     * @exception RuntimeOperationsException if attributeNames is null
     */
    public AttributeList getAttributes (String[] attributeNames) {
        if (attributeNames == null) {
            throw new RuntimeOperationsException(
                    new IllegalArgumentException("attributeNames[] cannot be null"),
                    "Cannot call getAttributes with a null attribute name array");
        }
        AttributeList resultList = new AttributeList();
        for (int aa = 0; aa < attributeNames.length; aa++) {
            try {
                Object value = getAttribute(attributeNames[aa]);
                resultList.add(new Attribute(attributeNames[aa], value));
            } catch (Exception e) {
                e.printStackTrace();
            }
        }
        return  (resultList);
    }

    /**
     * Sets several attributes at once. Attributes that cannot be set are
     * reported on standard error and left out of the result.
     *
     * @param attributes the attributes to set
     * @return the attributes that were set, with their current values
     * @exception RuntimeOperationsException if attributes is null
     */
    public AttributeList setAttributes (AttributeList attributes) {
        if (attributes == null) {
            throw new RuntimeOperationsException(
                    new IllegalArgumentException("AttributeList attributes cannot be null"),
                    "Cannot call setAttributes with a null attribute list");
        }
        AttributeList resultList = new AttributeList();
        for (int aa = 0; aa < attributes.size(); aa++) {
            try {
                Attribute attribute = (Attribute)attributes.get(aa);
                setAttribute(attribute);
                // Read the value back so the caller sees what is in effect
                String name = attribute.getName();
                resultList.add(new Attribute(name, getAttribute(name)));
            } catch (Exception e) {
                e.printStackTrace();
            }
        }
        return  resultList;
    }

    /**
     * Invokes a management operation. <code>suspend</code> and
     * <code>resume</code> are handled here; any other operation is delegated
     * to the parent and its result returned.
     *
     * @param operationName name of the operation to invoke
     * @param params operation arguments
     * @param signature operation parameter types
     * @return <code>Void.TYPE</code> for suspend and resume, otherwise
     *         whatever the parent returns
     * @exception MBeanException propagated from the parent
     * @exception ReflectionException propagated from the parent
     * @exception RuntimeOperationsException if operationName is null
     */
    public Object invoke (String operationName, Object params[], String signature[]) throws MBeanException,
            ReflectionException {
        if (operationName == null) {
            throw new RuntimeOperationsException(
                    new IllegalArgumentException("Operation name cannot be null"),
                    "Cannot call invoke with a null operation name");
        }
        if (operationName.equals("suspend")) {
            suspend();
            return  Void.TYPE;
        }
        if (operationName.equals("resume")) {
            resume();
            return  Void.TYPE;
        }
        return  super.invoke(operationName, params, signature);
    }

    /**
     * @return the management interface description built at construction
     */
    public MBeanInfo getMBeanInfo () {
        return  (_MBeanInfo);
    }
}