FileDocCategorySizeDatePackage
AsyncHandlerProducer.javaAPI DocGlassfish v2 API13665Fri May 04 22:24:18 BST 2007com.sun.enterprise.admin.monitor.callflow

AsyncHandlerProducer

public class AsyncHandlerProducer extends Object implements AsyncHandlerIntf
A performant implementation for AsyncHandler. It holds reference to six producer queues; one for each type of callflow table.
author
Harpreet Singh

Fields Summary
private static final Logger
logger
public static final String
Q_SIZE_PROPERTY
Sets the size of the queues handling each of the tables. For the START/END time tables this can be superseded by the property named by CONTAINER_TIME_Q_SIZE_PROPERTY, since the START/END time tables fill roughly 2-3x faster than the other tables.
private int
qSize
public static final String
CONTAINER_TIME_Q_SIZE_PROPERTY
private int
containerQSize
public static AsyncHandlerProducer
_singleton
CallflowProducerQueue
rsq
CallflowProducerQueue
req
CallflowProducerQueue
msq
CallflowProducerQueue
meq
CallflowProducerQueue
stq
CallflowProducerQueue
etq
boolean
traceOn
private static final BlockingQueue
consumerQ
The ConsumerQ. This queue is populated by instances of the ProducerQ. The consumer thread goes over this Q and processes the Q to write information to the database.
HandlerChain
handlerChain
private ConsumerQProcessor
consumerThread
private Thread
consumer
boolean
isThreadInitialized
private boolean
enabled
Constructors Summary
private AsyncHandlerProducer()
Creates a new instance of AsyncHandlerProducer

        // Resolve the trace flag first: setupQSizes() checks traceOn before
        // logging the configured queue sizes, so the flag has to be in place
        // before that call or the size trace is decided on a stale value.
        traceOn = TraceOnHelper.isTraceOn();
        setupQSizes ();
        // create the chain that will be processed
        setupHandlerChain ();
        // The consumer works on the shared consumerQ and hands what it finds
        // to the handler chain built above.
        consumerThread = new ConsumerQProcessor (consumerQ, handlerChain);
    
Methods Summary
private voidclearQs()

        // Drops the references to all six producer queues. After this call
        // the queue fields are null until setupQs() assigns fresh instances.
        rsq = null;
        req = null;
        msq = null;
        meq = null;
        stq = null;
        etq = null;
    
private java.lang.ThreadcreateThread()

        Thread t = new Thread (consumerThread, "CallFlow Async Consumer"); 
        t.setDaemon(true);
        return t;
    
public synchronized voiddisable()

        // Turns collection off. Does nothing (apart from the trace message
        // below) when the producer is not currently enabled.
        if (enabled){
            // Ask every non-null producer queue to flush.
            flushQs ();            
            // NOTE(review): presumably signals the consumer runnable to exit
            // its loop; the implementation is not visible here - confirm.
            consumerThread.stopConsumerThread();
            // Lets a later enable() create and start a new consumer thread.
            isThreadInitialized = false;
            enabled = false;
            // Drain whatever is still sitting in consumerQ on the calling
            // thread and pass it to the handlers.
            forceConsumption();
            // Finally drop the producer queue references.
            clearQs ();
        }
        // This message is logged whenever tracing is on, whether or not the
        // producer was enabled when disable() was called.
        if (traceOn){            
            logger.log (Level.INFO, "Callflow: (disable)AsynchProducerQ " +
                    "Consumer thread stopped!");
        }
    
