Methods Summary |
---|
public synchronized long | addAndGetCurrentSize(long inc)
currentSize += inc;
return currentSize;
|
public boolean | addToQueue(org.apache.catalina.tribes.ChannelMessage msg, org.apache.catalina.tribes.Member[] destination, org.apache.catalina.tribes.group.InterceptorPayload payload)
return queue.add(msg,destination,payload);
|
public long | getCurrentSize()
return currentSize;
|
public long | getMaxQueueSize()
// Returns the limit that sendMessage compares the queued size against
// (reported in bytes by sendMessage's "queue is full" error).
return this.maxQueueSize;
|
public boolean | getUseDeepClone()
// Returns the flag that makes sendMessage deep-clone a message before
// placing it on the queue.
return this.useDeepClone;
|
public org.apache.catalina.tribes.transport.bio.util.LinkObject | removeFromQueue()
return queue.remove();
|
public void | run()
// Dispatch loop: keeps going for as long as the run flag is set.
while ( run ) {
    // Fetch the next link; a null result is simply skipped and the outer
    // loop asks the queue again (original note: null should not happen
    // unless the wait time is exceeded).
    LinkObject current = removeFromQueue();
    // sendAsyncData hands back the following link, so this walks the chain
    // until it ends or the run flag is cleared.
    while ( current != null && run ) {
        current = sendAsyncData(current);
    }
}
|
protected org.apache.catalina.tribes.transport.bio.util.LinkObject | sendAsyncData(org.apache.catalina.tribes.transport.bio.util.LinkObject link)
// Sends the message carried by link to its destinations through
// super.sendMessage (with a null payload), notifies the link's handler, if
// any, of completion or failure, and returns link.next().
// Whatever happens, the message length is subtracted from the tracked size.
final ChannelMessage msg = link.data();
final Member[] destination = link.getDestination();
try {
    super.sendMessage(destination, msg, null);
    // A failing completion callback is only logged; it is not treated as a
    // send failure.
    try {
        if ( link.getHandler() != null ) {
            link.getHandler().handleCompletion(new UniqueId(msg.getUniqueId()));
        }
    } catch ( Exception ex ) {
        log.error("Unable to report back completed message.",ex);
    }
} catch ( Exception x ) {
    // Pass a ChannelException through as-is; wrap anything else.
    final ChannelException cx = ( x instanceof ChannelException )
        ? (ChannelException)x
        : new ChannelException(x);
    if ( log.isDebugEnabled() ) {
        log.debug("Error while processing async message.",x);
    }
    try {
        if ( link.getHandler() != null ) {
            link.getHandler().handleError(cx, new UniqueId(msg.getUniqueId()));
        }
    } catch ( Exception ex ) {
        log.error("Unable to report back error message.",ex);
    }
} finally {
    // Release this message's share of the size budget, then advance.
    addAndGetCurrentSize(-msg.getMessage().getLength());
    link = link.next();
}
return link;
|
public void | sendMessage(org.apache.catalina.tribes.Member[] destination, org.apache.catalina.tribes.ChannelMessage msg, org.apache.catalina.tribes.group.InterceptorPayload payload)
boolean async = (msg.getOptions() & Channel.SEND_OPTIONS_ASYNCHRONOUS) == Channel.SEND_OPTIONS_ASYNCHRONOUS;
if ( async && run ) {
if ( (getCurrentSize()+msg.getMessage().getLength()) > maxQueueSize ) {
if ( alwaysSend ) {
super.sendMessage(destination,msg,payload);
return;
} else {
throw new ChannelException("Asynchronous queue is full, reached its limit of " + maxQueueSize +" bytes, current:" + getCurrentSize() + " bytes.");
}//end if
}//end if
//add to queue
if ( useDeepClone ) msg = (ChannelMessage)msg.deepclone();
if (!addToQueue(msg, destination, payload) ) {
throw new ChannelException("Unable to add the message to the async queue, queue bug?");
}
addAndGetCurrentSize(msg.getMessage().getLength());
} else {
super.sendMessage(destination, msg, payload);
}
|
public synchronized long | setAndGetCurrentSize(long value)
// Overwrites the tracked queue size with value while holding this object's
// monitor, and returns value.
this.currentSize = value;
return value;
|
public void | setMaxQueueSize(long maxQueueSize)
// Sets the limit that sendMessage compares the queued size against
// (reported in bytes by sendMessage's "queue is full" error).
this.maxQueueSize = maxQueueSize;
|
public void | setOptionFlag(int flag)
if ( flag != Channel.SEND_OPTIONS_ASYNCHRONOUS ) log.warn("Warning, you are overriding the asynchronous option flag, this will disable the Channel.SEND_OPTIONS_ASYNCHRONOUS that other apps might use.");
super.setOptionFlag(flag);
|
public void | setUseDeepClone(boolean useDeepClone)
// Sets the flag that makes sendMessage deep-clone a message before placing
// it on the queue.
this.useDeepClone = useDeepClone;
|
public void | start(int svc)
//start the thread
if (!run ) {
synchronized (this) {
if ( !run && ((svc & Channel.SND_TX_SEQ)==Channel.SND_TX_SEQ) ) {//only start with the sender
startQueue();
}//end if
}//sync
}//end if
super.start(svc);
|
public void | startQueue()
msgDispatchThread = new Thread(this);
msgDispatchThread.setName("MessageDispatchInterceptor.MessageDispatchThread");
msgDispatchThread.setDaemon(true);
msgDispatchThread.setPriority(Thread.MAX_PRIORITY);
queue.setEnabled(true);
run = true;
msgDispatchThread.start();
|
public void | stop(int svc)
//stop the thread
if ( run ) {
synchronized (this) {
if ( run && ((svc & Channel.SND_TX_SEQ)==Channel.SND_TX_SEQ)) {
stopQueue();
}//end if
}//sync
}//end if
super.stop(svc);
|
public void | stopQueue()
// Shuts the dispatch queue down: clears the run flag, interrupts the
// dispatch thread, disables the queue and resets the tracked size to 0.
// NOTE(review): assumes startQueue() has run so msgDispatchThread is
// non-null; stop() only calls this while run is true — confirm for other callers.
run = false;
msgDispatchThread.interrupt();
queue.setEnabled(false);
setAndGetCurrentSize(0);
|