BioReplicationTask
public class BioReplicationTask extends org.apache.catalina.tribes.transport.AbstractRxTask
A worker thread class which can drain channels and echo back the input. Each
instance is constructed with a reference to the owning thread pool object.
When started, the thread loops forever waiting to be awakened to service the
channel associated with a SelectionKey object. The worker is tasked by
calling its serviceChannel() method with a SelectionKey object. The
serviceChannel() method stores the key reference in the thread object then
calls notify() to wake it up. When the channel has been drained, the worker
thread returns itself to its parent pool. |
Fields Summary |
---|
protected static org.apache.juli.logging.Log | log | protected Socket | socket | protected org.apache.catalina.tribes.io.ObjectReader | reader |
Methods Summary |
---|
public void | close()
// Clear the inherited do-run flag before releasing resources.
setDoRun(false);
// Closing is best effort: any failure (including a socket or reader that
// is already null) is deliberately ignored.
try {
    socket.close();
} catch (Exception ignore) {
    // intentionally ignored
}
try {
    reader.close();
} catch (Exception ignore) {
    // intentionally ignored
}
// Drop both references so this task no longer holds the connection.
reader = null;
socket = null;
// Let the superclass perform its own close handling last.
super.close();
| protected void | drainSocket() The actual code which drains the channel associated with
the given key. This method assumes the key has been
modified prior to invocation to turn off selection
interest in OP_READ. When this method completes it
re-enables OP_READ and calls wakeup() on the selector
so the selector will resume watching this channel.
// Read from the socket's input stream in 1024-byte chunks until read()
// reports end-of-stream with a negative value.
final InputStream input = socket.getInputStream();
final byte[] chunk = new byte[1024];
int read;
while ((read = input.read(chunk)) >= 0) {
    // Feed the bytes just read to the reader; a positive return value
    // triggers processing via execute().
    if (reader.append(chunk, 0, read, true) > 0) {
        execute(reader);
    }
}
| protected void | execute(org.apache.catalina.tribes.io.ObjectReader reader)
// Nothing to process unless the reader reports at least one package.
if (reader.count() <= 0) {
    return;
}
ChannelMessage[] messages = reader.execute();
for (ChannelMessage message : messages) {
    // Asynchronous acknowledgement: when the message options ask for it,
    // ACK the remote side before the message is processed locally.
    if (ChannelData.sendAckAsync(message.getOptions())) {
        sendAck(Constants.ACK_COMMAND);
    }
    try {
        // Hand the message to the registered callback.
        getCallback().messageDataReceived(message);
        // Synchronous acknowledgement: when the message options ask for it,
        // ACK only after the callback has completed on this node.
        if (ChannelData.sendAckSync(message.getOptions())) {
            sendAck(Constants.ACK_COMMAND);
        }
    } catch (Exception x) {
        // Processing failed: a synchronous sender gets a failure ACK
        // instead, and the error is logged with its cause.
        if (ChannelData.sendAckSync(message.getOptions())) {
            sendAck(Constants.FAIL_ACK_COMMAND);
        }
        log.error("Error thrown from messageDataReceived.", x);
    }
    // When buffer pooling is enabled, give the message buffer back to the
    // pool and detach it from the message.
    if (getUseBufferPool()) {
        BufferPool.getBufferPool().returnBuffer(message.getMessage());
        message.setMessage(null);
    }
}
| public synchronized void | run()
// No socket has been assigned to this task, so there is nothing to service.
if ( socket == null ) return;
try {
    drainSocket();
} catch ( Exception x ) {
    // Pass the exception to the logger so the cause and stack trace of the
    // failure are recorded instead of being silently dropped.
    log.error("Unable to service bio socket", x);
} finally {
    // Always release the connection, whether draining succeeded or failed.
    // Close failures are deliberately ignored (best effort).
    try {
        socket.close();
    } catch ( Exception ignore ) {
        // intentionally ignored
    }
    try {
        reader.close();
    } catch ( Exception ignore ) {
        // intentionally ignored
    }
    reader = null;
    socket = null;
}
// done, ready for more, return to pool
if ( getTaskPool() != null ) getTaskPool().returnWorker (this);
| protected void | sendAck(byte[] command) Send a reply-acknowledgement (6,2,3).
try {
    // Write the acknowledgement bytes to the socket and flush immediately.
    OutputStream ackStream = socket.getOutputStream();
    ackStream.write(command);
    ackStream.flush();
    if (log.isTraceEnabled()) {
        log.trace("ACK sent to " + socket.getPort());
    }
} catch (java.io.IOException x) {
    // A failed ACK is only warned about; the exception is not propagated.
    log.warn("Unable to send ACK back through channel, channel disconnected?: "+x.getMessage());
}
| public synchronized void | serviceSocket(java.net.Socket socket, org.apache.catalina.tribes.io.ObjectReader reader)
// Record the connection and its reader on this task, then notify a thread
// waiting on this task's monitor (if any).
this.reader = reader;
this.socket = socket;
notify();
|
|