public synchronized voidenable()

        // Turns collection on. The producer queues and the consumer thread
        // are only created when the producer is currently disabled and no
        // consumer thread has been initialized yet.
        if (!enabled && !isThreadInitialized){
            try{
                consumer = null; // sanity
                setupQs ();
                consumer = createThread ();
                consumer.start();
                isThreadInitialized = true;
            } catch (Exception startFailure){
                // if consumer thread is already on. We will get an exception.
                // Should never get here.
                logger.log(Level.INFO,
                        "Callflow - attempting to start asynchronous thread, "
                        + "but it is already on.Restart Server !",
                        startFailure);
            }
        }
        // The flag is raised unconditionally, including after a failed start.
        enabled = true;
        if (traceOn){
            logger.log(Level.INFO,
                    "Callflow: enable(AsyncProducerQ) Consumer thread started!");
        }
    
public voidflush()

        // Flushes every non-null producer queue, then drains consumerQ on
        // the calling thread and passes the contents to the handlers.
        // Unlike enable()/disable(), this method is not synchronized.
        flushQs ();
        forceConsumption ();
    
private voidflushQs()

        if(rsq != null )
           rsq.flush ();
        if (req != null)
         req.flush ();
        if (msq != null)
            msq.flush ();
        if(meq != null)
            meq.flush ();
        if (stq != null)
          stq.flush ();
        if (etq != null)
            etq.flush ();
    
private voidforceConsumption()

        int noOfQueues = consumerQ.size ();
        if (noOfQueues == 0){
            return;
        }
        if (traceOn){
            logger.log(Level.INFO, "Callflow: disable(AsyncHandlerProducer) "+
                    "forcing write of Qs to the handlers");
        }

        Handler[] handler = handlerChain.getHandlers();
        for (int i = 0; i<noOfQueues; i++){
            CallflowProducerQueue q =  (CallflowProducerQueue) consumerQ.poll();
            if (q != null){
                if (traceOn){
                    logger.log(Level.INFO, "Callflow: AsyncHandlerProducer.processProducerQ " +
                            "QName ="+
                            q.getName());
                }
                TransferObject[] to = q.getAndRemoveAll();
                if(to != null){
                    for ( int j=0; j<handler.length; j++){
                        handler[j].handle(to);                        
                    }
                }
            }
                
        }        
    
public static final AsyncHandlerIntfgetInstance()

        if (_singleton == null){
            _singleton = new AsyncHandlerProducer ();
        }
        return _singleton;
    
public voidhandleEndTime(java.lang.String requestId, long timeStamp, ContainerTypeOrApplicationType type)

        EndTimeTO etto = new EndTimeTO();
        etto.setRequestId(requestId);
        etto.setTimeStamp(timeStamp);
        etto.setContainerTypeOrApplicationType(type);
        etq.add(etto);        
        if (traceOn){
            logger.log(Level.INFO, "Callflow: EndTime(AsyncHandlerProducer) id="
                    +requestId);
        }
        
    
public voidhandleMethodEnd(java.lang.String requestId, long timeStamp, java.lang.Throwable exception)

        MethodEndTO meto = new MethodEndTO();
        meto.setRequestId(requestId);
        meto.setTimeStamp(timeStamp);
        meto.setException(((exception == null) ? null : exception.toString()));
        meq.add (meto);
        if (traceOn){
            logger.log(Level.INFO, "Callflow: MethodEnd(AsyncHandlerProducer). " +
                    "id = "+requestId);
        }

    
public voidhandleMethodStart(java.lang.String requestId, long timeStamp, java.lang.String methodName, ComponentType componentType, java.lang.String applicationName, java.lang.String moduleName, java.lang.String componentName, java.lang.String threadId, java.lang.String transactionId, java.lang.String securityId)

        
        MethodStartTO msto =  new MethodStartTO();
        msto.setRequestId(requestId);
        msto.setTimeStamp(timeStamp);
        msto.setMethodName(methodName);
        msto.setComponentType(componentType);
        msto.setAppName(applicationName);
        msto.setModuleName(moduleName);
        msto.setComponentName(componentName);
        msto.setThreadId(threadId);
        msto.setTransactionId(transactionId);
        msto.setSecurityId(securityId);        
        msq.add(msto);        
        if (traceOn){
            logger.log(Level.INFO, "Callflow: MethodStart(AsyncHandlerProducer) " +
                    " id = "+requestId+ " applicationName = "+ applicationName);
        }
        
    
