Fields Summary |
---|
private static final Logger | logger |
public static final String | Q_SIZE_PROPERTY: Sets the size of the queues handling each of the tables. This
can be superseded for the (END|START)Time tables by the property
CONTAINER_TIME_Q_SIZE_PROPERTY (since the END|START Time tables fill roughly
2-3x faster than the other tables). |
private int | qSize |
public static final String | CONTAINER_TIME_Q_SIZE_PROPERTY |
private int | containerQSize |
public static AsyncHandlerProducer | _singleton |
CallflowProducerQueue | rsq |
CallflowProducerQueue | req |
CallflowProducerQueue | msq |
CallflowProducerQueue | meq |
CallflowProducerQueue | stq |
CallflowProducerQueue | etq |
boolean | traceOn |
private static final BlockingQueue | consumerQ: The ConsumerQ. This queue is populated by instances of the ProducerQ.
The consumer thread goes over this queue and processes it to write
information to the database. |
HandlerChain | handlerChain |
private ConsumerQProcessor | consumerThread |
private Thread | consumer |
boolean | isThreadInitialized |
private boolean | enabled |
Methods Summary |
---|
private void | clearQs()
// Drops the references to all six producer queues in a single chained
// assignment. flushQs() treats a null queue as "nothing to flush".
rsq = req = msq = meq = stq = etq = null;
|
private java.lang.Thread | createThread()
Thread t = new Thread (consumerThread, "CallFlow Async Consumer");
t.setDaemon(true);
return t;
|
public synchronized void | disable()
// Shuts the asynchronous producer down. The steps inside the guard run only
// when the producer is currently enabled, and their order is significant.
if (enabled){
// 1. Flush every non-null producer queue.
flushQs ();
// 2. Ask the consumer runnable to stop (exact semantics are defined by
//    ConsumerQProcessor, which is not visible here).
consumerThread.stopConsumerThread();
// 3. Reset the state flags so that a later enable() sets the queues and
//    the consumer thread up again.
isThreadInitialized = false;
enabled = false;
// 4. Hand whatever is still waiting in consumerQ to the handlers, on the
//    calling thread.
forceConsumption();
// 5. Drop the references to the producer queues.
clearQs ();
}
// NOTE: this trace message is emitted even when the producer was not enabled.
if (traceOn){
logger.log (Level.INFO, "Callflow: (disable)AsynchProducerQ " +
"Consumer thread stopped!");
}
|
public synchronized void | enable()
// Turns the asynchronous producer on. The producer queues are created and
// the consumer thread is started only when the producer is not yet enabled
// AND the consumer thread has not been initialized.
if (!enabled && !isThreadInitialized) {
    try {
        consumer = null; // sanity
        setupQs();
        consumer = createThread();
        consumer.start();
        isThreadInitialized = true;
    } catch (Exception e) {
        // If the consumer thread is already on we will get an exception.
        // Should never get here.
        logger.log(Level.INFO, "Callflow - attempting to " +
                "start asynchronous thread, but it is already on." +
                "Restart Server !", e);
    }
}
// The flag is set unconditionally, i.e. also when the start-up above failed.
enabled = true;
if (traceOn) {
    logger.log(Level.INFO, "Callflow: enable(AsyncProducerQ) Consumer" +
            " thread started!");
}
|
public void | flush()
// Pushes out everything that is currently buffered: first every producer
// queue is flushed, then the queues waiting in consumerQ are handed to the
// handlers on the calling thread.
flushQs();
forceConsumption();
|
private void | flushQs()
if(rsq != null )
rsq.flush ();
if (req != null)
req.flush ();
if (msq != null)
msq.flush ();
if(meq != null)
meq.flush ();
if (stq != null)
stq.flush ();
if (etq != null)
etq.flush ();
|
private void | forceConsumption()
int noOfQueues = consumerQ.size ();
if (noOfQueues == 0){
return;
}
if (traceOn){
logger.log(Level.INFO, "Callflow: disable(AsyncHandlerProducer) "+
"forcing write of Qs to the handlers");
}
Handler[] handler = handlerChain.getHandlers();
for (int i = 0; i<noOfQueues; i++){
CallflowProducerQueue q = (CallflowProducerQueue) consumerQ.poll();
if (q != null){
if (traceOn){
logger.log(Level.INFO, "Callflow: AsyncHandlerProducer.processProducerQ " +
"QName ="+
q.getName());
}
TransferObject[] to = q.getAndRemoveAll();
if(to != null){
for ( int j=0; j<handler.length; j++){
handler[j].handle(to);
}
}
}
}
|
public static final AsyncHandlerIntf | getInstance()
if (_singleton == null){
_singleton = new AsyncHandlerProducer ();
}
return _singleton;
|
public void | handleEndTime(java.lang.String requestId, long timeStamp, ContainerTypeOrApplicationType type)
EndTimeTO etto = new EndTimeTO();
etto.setRequestId(requestId);
etto.setTimeStamp(timeStamp);
etto.setContainerTypeOrApplicationType(type);
etq.add(etto);
if (traceOn){
logger.log(Level.INFO, "Callflow: EndTime(AsyncHandlerProducer) id="
+requestId);
}
|
public void | handleMethodEnd(java.lang.String requestId, long timeStamp, java.lang.Throwable exception)
MethodEndTO meto = new MethodEndTO();
meto.setRequestId(requestId);
meto.setTimeStamp(timeStamp);
meto.setException(((exception == null) ? null : exception.toString()));
meq.add (meto);
if (traceOn){
logger.log(Level.INFO, "Callflow: MethodEnd(AsyncHandlerProducer). " +
"id = "+requestId);
}
|
public void | handleMethodStart(java.lang.String requestId, long timeStamp, java.lang.String methodName, ComponentType componentType, java.lang.String applicationName, java.lang.String moduleName, java.lang.String componentName, java.lang.String threadId, java.lang.String transactionId, java.lang.String securityId)
MethodStartTO msto = new MethodStartTO();
msto.setRequestId(requestId);
msto.setTimeStamp(timeStamp);
msto.setMethodName(methodName);
msto.setComponentType(componentType);
msto.setAppName(applicationName);
msto.setModuleName(moduleName);
msto.setComponentName(componentName);
msto.setThreadId(threadId);
msto.setTransactionId(transactionId);
msto.setSecurityId(securityId);
msq.add(msto);
if (traceOn){
logger.log(Level.INFO, "Callflow: MethodStart(AsyncHandlerProducer) " +
" id = "+requestId+ " applicationName = "+ applicationName);
}
|
public void | handleRequestEnd(java.lang.String requestId, long timeStamp)
RequestEndTO reto = new RequestEndTO();
reto.setRequestId(requestId);
reto.setTimeStamp(timeStamp);
req.add(reto);
if (traceOn){
logger.log(Level.INFO, "Callflow: RequestEnd(AsyncHandlerProducer) id="
+requestId);
}
|
public void | handleRequestStart(java.lang.String requestId, long timeStamp, long timeStampMillis, RequestType requestType, java.lang.String callerIPAddress, java.lang.String remoteUser)
RequestStartTO rsto = new RequestStartTO();
rsto.setRequestId(requestId);
rsto.setTimeStamp(timeStamp);
rsto.setTimeStampMillis(timeStampMillis);
rsto.setRequestType(requestType);
rsto.setIpAddress(callerIPAddress);
//rsto.setRemoteUser(remoteUser); // not currently in db schema.
rsq.add (rsto);
if (traceOn){
logger.log(Level.INFO,
"Callflow: RequestStart (AsyncHandlerProducer) id = :"+
requestId +
" Type =" + requestType.toString());
}
|
public void | handleStartTime(java.lang.String requestId, long timeStamp, ContainerTypeOrApplicationType type)
StartTimeTO stto = new StartTimeTO();
stto.setRequestId(requestId);
stto.setTimeStamp(timeStamp);
stto.setContainerTypeOrApplicationType(type);
stq.add(stto);
if (traceOn){
logger.log(Level.INFO, "Callflow: StartTime(AsyncHandlerProducer)" +
"id = "+requestId);
}
|
private void | setupHandlerChain()
handlerChain = new HandlerChain ();
handlerChain.addHandler (new DbHandler (DbAccessObjectImpl.getInstance()));
|
private void | setupQSizes()
qSize = Math.abs(Integer.getInteger(Q_SIZE_PROPERTY, 1000));
containerQSize = Math.abs (
Integer.getInteger(CONTAINER_TIME_Q_SIZE_PROPERTY, 2000));
if(this.containerQSize == containerQSize){// default values, prop not set
if (qSize == this.qSize){// default values for qSize, not overridden
// do not override the containerQsize
} else{
// container qSize is roughly 2-4 times qSize
this.containerQSize = qSize * 3;
}
}
this.qSize = qSize;
if (traceOn){
logger.log(Level.INFO, "Callflow : AsyncHandlerProducer: QSize = " +
this.qSize +
" Container Time Q Size =" + this.containerQSize);
}
|
private void | setupQs()
rsq = CallflowProducerQueue.getInstance (consumerQ,
"RequestStartProducerQ", qSize);
req = CallflowProducerQueue.getInstance (consumerQ,
"RequestEndProducerQ", qSize);
msq = CallflowProducerQueue.getInstance (consumerQ,
"MethodStartProducerQ", qSize);
meq = CallflowProducerQueue.getInstance (consumerQ,
"MethodEndProducerQ", qSize);
stq = CallflowProducerQueue.getInstance (consumerQ,
"StartTimeProducerQ", containerQSize);
etq = CallflowProducerQueue.getInstance (consumerQ, "EndTimeProducerQ",
containerQSize);
|