FileDocCategorySizeDatePackage
DefaultReadTask.javaAPI DocGlassfish v2 API16373Fri May 04 22:37:04 BST 2007com.sun.enterprise.web.connector.grizzly

DefaultReadTask

public class DefaultReadTask extends TaskBase implements ReadTask
Read available data on a non-blocking SocketChannel. The StreamAlgorithm strategy will decide whether more bytes are required or not. Once the StreamAlgorithm is ready, the ProcessorTask attached to this class will be executed.
author
Scott Oaks
author
Jean-Francois Arcand

Fields Summary
private long
idleTime
The time in milliseconds this Task is allowed to stay idle. This is only used when SelectionKey.attach() is used.
protected TaskContext
taskContext
The TaskContext instance associated with this object. The TaskContext is initialized at startup and then recycled.
protected TaskEvent
taskEvent
The TaskEvent instance used by this object to notify its listeners
protected ByteBuffer
byteBuffer
The ByteBuffer used by this task to buffer the request stream.
protected ProcessorTask
processorTask
The ProcessorTask used by this class.
protected int
maxPostSize
Max post size.
protected ByteBufferInputStream
inputStream
The recycled InputStream used by this task to read from the ByteBuffer.
protected StreamAlgorithm
algorithm
The algorithm used to parse the request and determine whether all the bytes have been read from the SocketChannel.
protected boolean
bytesAvailable
true only when another object has already read bytes from the channel.
protected boolean
useDirectByteBuffer
Whether the ByteBuffer used by the ReadTask is a direct ByteBuffer or not.
protected boolean
useByteBufferView
Create view ByteBuffer from another ByteBuffer
Constructors Summary
public DefaultReadTask()



    // ----------------------------------------------------- Constructor ----/

    
     
        ;//
    
Methods Summary
public voidattachProcessor(ProcessorTask processorTask)
Force this task to always use the same ProcessorTask instance.

        // Remember the supplied processor as the one bound to this task.
        this.processorTask = processorTask;        
        // Point the newly attached processor at this task's key, socket and handler.
        configureProcessorTask();
    
protected voidconfigureProcessorTask()
Set appropriate attribute on the ProcessorTask.

        // Disable blocking keep-alive mechanism. Keep-Alive mechanism
        // will be managed by this class instead.
        processorTask.setSelectionKey(key);
        processorTask.setSocket(((SocketChannel)key.channel()).socket());
        processorTask.setHandler(algorithm.getHandler());
    
public voiddetachProcessor()
Return the ProcessorTask to the pool.

        // Recycle the processor (when one is attached) before notifying anyone.
        if (processorTask != null) {
            processorTask.recycle();
        }

        // Tell every registered listener, from last to first, that this task
        // has completed, then forget the listeners.
        if (listeners != null) {
            int index = listeners.size();
            while (--index >= 0) {
                // The event object is created lazily and reused afterwards.
                if (taskEvent == null) {
                    taskEvent = new TaskEvent<ReadTask>();
                }
                taskEvent.attach(this);
                taskEvent.setStatus(TaskEvent.COMPLETED);
                listeners.get(index).taskEvent(taskEvent);
            }
            clearTaskListeners();
        }

        // Only when recycling is enabled is the processor handed back to the
        // selector thread and the reference dropped.
        if (processorTask != null && recycle) {
            selectorThread.returnTask(processorTask);
            processorTask = null;
        }
    
public voiddoTask()
Read data from the socket and process it using this thread, but only if the StreamAlgorithm strategy determines that no more bytes are needed.

   
        if ( byteBuffer == null) {
            WorkerThread workerThread = (WorkerThread)Thread.currentThread();
            byteBuffer = workerThread.getByteBuffer();

            if ( workerThread.getByteBuffer() == null){
                byteBuffer = algorithm.allocate(useDirectByteBuffer,
                        useByteBufferView,selectorThread.getBufferSize());
                workerThread.setByteBuffer(byteBuffer);
            }
        }
        doTask(byteBuffer);
    
protected voiddoTask(java.nio.ByteBuffer byteBuffer)
Pull data from the socket and store it inside the ByteBuffer. The StreamAlgorithm implementation will take care of determining whether we need to register again with the main Selector or execute the request using a temporary Selector.