public voidhandleRequestEnd(java.lang.String requestId, long timeStamp)

        RequestEndTO reto = new RequestEndTO();
        reto.setRequestId(requestId);
        reto.setTimeStamp(timeStamp);
        req.add(reto);
        if (traceOn){
            logger.log(Level.INFO, "Callflow: RequestEnd(AsyncHandlerProducer) id="
                    +requestId);
        }
        
    
public voidhandleRequestStart(java.lang.String requestId, long timeStamp, long timeStampMillis, RequestType requestType, java.lang.String callerIPAddress, java.lang.String remoteUser)

        
        RequestStartTO rsto = new RequestStartTO();
        rsto.setRequestId(requestId);
        rsto.setTimeStamp(timeStamp);
        rsto.setTimeStampMillis(timeStampMillis);
        rsto.setRequestType(requestType);
        rsto.setIpAddress(callerIPAddress);
        //rsto.setRemoteUser(remoteUser); // not currently in db schema.
        rsq.add (rsto);
        if (traceOn){
            logger.log(Level.INFO, 
                    "Callflow: RequestStart (AsyncHandlerProducer) id = :"+
                    requestId +
                    " Type =" + requestType.toString());
        }
    
public voidhandleStartTime(java.lang.String requestId, long timeStamp, ContainerTypeOrApplicationType type)

        StartTimeTO stto = new StartTimeTO();
        stto.setRequestId(requestId);
        stto.setTimeStamp(timeStamp);
        stto.setContainerTypeOrApplicationType(type);
        stq.add(stto);        
        if (traceOn){
            logger.log(Level.INFO, "Callflow: StartTime(AsyncHandlerProducer)" +
                    "id = "+requestId);
        }

    
private voidsetupHandlerChain()

        handlerChain = new HandlerChain ();
        handlerChain.addHandler (new DbHandler (DbAccessObjectImpl.getInstance()));
    
private voidsetupQSizes()

        qSize = Math.abs(Integer.getInteger(Q_SIZE_PROPERTY, 1000));
        containerQSize = Math.abs (
                Integer.getInteger(CONTAINER_TIME_Q_SIZE_PROPERTY, 2000));
                
        if(this.containerQSize == containerQSize){// default values, prop not set
            if (qSize == this.qSize){// default values for qSize, not overridden
                // do not override the containerQsize
            } else{
                // container qSize is roughly 2-4 times qSize
                this.containerQSize = qSize * 3;                
            }
        }
        this.qSize = qSize;    
        if (traceOn){
            logger.log(Level.INFO, "Callflow : AsyncHandlerProducer: QSize = " +
                    this.qSize +
                    " Container Time Q Size =" + this.containerQSize);
        }
    
private voidsetupQs()

        // Assigns a producer queue to each of the six queue fields. Every
        // queue is obtained from CallflowProducerQueue.getInstance with the
        // shared consumerQ, a queue name and a size.
        // The request and method queues are sized with qSize.
        rsq = CallflowProducerQueue.getInstance (consumerQ, 
                "RequestStartProducerQ", qSize);
        req = CallflowProducerQueue.getInstance (consumerQ, 
                "RequestEndProducerQ", qSize);
        msq = CallflowProducerQueue.getInstance (consumerQ, 
                "MethodStartProducerQ", qSize);
        meq = CallflowProducerQueue.getInstance (consumerQ, 
                "MethodEndProducerQ", qSize);
        // The start/end time queues are sized with containerQSize.
        stq = CallflowProducerQueue.getInstance (consumerQ, 
                "StartTimeProducerQ", containerQSize);
        etq = CallflowProducerQueue.getInstance (consumerQ, "EndTimeProducerQ", 
                containerQSize);