File | Doc | Category | Size | Date | Package
Queue.java | API Doc | Example | 7844 | Thu May 23 09:32:50 BST 2002 | sample.standard

Queue

public class Queue extends Basic implements QueueMBean
A very simple queue, implemented as a standard MBean.

Fields Summary
private static final int
DEFAULT_QUEUE_SIZE
private static final int
WAIT_FOREVER
private int
_head
private int
_tail
private Object[]
_backingStore
private boolean
_suspended
private boolean
_queueFull
private boolean
_queueEmpty
private ArrayList
_suppliers
private ArrayList
_consumers
private boolean
_endOfInput
private long
_addWaitTime
private long
_removeWaitTime
private long
_numberOfItemsProcessed
Constructors Summary
/**
 * Creates a queue whose capacity is DEFAULT_QUEUE_SIZE by delegating to
 * {@link #Queue(int)}.
 */
public Queue() {
    this(DEFAULT_QUEUE_SIZE);
}
    
public Queue(int queueSize)

        // Create the backing store. If the specified queue size is
        /// bogus, use the default queue size
        _backingStore = new Object[(queueSize > 0) ? queueSize : DEFAULT_QUEUE_SIZE];
        _queueEmpty = true;
    
Methods Summary
public synchronized voidadd(java.lang.Object item)

        long addWaitTime = 0;
        while (_suspended || _tail == -1) {
            long waitStart = System.currentTimeMillis();
            try {
                wait();
            } catch (InterruptedException e) {}
            addWaitTime += (System.currentTimeMillis() - waitStart);
        }
        _addWaitTime += addWaitTime;
        // add the item to the queue's backing store
        _backingStore[_tail] = item;
        _queueEmpty = false;
        _tail++;
        if (_tail >= _backingStore.length)
            _tail = 0;          // wrap
        if (_tail == _head) {
            _tail = -1;         // special case, we're full
        }
        _queueFull = (_tail == -1) ? true : false;
        notifyAll();
    
public synchronized voidaddConsumer()

        String consumerName = Thread.currentThread().getName();
        if (isTraceOn())
            System.out.println("Queue.addConsumer(): INFO: " + "Adding consumer \'"
                    + consumerName + "\'.");
        _consumers.add(consumerName);
    
public synchronized voidaddSupplier()

        String supplierName = Thread.currentThread().getName();
        if (isTraceOn())
            System.out.println("Queue.addSupplier(): INFO: " + "Adding supplier \'"
                    + supplierName + "\'.");
        _suppliers.add(supplierName);
    
public longgetAddWaitTime()

        return  _addWaitTime;
    
public java.lang.Object[]getBackingStore()

        return  _backingStore;
    
public intgetNumberOfConsumers()

        return  _consumers.size();
    
public longgetNumberOfItemsProcessed()

        return  _numberOfItemsProcessed;
    
public intgetNumberOfSuppliers()

        return  _suppliers.size();
    
public intgetQueueSize()

        return  _backingStore.length;
    
public longgetRemoveWaitTime()

        return  _removeWaitTime;
    
public booleanisEndOfInput()

        return  _endOfInput;
    
public booleanisQueueEmpty()

        return  _queueEmpty;
    
public booleanisQueueFull()

        return  _queueFull;
    
public booleanisSuspended()

        return  _suspended;
    
public synchronized java.lang.Objectremove()

        return  (remove(WAIT_FOREVER));
    
public synchronized java.lang.Objectremove(long timeout)

        boolean timedOut = false;
        Object ret = null;
        long removeWaitTime = 0;
        if (!(_queueEmpty && _endOfInput)) {
            while (_suspended || _queueEmpty) {
                long waitStart = System.currentTimeMillis();
                try {
                    if (timeout == WAIT_FOREVER)
                        wait(); 
                    else 
                        wait(timeout/1000);
                } catch (InterruptedException e) {}
                if (_endOfInput) {
                    break;
                }
                if (timeout != WAIT_FOREVER && (System.currentTimeMillis() - 
                        waitStart) > timeout) {
                    timedOut = true;
                    break;
                }
                removeWaitTime += System.currentTimeMillis() - waitStart;
            }
            if (!timedOut && !_queueEmpty) {
                _removeWaitTime += removeWaitTime;
                ret = _backingStore[_head];
                _queueFull = false;
                _numberOfItemsProcessed++;
                _backingStore[_head] = null;
                if (_tail == -1)
                    _tail = _head;              // point tail to new free spot
                _head++;
                if (_head >= _backingStore.length)
                    _head = 0;                  // wrap head pointer
                _queueEmpty = (_head == _tail) ? true : false;
            }
        }
        notifyAll();
        return  ret;
    
public synchronized voidremoveConsumer()

        String consumer = Thread.currentThread().getName();
        if (isTraceOn())
            System.out.println("Queue.removeConsumer(): INFO: " + "Looking for consumer \'"
                    + consumer + "\' to remove it.");
        for (int aa = 0; aa < _consumers.size(); aa++) {
            String s = (String)_consumers.get(aa);
            if (consumer.equals(s)) {
                if (isTraceOn())
                    System.out.println("Queue.removeConsumer(): INFO: " + "Removing consumer \'"
                            + s + "\'.");
                _consumers.remove(aa);
                break;
            }
        }
        notifyAll();
    
public synchronized voidremoveSupplier()

        String supplier = Thread.currentThread().getName();
        if (isTraceOn())
            System.out.println("Queue.removeSupplier(): INFO: " + "Looking for supplier \'"
                    + supplier + "\' to remove it.");
        for (int aa = 0; aa < _suppliers.size(); aa++) {
            String s = (String)_suppliers.get(aa);
            if (supplier.equals(s)) {
                if (isTraceOn())
                    System.out.println("Queue.removeSupplier(): INFO: " + "Removing supplier \'"
                            + s + "\'.");
                _suppliers.remove(aa);
                break;
            }
        }
        if (_suppliers.size() == 0)
            _endOfInput = true;
        notifyAll();
    
public voidreset()

        setNumberOfItemsProcessed(0);
        setAddWaitTime(0);
        setRemoveWaitTime(0);
        setNumberOfResets(getNumberOfResets() + 1);
    
public synchronized voidresume()

        _suspended = false;
        notifyAll();
    
public voidsetAddWaitTime(long value)

        _addWaitTime = value;
    
public voidsetNumberOfItemsProcessed(long value)

        _numberOfItemsProcessed = value;
    
public synchronized voidsetQueueSize(int queueSize)

        // Can't set queue size on a suspended queue
        if (!_suspended) {
            // Only allow the queue to grow, not shrink...
            if (queueSize > _backingStore.length) {
                // allocate new array
                Object[] newStore = new Object[queueSize];
                System.arraycopy(_backingStore, 0, newStore, 0, _backingStore.length);
            }
        }
        notifyAll();
    
public voidsetRemoveWaitTime(long value)

        _removeWaitTime = value;
    
public synchronized voidsuspend()

        _suspended = true;