File | Doc | Category | Size | Date | Package
CallflowProducerQueue.java | API Doc | Glassfish v2 API | 7886 | Fri May 04 22:24:18 BST 2007 | com.sun.enterprise.admin.monitor.callflow

CallflowProducerQueue

public class CallflowProducerQueue extends Object
This is a ProducerQ that is populated by the Agent. A separate asynchronous thread is responsible for consuming the items produced in this queue. A CallflowProducerQueue is created for each callflow table in the database, i.e. the RequestStart, RequestEnd, MethodStart, MethodEnd, StartTime and EndTime tables. An instance of the ConsumerQ is passed to this object on creation. The producerQ contains TransferObjects that are eventually written to the database.
todo
Make this class JMX capable.
author
Harpreet Singh

Fields Summary
private static final Logger
logger
boolean
traceOn
int
qSize
private String
name
double
THRESHOLD_PERCENTAGE
Consumer notified when threshold is reached. Default is 80%.
long
threshold
AtomicInteger
CURRENT_SIZE
AtomicBoolean
NOTIFIED_CONSUMER
AtomicLong
entriesProcessed
ConcurrentLinkedQueue
producerQ
Contains TransferObjects that are written to the database.
private BlockingQueue
consumerQ
ConsumerQueue. Producer pushes TransferObjects to ConsumerQ and notifies the consumer thread to process the queue.
Constructors Summary
private CallflowProducerQueue(BlockingQueue consumerQ, String name, int qSize)
Creates a new instance of CallflowProducerQueue

        // Keep the queue used to hand this producer over to the consumer,
        // together with this queue's name and configured size.
        this.consumerQ = consumerQ;
        this.name = name;
        this.qSize = qSize;
        // Derive the notification threshold from the size just assigned, the
        // same way setQSize() does; otherwise the threshold would not reflect
        // the qSize passed to this constructor until setQSize() is called.
        calculateThreshold();
        traceOn = TraceOnHelper.isTraceOn();
    
Methods Summary
public voidadd(TransferObject to)
Adds a TransferObject to the producerQ. Notifies the consumer once the queue size reaches or exceeds the threshold. Adds made after the threshold has been reached are still allowed.

        // Enqueue the row, then count it.
        producerQ.add(to);
        final int sizeAfterAdd = CURRENT_SIZE.incrementAndGet();
        if (traceOn) {
            final String traceMessage = "Callflow:CallflowProducerQ.add : " + name
                    + " adding row ; QSize = " + sizeAfterAdd
                    + " ThresholdSize = " + threshold;
            logger.log(Level.INFO, traceMessage);
        }
        // Ask for the queue to be drained once the threshold has been reached.
        final boolean thresholdReached = sizeAfterAdd >= threshold;
        if (thresholdReached) {
            flush();
        }
    
private voidcalculateThreshold()

        threshold = Math.round(THRESHOLD_PERCENTAGE * qSize);
    
public voidflush()
Flushes the producerQ by notifying the consumer that this queue has data to process. This is called when callflow is about to be disabled and all collected data needs to be explicitly flushed out.

        // Nothing is buffered, so there is nothing for the consumer to drain.
        if (CURRENT_SIZE.get() <= 0) {
            return;
        }

        // Claim the notification atomically BEFORE enqueueing. Setting the
        // flag only after put() returns leaves a window in which
        // getAndRemoveAll() can reset the flag first; the late set(true)
        // would then leave the flag stuck at true with nothing queued, and
        // no further notification would ever be sent. The compare-and-set
        // also keeps concurrent callers from enqueueing this producer twice.
        if (NOTIFIED_CONSUMER.compareAndSet(false, true)) {
            try {
                consumerQ.put(this);
                if (traceOn) {
                    logger.log(Level.INFO, "Callflow: CallflowProducerQ.flush:"
                            + name + " notifying ConsumerQ ");
                }
            } catch (InterruptedException ex) {
                // The producer was never enqueued: release the claim so a
                // later flush can retry, and restore the interrupt status
                // for the caller instead of silently discarding it.
                NOTIFIED_CONSUMER.set(false);
                Thread.currentThread().interrupt();
            }
        } else {
            if (traceOn) {
                // The flag was already set, so the value reported is true.
                logger.log(Level.INFO, "Callflow: CallflowProducerQ.flush : "
                        + name + " " + "ConsumerPreviouslyNotified? " + true
                        + " Not renotifying the consumer.");
            }
        }
    
public TransferObject[]getAndRemoveAll()
Empties the producerQ and returns all the TransferObjects.

        // Snapshot how many rows to drain in this call.
        final int current = CURRENT_SIZE.get();
        if (current == 0) {
            // Callers receive null (not an empty array) when nothing is queued.
            return null;
        }

        final TransferObject[] to = new TransferObject[current];
        for (int i = 0; i < current; i++) {
            to[i] = producerQ.poll();
        }

        // Subtract atomically: a separate get() followed by set() could
        // overwrite an increment made by a concurrent add() in between,
        // losing that row from the count.
        final int remaining = CURRENT_SIZE.addAndGet(-current);
        NOTIFIED_CONSUMER.set(false);
        // Credit exactly the rows drained here; re-reading CURRENT_SIZE at
        // this point would also count rows added while draining.
        entriesProcessed.addAndGet(current);
        if (traceOn) {
            logger.log(Level.INFO, "Callflow: CallflowProducerQ.getAndRemoveAll:"
                    + name + " Old Q Size = " + current + " New Q Size = "
                    + remaining + " Notified ConsumerQ reset to false."
                    + " Entries Processed so far = " + entriesProcessed.longValue());
        }
        return to;
    
public intgetCurrentSize()

        // Report the current value of the entry counter.
        return CURRENT_SIZE.get();
    
public longgetEntriesProcessed()

        // Report the running total accumulated by getAndRemoveAll().
        return entriesProcessed.get();
    
public static com.sun.enterprise.admin.monitor.callflow.CallflowProducerQueuegetInstance(java.util.concurrent.BlockingQueue consumerQ, java.lang.String name, int qSize)

    
      
              
               
        return new CallflowProducerQueue(consumerQ, name, qSize);
    
public java.lang.StringgetName()

        // Return the name currently assigned to this producer queue.
        return this.name;
    
public intgetQSize()

        // Return the configured queue size from which the threshold is derived.
        return qSize;
    
public voidsetName(java.lang.String name)

        // Replace the name of this producer queue; the name is included in
        // the trace log messages emitted by this class.
        this.name = name;
    
public voidsetQSize(int size)

        // Store the new queue size, then refresh the threshold that is
        // derived from it.
        qSize = size;
        calculateThreshold();