param
byteBuffer

        // Result of the last read (or the buffer position when bytes were
        // already available); handed to manageKeepAlive() at the end.
        int count = 0;
        // NOTE(review): 'socket' is assigned below but never read in this method.
        Socket socket = null;
        SocketChannel socketChannel = null;
        // Whether the key should go back to the selector once we are done.
        boolean keepAlive = false;
        // Any IOException/RuntimeException raised while reading or processing.
        Exception exception = null;
        // Clear whatever was attached to the key.
        key.attach(null);
     
        try {
            socketChannel = (SocketChannel)key.channel();
            socket = socketChannel.socket();
            algorithm.setSocketChannel(socketChannel);            
           
            // Number of zero-byte reads seen so far; it is never reset.
            int loop = 0;
            // NOTE(review): 'bufferSize' is declared but never used.
            int bufferSize = 0;
            // Keep going while the channel is open and either bytes were
            // pre-read by someone else (no read is issued in that case, thanks
            // to short-circuit evaluation) or the read did not return -1.
            while ( socketChannel.isOpen() && (bytesAvailable || 
                    ((count = socketChannel.read(byteBuffer))> -1))){

                // Avoid calling the Selector.
                // A zero-byte read is retried; the loop is abandoned on the
                // third zero-byte read.
                if ( count == 0 && !bytesAvailable){
                    loop++;
                    if (loop > 2){
                        break;
                    }
                    continue;
                }
                // When the bytes were pre-read, use the buffer position as
                // the byte count.
                if (bytesAvailable){
                    count = byteBuffer.position();
                }
                // The pre-read bytes are consumed only once.
                bytesAvailable = false;
                
                // preParse() may hand back a different buffer; from here on the
                // local parameter (not the field) refers to it.
                byteBuffer = algorithm.preParse(byteBuffer);
                inputStream.setByteBuffer(byteBuffer);
                inputStream.setSelectionKey(key);
                
                // try to predict which HTTP method we are processing
                if ( algorithm.parse(byteBuffer) ){ 
                    // The algorithm accepted the buffer: run the processor and
                    // stop looping when it does not ask for keep-alive.
                    keepAlive = executeProcessorTask();
                    if (!keepAlive) {
                        break;
                    }
                } else {
                    // We must call the Selector since we don't have all the 
                    // bytes
                    keepAlive = true;
                }
            } 
        // Catch IO AND NIO exception
        } catch (IOException ex) {
            exception = ex;
        } catch (RuntimeException ex) {
            exception = ex;    
        } finally {                   
            // Always decide what happens to the key, whether we left the loop
            // normally or through an exception.
            manageKeepAlive(keepAlive,count,exception);
        }
    
public booleanexecuteProcessorTask()
Execute the ProcessorTask only if the request has been fully read. Guess the size of the request by using the content-type HTTP headers.

return
false if the request wasn't fully read by the channel, in which case we need to respin the key on the Selector.

                  
        if (SelectorThread.logger().isLoggable(Level.FINEST)) {
            SelectorThread.logger().log(Level.FINEST,"executeProcessorTask");
        }

        // Give the algorithm's handler, when there is one, the chance to stop
        // here: a BREAK answer to REQUEST_BUFFERED means no processor is run
        // and true is returned.
        if (algorithm.getHandler() != null) {
            if (algorithm.getHandler().handle(null, Handler.REQUEST_BUFFERED)
                    == Handler.BREAK) {
                return true;
            }
        }

        // Borrow a processor unless one is still attached. An attached
        // processor means a previous channel.read() did not deliver all
        // the bytes.
        if (processorTask == null) {
            attachProcessor(selectorThread.getProcessorTask());
        }

        // Stays false when process() throws: the failure is logged and the
        // caller is told not to keep the connection alive.
        boolean processResult = false;
        try {
            // The socket might not have been read entirely, in which case
            // parsing fails and another event has to be spun.
            processResult = processorTask.process(inputStream, null);
        } catch (Exception e) {
            SelectorThread.logger().log(
                    Level.SEVERE,"readTask.processException", e);
        }

        detachProcessor();
        return processResult;
    
protected voidfinishConnection()
Cancel the SelectionKey and close its underlying SocketChannel. Add this Task to the Keep-Alive sub-system.

        
        if (SelectorThread.logger().isLoggable(Level.FINEST)) {
            SelectorThread.logger().log(Level.FINEST,"finishConnection");
        }

        try {
            // The context is optional; recycle it only when one is present.
            if (taskContext != null) {
                taskContext.recycle();
            }
        } catch (IOException ioe) {
            // Recycling is best effort: the key must still be cancelled below.
            // Record the failure instead of dropping it silently.
            if (SelectorThread.logger().isLoggable(Level.FINE)) {
                SelectorThread.logger().log(Level.FINE,
                        "finishConnection", ioe);
            }
        }

        // Ask the selector thread to cancel the key.
        selectorThread.cancelKey(key);
    
public java.nio.ByteBuffergetByteBuffer()
Return the underlying ByteBuffer used by this class.

        // Fast path: a buffer is already in place.
        if (byteBuffer != null) {
            return byteBuffer;
        }

        // First use: let the algorithm allocate a buffer sized by the
        // selector thread, honouring the direct/view preferences.
        byteBuffer = algorithm.allocate(
                useDirectByteBuffer,
                useByteBufferView,
                selectorThread.getBufferSize());
        return byteBuffer;
    
