FileDocCategorySizeDatePackage
ConsumerQProcessor.javaAPI DocGlassfish v2 API8451Fri May 04 22:24:18 BST 2007com.sun.enterprise.admin.monitor.callflow

ConsumerQProcessor

public class ConsumerQProcessor extends Object implements Runnable
todo
Add logging code
todo
replace this thread by the ORB Thread Pool Thread
author
Harpreet Singh

Fields Summary
BlockingQueue
consumerQ
private DbAccessObject
dbAccessObject
boolean
processQ
private int
sleepTime
private static final Logger
logger
private static final String
CONSUMER_SLEEP_TIME
boolean
traceOn
private HandlerChain
chain
private long
numOfQueuesProcessed
private long
numOfTimesQSlept
Constructors Summary
public ConsumerQProcessor(BlockingQueue consumerQ, HandlerChain chain)
Creates a new instance of ConsumerQProcessor

               
       
              
        this.consumerQ = consumerQ;
        this.chain = chain;
        dbAccessObject = DbAccessObjectImpl.getInstance();
        traceOn = TraceOnHelper.isTraceOn();
        sleepTime = Math.abs(Integer.getInteger(CONSUMER_SLEEP_TIME, 5000));
    
Methods Summary
voidconsume()

        int noOfQueues = consumerQ.size ();
        numOfQueuesProcessed += noOfQueues;
        if (traceOn){
            logger.log(Level.INFO, "Callflow: ConsumerQProcessor.consume. "+
                    "ConsumerQ.length ="+ noOfQueues + ". Total number of " +
                    " Q's Processed = "+ numOfQueuesProcessed);
        }

        for (int i = 0; i<noOfQueues; i++){
            CallflowProducerQueue q =  (CallflowProducerQueue) consumerQ.poll();
            if (q != null){
                if (traceOn){
                    logger.log(Level.INFO, "Callflow: ConsumerQProcessor.processProducerQ "+
                            q.getName());
                }
                TransferObject[] to = q.getAndRemoveAll();
                if (to != null){
                    Handler[] handler = chain.getHandlers();
                    for (int j=0;j<handler.length;j++ ){
                        if (traceOn){
                            logger.log(Level.INFO, "Callflow: ConsumerQProcessor.processProducerQ "+
                                    handler[j]);
                        }
                        handler[j].handle(to);
                    }
                }
            }
        }
   
public java.lang.StringgetCONSUMER_SLEEP_TIME()

        return CONSUMER_SLEEP_TIME;
    
public longgetNumOfQueuesProcessed()

        return numOfQueuesProcessed;
    
public longgetNumOfTimesQSlept()

        return numOfTimesQSlept;
    
public intgetSleepTime()
consumes the ConsumerQ. Returns the number of Q's processed

        return sleepTime;
    
public voidrun()

        if (traceOn)
            logger.log(Level.INFO, "Callflow: ConsumerQProcessor.entering run ");

        while (processQ){
            if(!consumerQ.isEmpty()){
                consume ();
            } else{
                try {
                    if (traceOn){
                        logger.log(Level.INFO, "Callflow: ConsumerQProcessor." +
                                "Q Empty, sleeping for "+ getSleepTime () + 
                                " ms. Sleeping for the "+getNumOfTimesQSlept()
                                + " th time.");
                    }
                    this.numOfTimesQSlept++;
                    Thread.sleep(getSleepTime());
                } catch (InterruptedException e){
                    if (processQ == false){
                        // do not quit abruptly. Process any items remaining in
                        // queues before quitting.
                        if (traceOn)
                            logger.log(Level.INFO, "Callflow: ConsumerQProcessor." +
                                "Disable Called, forcing consumption");

                        break;
                    }
                }
            }
            
        }
        // force consumption
        consume();
        if (traceOn){
            logger.log(Level.INFO, "Callflow: ConsumerQProcessor.exiting run. "+
                    " Num of Entries Processed for each table \n"+
                    this.dbAccessObject.getNumOfRequestsProcessedAsString()
                    +" \n");
            
        }
    
public voidsetSleepTime(int sleepTime)

        this.sleepTime = sleepTime;
    
public voidstopConsumerThread()

        processQ = false;
        if (traceOn){
            logger.log (Level.INFO, "Callflow: ConsumerQProcessor.stopConsumerThread ");
        }