FileDocCategorySizeDatePackage
Queue.javaAPI DocExample7844Thu May 23 09:32:50 BST 2002 sample.standard

Queue.java

package  sample.standard;

import  java.util.ArrayList;

/**
 * A very simple queue.
 * Implemented as a standard MBean
 */
public class Queue extends Basic implements QueueMBean {
    
    private static final int DEFAULT_QUEUE_SIZE = 10;
    private static final int WAIT_FOREVER = -1;
    // Queue guts
    private int _head;
    private int _tail;
    private Object[] _backingStore;
    private boolean _suspended;
    private boolean _queueFull;
    private boolean _queueEmpty;
    private ArrayList _suppliers = new ArrayList();
    private ArrayList _consumers = new ArrayList();
    private boolean _endOfInput;
    // Metrics
    private long _addWaitTime;
    private long _removeWaitTime;
    private long _numberOfItemsProcessed;

    public Queue () {
        this(DEFAULT_QUEUE_SIZE);
    }
    public Queue (int queueSize) {
        // Create the backing store. If the specified queue size is
        /// bogus, use the default queue size
        _backingStore = new Object[(queueSize > 0) ? queueSize : DEFAULT_QUEUE_SIZE];
        _queueEmpty = true;
    }

    public int getQueueSize () {
        return  _backingStore.length;
    }
    public synchronized void setQueueSize (int queueSize) {
        // Can't set queue size on a suspended queue
        if (!_suspended) {
            // Only allow the queue to grow, not shrink...
            if (queueSize > _backingStore.length) {
                // allocate new array
                Object[] newStore = new Object[queueSize];
                System.arraycopy(_backingStore, 0, newStore, 0, _backingStore.length);
            }
        }
        notifyAll();
    }


    public long getNumberOfItemsProcessed () {
        return  _numberOfItemsProcessed;
    }
    public void setNumberOfItemsProcessed (long value) {
        _numberOfItemsProcessed = value;
    }

    public long getAddWaitTime () {
        return  _addWaitTime;
    }
    public void setAddWaitTime (long value) {
        _addWaitTime = value;
    }

    public long getRemoveWaitTime () {
        return  _removeWaitTime;
    }
    public void setRemoveWaitTime (long value) {
        _removeWaitTime = value;
    }

    public Object[] getBackingStore () {
        return  _backingStore;
    }

    public boolean isQueueFull () {
        return  _queueFull;
    }

    public boolean isQueueEmpty () {
        return  _queueEmpty;
    }

    public boolean isSuspended () {
        return  _suspended;
    }

    public boolean isEndOfInput () {
        return  _endOfInput;
    }

    public int getNumberOfSuppliers () {
        return  _suppliers.size();
    }

    public int getNumberOfConsumers () {
        return  _consumers.size();
    }

    public synchronized void suspend () {
        _suspended = true;
    }

    public synchronized void resume () {
        _suspended = false;
        notifyAll();
    }

    public void reset () {
        setNumberOfItemsProcessed(0);
        setAddWaitTime(0);
        setRemoveWaitTime(0);
        setNumberOfResets(getNumberOfResets() + 1);
    }
    public synchronized void addSupplier () {
        String supplierName = Thread.currentThread().getName();
        if (isTraceOn())
            System.out.println("Queue.addSupplier(): INFO: " + "Adding supplier \'"
                    + supplierName + "\'.");
        _suppliers.add(supplierName);
    }

    public synchronized void removeSupplier () {
        String supplier = Thread.currentThread().getName();
        if (isTraceOn())
            System.out.println("Queue.removeSupplier(): INFO: " + "Looking for supplier \'"
                    + supplier + "\' to remove it.");
        for (int aa = 0; aa < _suppliers.size(); aa++) {
            String s = (String)_suppliers.get(aa);
            if (supplier.equals(s)) {
                if (isTraceOn())
                    System.out.println("Queue.removeSupplier(): INFO: " + "Removing supplier \'"
                            + s + "\'.");
                _suppliers.remove(aa);
                break;
            }
        }
        if (_suppliers.size() == 0)
            _endOfInput = true;
        notifyAll();
    }

    public synchronized void addConsumer () {
        String consumerName = Thread.currentThread().getName();
        if (isTraceOn())
            System.out.println("Queue.addConsumer(): INFO: " + "Adding consumer \'"
                    + consumerName + "\'.");
        _consumers.add(consumerName);
    }

    public synchronized void removeConsumer () {
        String consumer = Thread.currentThread().getName();
        if (isTraceOn())
            System.out.println("Queue.removeConsumer(): INFO: " + "Looking for consumer \'"
                    + consumer + "\' to remove it.");
        for (int aa = 0; aa < _consumers.size(); aa++) {
            String s = (String)_consumers.get(aa);
            if (consumer.equals(s)) {
                if (isTraceOn())
                    System.out.println("Queue.removeConsumer(): INFO: " + "Removing consumer \'"
                            + s + "\'.");
                _consumers.remove(aa);
                break;
            }
        }
        notifyAll();
    }

    public synchronized void add (Object item) {
        long addWaitTime = 0;
        while (_suspended || _tail == -1) {
            long waitStart = System.currentTimeMillis();
            try {
                wait();
            } catch (InterruptedException e) {}
            addWaitTime += (System.currentTimeMillis() - waitStart);
        }
        _addWaitTime += addWaitTime;
        // add the item to the queue's backing store
        _backingStore[_tail] = item;
        _queueEmpty = false;
        _tail++;
        if (_tail >= _backingStore.length)
            _tail = 0;          // wrap
        if (_tail == _head) {
            _tail = -1;         // special case, we're full
        }
        _queueFull = (_tail == -1) ? true : false;
        notifyAll();
    }

    public synchronized Object remove () {
        return  (remove(WAIT_FOREVER));
    }

    public synchronized Object remove (long timeout) {
        boolean timedOut = false;
        Object ret = null;
        long removeWaitTime = 0;
        if (!(_queueEmpty && _endOfInput)) {
            while (_suspended || _queueEmpty) {
                long waitStart = System.currentTimeMillis();
                try {
                    if (timeout == WAIT_FOREVER)
                        wait(); 
                    else 
                        wait(timeout/1000);
                } catch (InterruptedException e) {}
                if (_endOfInput) {
                    break;
                }
                if (timeout != WAIT_FOREVER && (System.currentTimeMillis() - 
                        waitStart) > timeout) {
                    timedOut = true;
                    break;
                }
                removeWaitTime += System.currentTimeMillis() - waitStart;
            }
            if (!timedOut && !_queueEmpty) {
                _removeWaitTime += removeWaitTime;
                ret = _backingStore[_head];
                _queueFull = false;
                _numberOfItemsProcessed++;
                _backingStore[_head] = null;
                if (_tail == -1)
                    _tail = _head;              // point tail to new free spot
                _head++;
                if (_head >= _backingStore.length)
                    _head = 0;                  // wrap head pointer
                _queueEmpty = (_head == _tail) ? true : false;
            }
        }
        notifyAll();
        return  ret;
    }

}