FileDocCategorySizeDatePackage
Queue.javaAPI DocExample14087Thu May 23 09:32:50 BST 2002 sample.notification

Queue

public class Queue extends Basic implements QueueMBean, NotificationBroadcaster, MBeanRegistration
A very simple queue, implemented as a standard MBean.

Fields Summary
private static final int
DEFAULT_QUEUE_SIZE
private int
_head
private int
_tail
private Object[]
_backingStore
private boolean
_suspended
private boolean
_queueFull
private boolean
_queueEmpty
private ArrayList
_suppliers
private ArrayList
_consumers
private boolean
_endOfInput
private long
_addWaitTime
private long
_removeWaitTime
private long
_numberOfItemsProcessed
private GenericBroadcaster
_broadcaster
private StalledQueueWatcher
_queueWatcher
private static final String
STALLED_QUEUE_FULL
private static final String
STALLED_QUEUE_EMPTY
Constructors Summary
public Queue()

        // Delegates to Queue(int), requesting the default queue size.
        this(DEFAULT_QUEUE_SIZE);
    
public Queue(int queueSize)

        // Create the backing store. If the specified queue size is
        /// bogus, use the default queue size
        _backingStore = new Object[(queueSize > 0) ? queueSize : DEFAULT_QUEUE_SIZE];
        _queueEmpty = true;
        //
        // Create a timer to monitor potential queue
        /// stalled conditions.
        //
        (new Thread(_queueWatcher)).start();
    
Methods Summary
public synchronized voidadd(java.lang.Object item)

        long addWaitTime = 0;
        while (_suspended || _tail == -1) {
            long waitStart = System.currentTimeMillis();
            try {
                wait();
            } catch (InterruptedException e) {}
            addWaitTime += (System.currentTimeMillis() - waitStart);
        }
        _addWaitTime += addWaitTime;
        // add the item to the queue's backing store
        _backingStore[_tail] = item;
        _queueWatcher.reset();
        _queueEmpty = false;
        _tail++;
        if (_tail >= _backingStore.length)
            _tail = 0;          // wrap
        if (_tail == _head) {
            _tail = -1;         // special case, we're full
        }
        _queueFull = (_tail == -1) ? true : false;
        //    if (_queueFull)
        //      trace("Queue is now full.");
        notifyAll();
    
public synchronized voidaddConsumer()

        String consumerName = Thread.currentThread().getName();
        trace("Queue.addConsumer(): INFO: " + "Adding consumer \'" + consumerName
                + "\'.");
        _consumers.add(consumerName);
    
public voidaddNotificationListener(javax.management.NotificationListener listener, javax.management.NotificationFilter filter, java.lang.Object handback)
Adds a listener to a registered MBean.

param
listener The listener object which will handle the notifications emitted by the registered MBean.
param
filter The filter object. If filter is null, no filtering will be performed before handling notifications.
param
handback An opaque object to be sent back to the listener when a notification is emitted. This object cannot be used by the Notification broadcaster object. It should be resent unchanged with the notification to the listener.
exception
IllegalArgumentException Listener parameter is null.


    //
    // Implement NotificationBroadcaster interface via containment
    //
                                                                                            
           
                
        // Hand the registration to the contained broadcaster unchanged.
        this._broadcaster.addNotificationListener(listener, filter, handback);
    
public synchronized voidaddSupplier()

        String supplierName = Thread.currentThread().getName();
        trace("Queue.addSupplier(): INFO: " + "Adding supplier \'" + supplierName
                + "\'.");
        _suppliers.add(supplierName);
    
public longgetAddWaitTime()

        // Accumulated milliseconds that add() calls have spent waiting.
        return this._addWaitTime;
    
public java.lang.Object[]getBackingStore()

        // Returns the internal array itself, not a copy.
        return this._backingStore;
    
public javax.management.MBeanNotificationInfo[]getNotificationInfo()
Returns an array of MBeanNotificationInfo objects containing the name of the Java class of the notification and the notification types sent.

        // Delegates to the contained broadcaster.
        return this._broadcaster.getNotificationInfo();
    
public intgetNumberOfConsumers()

        // Number of consumer names currently registered.
        return this._consumers.size();
    
public longgetNumberOfItemsProcessed()

        // Counter incremented each time remove() takes an item.
        return this._numberOfItemsProcessed;
    
public intgetNumberOfSuppliers()

        // Number of supplier names currently registered.
        return this._suppliers.size();
    
public intgetQueueSize()

        // The queue size is the capacity of the backing array, not the
        // number of items currently queued.
        return this._backingStore.length;
    
public longgetRemoveWaitTime()

        // Accumulated milliseconds that remove() calls have spent waiting.
        return this._removeWaitTime;
    
public booleanisEndOfInput()

        // True once removeSupplier() has left the supplier list empty.
        return this._endOfInput;
    
public booleanisQueueEmpty()

        // Reports the empty flag maintained by add() and remove().
        return this._queueEmpty;
    
public booleanisQueueFull()

        // Reports the full flag maintained by add() and remove().
        return this._queueFull;
    
public booleanisSuspended()

        // True between a call to suspend() and the next call to resume().
        return this._suspended;
    
