XAReadTaskpublic class XAReadTask extends com.sun.enterprise.web.connector.grizzly.DefaultReadTask Read available data on a non blocking SocketChannel .
StreamAlgorithm stategy will decide if more bytes are required
or not. Once the StreamAlgorithm is ready, the
ProcessorTask attached to this class will be executed. |
Fields Summary |
---|
protected boolean | inKeepAliveProcessIf the last request was processed successfully and we need to
keep-alive the connection, unattach thsi object from the
SelectionKey and return it to the pool. |
Methods Summary |
---|
public void | doTask()Read data from the socket and process it using this thread, and only if
the StreamAlgorith stategy determine no more bytes are
are needed.
int count = 0;
Socket socket = null;
SocketChannel socketChannel = null;
boolean keepAlive = false;
Exception exception = null;
key.attach(null);
if ( byteBuffer == null ){
byteBuffer = algorithm
.allocate(useDirectByteBuffer,useByteBufferView
,selectorThread.getBufferSize());
}
try {
inKeepAliveProcess = true;
socketChannel = (SocketChannel)key.channel();
socket = socketChannel.socket();
algorithm.setSocketChannel(socketChannel);
int loop = 0;
int bufferSize = 0;
while ( socketChannel.isOpen() && (bytesAvailable ||
((count = socketChannel.read(byteBuffer))> -1))){
// Avoid calling the Selector.
if ( count == 0 && !bytesAvailable){
loop++;
if (loop > 2){
break;
}
continue;
}
if (bytesAvailable){
count = byteBuffer.position();
}
bytesAvailable = false;
byteBuffer = algorithm.preParse(byteBuffer);
inputStream.setByteBuffer(byteBuffer);
inputStream.setSelectionKey(key);
// try to predict which HTTP method we are processing
if ( algorithm.parse(byteBuffer) ){
keepAlive = executeProcessorTask();
if (!keepAlive) {
inKeepAliveProcess = false;
break;
}
} else {
// We must call the Selector since we don't have all the
// bytes
keepAlive = true;
// We should not detach this task from the SelectionKey
// since the bytes weren't all read.
inKeepAliveProcess = false;
break;
}
// If the content-length is found, and if it higher than
// maxPostSize, cancel the task to avoid DoS.
if ( algorithm.contentLength() > maxPostSize ){
cancelTask("Maximum POST size reached: " + maxPostSize,
HtmlHelper.OK);
keepAlive = false;
break;
}
}
// Catch IO AND NIO exception
} catch (IOException ex) {
exception = ex;
} catch (RuntimeException ex) {
exception = ex;
} finally {
manageKeepAlive(keepAlive,count,exception);
}
| public boolean | executeProcessorTask()Execute the ProcessorTask only if the request has
been fully read. Guest the size of the request by using the
content-type HTTP headers.
boolean registerKey = false;
if (SelectorThread.logger().isLoggable(Level.FINEST))
SelectorThread.logger().log(Level.FINEST,"executeProcessorTask");
if ( algorithm.getHandler() != null && algorithm.getHandler()
.handle(null, Handler.REQUEST_BUFFERED) == Handler.BREAK ){
return true;
}
// Get a processor task. If the processorTask != null, that means we
// failed to load all the bytes in a single channel.read().
if (processorTask == null){
attachProcessor(selectorThread.getProcessorTask());
}
try {
// The socket might not have been read entirely and the parsing
// will fail, so we need to respin another event.
registerKey = processorTask.process(inputStream,null);
} catch (Exception e) {
SelectorThread.logger().log(Level.SEVERE,"readTask.processException", e);
registerKey = true;
} finally {
// if registerKey, that means we were'nt able to parse the request
// properly because the bytes were not all read, so we need to
// call again the Selector.
if (registerKey && processorTask.isError()) {
byteBuffer = algorithm.rollbackParseState(byteBuffer);
inKeepAliveProcess = false;
return registerKey;
}
}
detachProcessor();
return registerKey;
| protected void | manageKeepAlive(boolean keepAlive, int count, java.lang.Exception exception)
// The key is invalid when the Task has been cancelled.
if ( count == -1 || !key.isValid() || exception != null ){
inKeepAliveProcess = false;
keepAlive = false;
if ( exception != null){
// Make sure we have detached the processorTask
detachProcessor();
SelectorThread.logger().log(Level.FINEST,
"SocketChannel Read Exception: ",exception);
}
}
final boolean attached = !inKeepAliveProcess;
if (keepAlive) {
// (1) First remove the attachement.
if ( inKeepAliveProcess ) {
key.attach(null);
} else {
key.attach(this);
}
// (2) Register the key
registerKey();
// We must return since the key has been registered and this
// task can be reused.
if ( attached ) return;
// (3) Return that task to the pool.
if ( inKeepAliveProcess ) {
terminate(keepAlive);
}
} else {
terminate(keepAlive);
}
| public void | recycle()Clear the current state and make this object ready for another request.
byteBuffer = algorithm.postParse(byteBuffer);
byteBuffer.clear();
inputStream.recycle();
algorithm.recycle();
key = null;
inputStream.setSelectionKey(null);
| public void | terminate(boolean keepAlive)Complete the processing.
super.terminate(keepAlive);
|
|