public longgetIdleTime()
Return the time in milliseconds this Task is allowed to be idle.

        // Plain accessor: returns the stored value without any computation.
        return idleTime;
    
public ProcessorTaskgetProcessorTask()
Return the associated ProcessorTask.

return
the associated ProcessorTask, null if not used.

        // May be null: detachProcessor() clears the field when recycling is enabled.
        return processorTask;
    
public voidinitialize(StreamAlgorithm algorithm, boolean useDirectByteBuffer, boolean useByteBufferView)

        // Remember how buffers should be allocated later on.
        this.useByteBufferView = useByteBufferView;
        this.useDirectByteBuffer = useDirectByteBuffer;

        // Keep the algorithm and create the input stream this task will use.
        this.algorithm = algorithm;
        inputStream = new ByteBufferInputStream();

        // Mark this task as a read task.
        type = READ_TASK;
    
protected voidmanageKeepAlive(boolean keepAlive, int count, java.lang.Exception exception)
Evaluate if the SelectionKey needs to be registered to the main Selector

         

        // The key is invalid when the Task has been cancelled.
        if ( count == -1 || !key.isValid() || exception != null ){
            keepAlive = false;
            
            if ( exception != null){
                // Make sure we have detached the processorTask
                detachProcessor();
                if (SelectorThread.logger().isLoggable(Level.FINE)){
                    SelectorThread.logger().log
                       (Level.FINE, 
                           "SocketChannel Read Exception:",exception);
                }
            }
        }

        if (keepAlive) {    
            registerKey(); 
        } 
            
        terminate(keepAlive);
    
public voidrecycle()
Clear the current state and make this object ready for another request.

        if (byteBuffer != null) {
            try {
                // Check the thread type instead of casting and catching
                // ClassCastException: a Grizzly extension may run this task on
                // a thread that is not a WorkerThread.
                final Thread current = Thread.currentThread();
                if (current instanceof WorkerThread) {
                    final WorkerThread workerThread = (WorkerThread) current;
                    // Give the buffer to the worker thread only when it does
                    // not already hold one.
                    if (workerThread.getByteBuffer() == null) {
                        workerThread.setByteBuffer(byteBuffer);
                    }
                } else if (SelectorThread.logger().isLoggable(Level.FINEST)) {
                    SelectorThread.logger().log(Level.FINEST,"recycle");
                }
            } finally {
                // Always let the algorithm post-process the buffer and reset
                // it, even if the hand-off above failed.
                byteBuffer = algorithm.postParse(byteBuffer);
                byteBuffer.clear();
            }
        }

        // Reset the collaborators and drop per-request references.
        inputStream.recycle();
        algorithm.recycle();
        key = null;
        inputStream.setSelectionKey(null);
        byteBuffer = null;
    
public voidregisterKey()
Register the SelectionKey with the Selector

        // An invalid key is never handed back to the selector.
        if (!key.isValid()) {
            return;
        }

        if (SelectorThread.logger().isLoggable(Level.FINEST)) {
            SelectorThread.logger().log(Level.FINEST,"registerKey");
        }

        selectorThread.registerKey(key);
    
protected voidreturnTask()
Return this object to the pool

        // Nothing to do when recycling is disabled.
        if (!recycle) {
            return;
        }

        // Reset this task's state, then give it back to the selector thread.
        recycle();
        selectorThread.returnTask(this);
    
public voidsetByteBuffer(java.nio.ByteBuffer byteBuffer)
Set the underlying ByteBuffer used by this class.

        // Stored as-is; a null value makes doTask()/getByteBuffer() obtain a buffer lazily.
        this.byteBuffer = byteBuffer;
    
public voidsetBytesAvailable(boolean bytesAvailable)
If the attached byteBuffer was already filled, tell the Algorithm to re-use the bytes.

        // When true, doTask(ByteBuffer) skips the first channel read and uses
        // the buffer's current position as the byte count.
        this.bytesAvailable = bytesAvailable;
    
public voidsetIdleTime(long idleTime)
Set the time in milliseconds this Task is allowed to be idle.

        // Stored as-is; no validation is performed on the value.
        this.idleTime = idleTime;
    
public voidtaskEvent(TaskEvent event)

        // Only COMPLETED and ERROR events are of interest; ignore the rest.
        if (event.getStatus() != TaskEvent.COMPLETED
                && event.getStatus() != TaskEvent.ERROR) {
            return;
        }

        // Finish up, keeping the connection alive only if the processor says so.
        terminate(processorTask.isKeepAlive());
    
public voidterminate(boolean keepAlive)
Complete the processing.

          
        // A connection that is not kept alive is finished first
        // (finishConnection() cancels the key).
        if ( !keepAlive ){
            finishConnection();
        }
        // In every case, give this task the chance to go back to the pool;
        // returnTask() does nothing when recycling is disabled.
        returnTask();