public voidpostDeregister()

        trace("Queue.postDeregister(): INFO: Invocation complete");
    
public voidpostRegister(java.lang.Boolean registrationDone)

        trace("Queue.postRegister(): INFO: Invocation complete");
    
public voidpreDeregister()

        trace("Queue.preDeregister(): INFO: Invocation complete");
    
public javax.management.ObjectNamepreRegister(javax.management.MBeanServer server, javax.management.ObjectName name)

        ObjectName ret = name;
        if (name == null)
            ret = new ObjectName(server.getDefaultDomain() + ":" + "name=Queue,selfRegister=true");
        trace("Queue.preRegister(): INFO: Invocation complete");
        return  ret;
    
public synchronized java.lang.Objectremove()

        // Takes the item at the head of the circular buffer, blocking while
        // the queue is suspended or empty. Returns null if nothing was taken.
        Object taken = null;
        long waited = 0;
        // Nothing to wait for once the queue is drained and input has ended.
        if (!_queueEmpty || !_endOfInput) {
            while (_suspended || _queueEmpty) {
                final long began = System.currentTimeMillis();
                try {
                    wait();
                } catch (InterruptedException ignored) {
                    // interruption is ignored; conditions are re-checked below
                }
                if (_endOfInput) {
                    // stop waiting; this final wait is not added to the total
                    break;
                }
                waited += System.currentTimeMillis() - began;
            }
            if (!_queueEmpty) {
                _removeWaitTime += waited;
                taken = _backingStore[_head];
                _queueWatcher.reset();
                _queueFull = false;
                _numberOfItemsProcessed++;
                // clear the slot so the queue no longer references the item
                _backingStore[_head] = null;
                if (_tail == -1) {
                    // queue was full: the slot just freed becomes the tail
                    _tail = _head;
                }
                // advance the head, wrapping to slot 0 past the end of the array
                _head = (_head + 1 >= _backingStore.length) ? 0 : _head + 1;
                _queueEmpty = (_head == _tail);
            }
        }
        // wake any threads waiting in add() or remove()
        notifyAll();
        return taken;
    
public synchronized voidremoveConsumer()

        String consumer = Thread.currentThread().getName();
        trace("Queue.removeConsumer(): INFO: " + "Looking for consumer \'" + 
                consumer + "\' to remove it.");
        for (int aa = 0; aa < _consumers.size(); aa++) {
            String s = (String)_consumers.get(aa);
            if (consumer.equals(s)) {
                trace("Queue.removeConsumer(): INFO: " + "Removing consumer \'"
                        + s + "\'.");
                _consumers.remove(aa);
                break;
            }
        }
        notifyAll();
    
public voidremoveNotificationListener(javax.management.NotificationListener listener)
Removes a listener from a registered MBean.

param
listener The listener object which will handle the notifications emitted by the registered MBean. This method will remove all the information related to this listener.
exception
ListenerNotFoundException The listener is not registered in the MBean.

        // Hand the removal to the contained broadcaster unchanged.
        this._broadcaster.removeNotificationListener(listener);
    
public synchronized voidremoveSupplier()

        String supplier = Thread.currentThread().getName();
        trace("Queue.removeSupplier(): INFO: " + "Looking for supplier \'" + 
                supplier + "\' to remove it.");
        for (int aa = 0; aa < _suppliers.size(); aa++) {
            String s = (String)_suppliers.get(aa);
            if (supplier.equals(s)) {
                trace("Queue.removeSupplier(): INFO: " + "Removing supplier \'"
                        + s + "\'.");
                _suppliers.remove(aa);
                break;
            }
        }
        if (_suppliers.size() == 0)
            _endOfInput = true;
        notifyAll();
    
public voidreset()

        // Zero the processed-item counter and both wait-time totals, then
        // bump the reset count by one.
        setNumberOfItemsProcessed(0L);
        setAddWaitTime(0L);
        setRemoveWaitTime(0L);
        setNumberOfResets(getNumberOfResets() + 1);
    
public synchronized voidresume()

        // Clear the suspended flag and wake threads blocked in add()/remove().
        this._suspended = false;
        this.notifyAll();
    
public voidsetAddWaitTime(long value)

        // Overwrites the accumulated add-wait total.
        this._addWaitTime = value;
    
public voidsetNumberOfItemsProcessed(long value)

        // Overwrites the processed-item counter.
        this._numberOfItemsProcessed = value;
    
public synchronized voidsetQueueSize(int queueSize)

        // Can't set queue size on a suspended queue
        if (!_suspended) {
            // Only allow the queue to grow, not shrink...
            if (queueSize > _backingStore.length) {
                // allocate new array
                Object[] newStore = new Object[queueSize];
                System.arraycopy(_backingStore, 0, newStore, 0, _backingStore.length);
            }
        }
        notifyAll();
    
public voidsetRemoveWaitTime(long value)

        // Overwrites the accumulated remove-wait total.
        this._removeWaitTime = value;
    
public synchronized voidsuspend()

        // Set the suspended flag; add() and remove() wait while it is set.
        this._suspended = true;
    
private voidtrace(java.lang.String message)

        if (isTraceOn()) {
            System.out.println(Thread.currentThread().getName() + ":" + message);
        }