NioReplicationTask
public class NioReplicationTask extends org.apache.catalina.tribes.transport.AbstractRxTask
A worker thread class which can drain channels and echo-back the input. Each
instance is constructed with a reference to the owning thread pool object.
When started, the thread loops forever waiting to be awakened to service the
channel associated with a SelectionKey object. The worker is tasked by
calling its serviceChannel() method with a SelectionKey object. The
serviceChannel() method stores the key reference in the thread object then
calls notify() to wake it up. When the channel has been drained, the worker
thread returns itself to its parent pool. |
Fields Summary |
---|
private static org.apache.juli.logging.Log | log |
private ByteBuffer | buffer |
private SelectionKey | key |
private int | rxBufSize |
private NioReceiver | receiver |
Methods Summary |
---|
private void | cancelKey(java.nio.channels.SelectionKey key)
// Flags the reader attached to the key (if there is one) as cancelled and
// finished, then queues the actual key cancellation as a receiver event.
if (log.isTraceEnabled()) {
    log.trace("Adding key for cancel event:"+key);
}
final ObjectReader attachedReader = (ObjectReader) key.attachment();
if (attachedReader != null) {
    attachedReader.setCancelled(true);
    attachedReader.finish();
}
// The cancellation itself is not performed here; it is handed to the
// receiver via addEvent().
receiver.addEvent(new Runnable() {
    public void run() {
        if (log.isTraceEnabled()) {
            log.trace("Cancelling key:"+key);
        }
        NioReceiver.cancelledKey(key);
    }
});
| protected void | drainChannel(java.nio.channels.SelectionKey key, org.apache.catalina.tribes.io.ObjectReader reader)
The actual code which drains the channel associated with
the given key. This method assumes the key has been
modified prior to invocation to turn off selection
interest in OP_READ. When this method completes it
re-enables OP_READ and calls wakeup() on the selector
so the selector will resume watching this channel.
// Drains readable data from the key's channel into the reader, dispatches any
// complete messages to the callback, and sends ACKs as requested by each
// message's options.
// Record activity on the reader before touching the channel.
reader.setLastAccess(System.currentTimeMillis());
reader.access();
SocketChannel channel = (SocketChannel) key.channel();
int count;
buffer.clear(); // make buffer empty
// loop while data available, channel is non-blocking
// The loop ends when read() returns 0 (nothing more right now), a negative
// value (end of stream), or as soon as the reader holds a complete package.
while ((count = channel.read (buffer)) > 0) {
buffer.flip(); // make buffer readable
// NOTE(review): the array-backed path passes offset 0 and does not consult
// buffer.arrayOffset() — confirm the buffer is never a slice.
if ( buffer.hasArray() )
reader.append(buffer.array(),0,count,false);
else
reader.append(buffer,count,false);
buffer.clear(); // make buffer empty
//do we have at least one package?
if ( reader.hasPackage() ) break;
}
int pkgcnt = reader.count();
if (count < 0 && pkgcnt == 0 ) {
//end of stream, and no more packages to process
remoteEof(key);
return;
}
// Only call execute() when the reader reports at least one package.
ChannelMessage[] msgs = pkgcnt == 0? ChannelData.EMPTY_DATA_ARRAY : reader.execute();
registerForRead(key,reader);//register to read new data, before we send it off to avoid dead locks
for ( int i=0; i<msgs.length; i++ ) {
/**
* Use send ack here if you want to ack the request to the remote
* server before completing the request
* This is considered an asynchronized request
*/
if (ChannelData.sendAckAsync(msgs[i].getOptions())) sendAck(key,channel,Constants.ACK_COMMAND);
try {
if ( Logs.MESSAGES.isTraceEnabled() ) {
try {
Logs.MESSAGES.trace("NioReplicationThread - Received msg:" + new UniqueId(msgs[i].getUniqueId()) + " at " + new java.sql.Timestamp(System.currentTimeMillis()));
}catch ( Throwable t ) {} // a failure while building the trace line is ignored
}
//process the message
getCallback().messageDataReceived(msgs[i]);
/**
* Use send ack here if you want the request to complete on this
* server before sending the ack to the remote server
* This is considered a synchronized request
*/
if (ChannelData.sendAckSync(msgs[i].getOptions())) sendAck(key,channel,Constants.ACK_COMMAND);
}catch ( RemoteProcessException e ) {
// NOTE(review): logged at error level but only when debug is enabled —
// confirm this guard/level mismatch is intentional.
if ( log.isDebugEnabled() ) log.error("Processing of cluster message failed.",e);
if (ChannelData.sendAckSync(msgs[i].getOptions())) sendAck(key,channel,Constants.FAIL_ACK_COMMAND);
}catch ( Exception e ) {
log.error("Processing of cluster message failed.",e);
if (ChannelData.sendAckSync(msgs[i].getOptions())) sendAck(key,channel,Constants.FAIL_ACK_COMMAND);
}
// Hand the message's buffer back to the pool and drop the reference to it.
if ( getUseBufferPool() ) {
BufferPool.getBufferPool().returnBuffer(msgs[i].getMessage());
msgs[i].setMessage(null);
}
}
// End of stream was seen but packages were pending: they have been processed
// above, so the key can be cancelled now.
if (count < 0) {
remoteEof(key);
return;
}
| public int | getRxBufSize()
// Returns the value currently held in the rxBufSize field.
return this.rxBufSize;
| protected void | registerForRead(java.nio.channels.SelectionKey key, org.apache.catalina.tribes.io.ObjectReader reader)
// Finishes the reader and queues a receiver event that adds OP_READ back to
// the key's interest set.
if (log.isTraceEnabled()) {
    log.trace("Adding key for read event:"+key);
}
reader.finish();
final Runnable resumeRead = new Runnable() {
    public void run() {
        try {
            if (!key.isValid()) {
                return; // key is no longer valid, nothing to re-arm
            }
            // wake the selector before changing the interest set
            key.selector().wakeup();
            // add OP_READ to whatever the key is currently interested in
            key.interestOps(key.interestOps() | SelectionKey.OP_READ);
            if (log.isTraceEnabled()) {
                log.trace("Registering key for read:"+key);
            }
        } catch (CancelledKeyException ckx) {
            // the key was cancelled underneath us: route it through the
            // receiver's cancelled-key handling
            NioReceiver.cancelledKey(key);
            if (log.isTraceEnabled()) {
                log.trace("CKX Cancelling key:"+key);
            }
        } catch (Exception x) {
            log.error("Error registering key for read:"+key,x);
        }
    }
};
receiver.addEvent(resumeRead);
| private void | remoteEof(java.nio.channels.SelectionKey key)
// The remote side reached end of stream: log it at debug level and cancel
// the key.
if (log.isDebugEnabled()) {
    log.debug("Channel closed on the remote end, disconnecting");
}
cancelKey(key);
| public synchronized void | run()
// Services the key stored in this task: drains its channel (or cancels the key
// when no reader is attached), then clears the key and returns this worker to
// its task pool.
//
// The receive buffer is allocated on first use, direct or heap depending on
// OPTION_DIRECT_BUFFER, and merely cleared on later runs.
if (buffer == null) {
    if ((getOptions() & OPTION_DIRECT_BUFFER) == OPTION_DIRECT_BUFFER) {
        buffer = ByteBuffer.allocateDirect(getRxBufSize());
    } else {
        buffer = ByteBuffer.allocate(getRxBufSize());
    }
} else {
    buffer.clear();
}
if (key == null) {
    return; // just in case
}
if (log.isTraceEnabled()) {
    log.trace("Servicing key:"+key);
}
try {
    ObjectReader reader = (ObjectReader) key.attachment();
    if (reader == null) {
        if (log.isTraceEnabled()) {
            log.trace("No object reader, cancelling:"+key);
        }
        cancelKey(key);
    } else {
        if (log.isTraceEnabled()) {
            log.trace("Draining channel:"+key);
        }
        drainChannel(key, reader);
    }
} catch (Exception e) {
    //this is common, since the sockets on the other
    //end expire after a certain time.
    if (e instanceof CancelledKeyException) {
        //do nothing
    } else if (e instanceof IOException) {
        //dont spew out stack traces for IO exceptions unless debug is enabled.
        if (log.isDebugEnabled()) log.debug ("IOException in replication worker, unable to drain channel. Probable cause: Keep alive socket closed["+e.getMessage()+"].", e);
        else log.warn ("IOException in replication worker, unable to drain channel. Probable cause: Keep alive socket closed["+e.getMessage()+"].");
    } else if (log.isErrorEnabled()) {
        //this is a real error, log it.
        log.error("Exception caught in TcpReplicationThread.drainChannel.",e);
    }
    cancelKey(key);
} finally {
    // Always release the key and hand the worker back, even when a
    // non-Exception Throwable escapes the drain or cancelKey() itself fails
    // inside the catch block; otherwise this worker would be lost to the pool.
    key = null;
    // done, ready for more, return to pool
    getTaskPool().returnWorker (this);
}
| protected void | sendAck(java.nio.channels.SelectionKey key, java.nio.channels.SocketChannel channel, byte[] command)
Send a reply-acknowledgement (6,2,3).
// Writes the given command bytes to the channel as an acknowledgement. An
// IOException is logged as a warning and not propagated.
try {
    final ByteBuffer ackBytes = ByteBuffer.wrap(command);
    // keep writing until every byte of the command has been accepted
    while (ackBytes.hasRemaining()) {
        channel.write(ackBytes);
    }
    if (log.isTraceEnabled()) {
        log.trace("ACK sent to " + channel.socket().getPort());
    }
} catch (java.io.IOException x) {
    log.warn("Unable to send ACK back through channel, channel disconnected?: "+x.getMessage());
}
| public synchronized void | serviceChannel(java.nio.channels.SelectionKey key)
Called to initiate a unit of work by this worker thread
on the provided SelectionKey object. This method is
synchronized, as is the run() method, so only one key
can be serviced at a given time.
Before waking the worker thread, and before returning
to the main selection loop, this key's interest set is
updated to remove OP_READ. This will cause the selector
to ignore read-readiness for this channel while the
worker thread is servicing it.
// Stores the key on this task for servicing and removes OP_READ and OP_WRITE
// from the key's interest set.
if (log.isTraceEnabled()) {
    log.trace("About to service key:"+key);
}
final ObjectReader attachedReader = (ObjectReader) key.attachment();
if (attachedReader != null) {
    attachedReader.setLastAccess(System.currentTimeMillis());
}
this.key = key;
// Clear the two interest bits in two separate updates: OP_READ first, then
// OP_WRITE.
final int withoutRead = key.interestOps() & ~SelectionKey.OP_READ;
key.interestOps(withoutRead);
final int withoutWrite = key.interestOps() & ~SelectionKey.OP_WRITE;
key.interestOps(withoutWrite);
| public void | setRxBufSize(int rxBufSize)
// Stores the given value in the rxBufSize field; run() passes that field
// (via getRxBufSize()) to ByteBuffer.allocate/allocateDirect when it first
// creates the buffer.
this.rxBufSize = rxBufSize;
|
|