FileDocCategorySizeDatePackage
SelectorThread.javaAPI DocGlassfish v2 API87344Mon Aug 27 15:07:16 BST 2007com.sun.enterprise.web.connector.grizzly

SelectorThread.java

/*
 * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS HEADER.
 * 
 * Copyright 1997-2007 Sun Microsystems, Inc. All rights reserved.
 * 
 * The contents of this file are subject to the terms of either the GNU
 * General Public License Version 2 only ("GPL") or the Common Development
 * and Distribution License("CDDL") (collectively, the "License").  You
 * may not use this file except in compliance with the License. You can obtain
 * a copy of the License at https://glassfish.dev.java.net/public/CDDL+GPL.html
 * or glassfish/bootstrap/legal/LICENSE.txt.  See the License for the specific
 * language governing permissions and limitations under the License.
 * 
 * When distributing the software, include this License Header Notice in each
 * file and include the License file at glassfish/bootstrap/legal/LICENSE.txt.
 * Sun designates this particular file as subject to the "Classpath" exception
 * as provided by Sun in the GPL Version 2 section of the License file that
 * accompanied this code.  If applicable, add the following below the License
 * Header, with the fields enclosed by brackets [] replaced by your own
 * identifying information: "Portions Copyrighted [year]
 * [name of copyright owner]"
 * 
 * Contributor(s):
 * 
 * If you wish your version of this file to be governed by only the CDDL or
 * only the GPL Version 2, indicate your decision by adding "[Contributor]
 * elects to include this software in this distribution under the [CDDL or GPL
 * Version 2] license."  If you don't indicate a single choice of license, a
 * recipient has the option to distribute your version of this file under
 * either the CDDL, the GPL Version 2 or to extend the choice of license to
 * its licensees as provided above.  However, if you add GPL Version 2 code
 * and therefore, elected the GPL Version 2 license, then the option applies
 * only if the new code is made subject to such option by the copyright
 * holder.
 */
package com.sun.enterprise.web.connector.grizzly;

import java.io.IOException;
import java.net.BindException;
import java.net.InetAddress;
import java.net.InetSocketAddress;
import java.net.ServerSocket;
import java.net.Socket;
import java.net.SocketException;
import java.util.Iterator;
import java.util.Set;
import java.util.logging.Logger;
import java.util.logging.Level;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.ConcurrentHashMap;
import java.nio.channels.CancelledKeyException;
import java.nio.channels.Selector;
import java.nio.channels.SelectionKey;
import java.nio.channels.ServerSocketChannel;
import java.nio.channels.SocketChannel;

import org.apache.coyote.Adapter;
import org.apache.coyote.RequestGroupInfo;

import javax.management.ObjectName;
import javax.management.MBeanServer;
import javax.management.MBeanRegistration;

import com.sun.enterprise.web.connector.grizzly.algorithms.NoParsingAlgorithm;
import com.sun.enterprise.web.connector.grizzly.async.DefaultAsyncHandler;
import com.sun.enterprise.web.connector.grizzly.comet.CometAsyncFilter;
import com.sun.enterprise.web.connector.grizzly.FileCache.FileCacheEntry;
import java.util.Enumeration;

/**
 * This class implement an NIO socket HTTP Listener. This class 
 * supports three stagegy:
 *
 * Mode Blocking: This mode uses NIO blocking mode, and doesn't uses any of the 
 *                java.nio.* classes.
 *
 *
 * Mode Non-Blocking: This mode uses NIO non blocking mode and read the entire 
 *         request stream before processing the request. The stragegy used is 
 *         to find the content-lenght header and buffer bytes until the end of 
 *         the stream is read.
 *
 * @author Jean-Francois Arcand
 */
public class SelectorThread extends Thread implements MBeanRegistration{
            
    public final static String SERVER_NAME = 
            System.getProperty("product.name") != null 
                ? System.getProperty("product.name") : "grizzly";
    
        
    private Object[] lock = new Object[0];

    
    protected int serverTimeout = Constants.DEFAULT_SERVER_SOCKET_TIMEOUT;

    protected InetAddress inet;
    protected int port;

    protected ServerSocket serverSocket;
    
    /**
     * The <code>ServerSocketChannel</code> used in blocking mode.
     */
    protected ServerSocketChannel serverSocketChannel;
    
    protected boolean initialized = false;    
    protected volatile boolean running = false;    
    // ----------------------------------------------------- JMX Support ---/
    
    
    protected String domain;
    protected ObjectName oname;
    protected ObjectName globalRequestProcessorName;
    private ObjectName keepAliveMbeanName;
    private ObjectName pwcConnectionQueueMbeanName;
    private ObjectName pwcFileCacheMbeanName;
    protected MBeanServer mserver;
    protected ObjectName processorWorkerThreadName;


    // ------------------------------------------------------Socket setting --/

    protected boolean tcpNoDelay=false;
    
    
    protected int linger=100;
    
    
    protected int socketTimeout=-1;
    
    
    protected int maxKeepAliveRequests = Constants.DEFAULT_MAX_KEEP_ALIVE;
    
    
    protected boolean oOBInline = false;
    // ------------------------------------------------------ Compression ---/


    /**
     * Compression value.
     */
    protected String compression = "off";
    protected String noCompressionUserAgents = null;
    protected String restrictedUserAgents = null;
    protected String compressableMimeTypes = "text/html,text/xml,text/plain";
    protected int compressionMinSize    = 2048;
       
    // ------------------------------------------------------ Properties----/
    
    
    /**
     * Is the socket reuse socket enabled.
     */
    private boolean reuseAddress = true;


    /**
     * Buffer the response until the buffer is full.
     */
    protected boolean bufferResponse = false;
    
    
    /**
     * Default HTTP header buffer size.
     */
    protected int maxHttpHeaderSize = Constants.DEFAULT_HEADER_SIZE;


    /**
     * Number of polled <code>Read*Task</code> instance.
     */
    protected int minReadQueueLength = 10;


    /**
     * Number of polled <code>ProcessorTask</code> instance.
     */
    protected int minProcessorQueueLength = 10;
    
    
    protected int maxPostSize = 2 * 1024 * 1024;


    /**
     * The <code>Selector</code> used by the connector.
     */
    protected Selector selector;


    /**
     * Associated adapter.
     */
    protected Adapter adapter = null;

    
    /**
     * The queue shared by this thread and code>ReadTask</code>
     */ 
    protected Pipeline readPipeline;
    

    /**
     * The queue shared by this thread and the code>ProcessorTask</code>.
     */ 
    protected Pipeline processorPipeline;
    
  
    /**
     * Placeholder for <code>Pipeline</code> statistic.
     */
    protected PipelineStatistic pipelineStat;
    
    /**
     * The default <code>Pipeline</code> used.
     */
    protected String pipelineClassName = 
        com.sun.enterprise.web.connector.grizzly.
            LinkedListPipeline.class.getName();
    
    /**
     * Maximum number of <code>WorkerThread</code>
     */
    protected int maxProcessorWorkerThreads = 5; // By default
    
    
    /**
     * Maximum number of <code>ReadWorkerThread</code>
     */
    protected int maxReadWorkerThreads = -1; // By default

    
    /**
     * Minimum numbers of <code>WorkerThread</code> created
     */
    protected int minWorkerThreads = 5;
    

    /**
     * Minimum numbers of <code>WorkerThread</code> 
     * before creating new thread.
     * <implementation-note>
     * Not used in 9.x
     * </implementation-note>
     */
    protected int minSpareThreads = 2;

    
    /**
     * The number used when increamenting the <code>Pipeline</code> 
     * thread pool.
     */
    protected int threadsIncrement = 1;
    
    
    /**
     * The timeout used by the thread when processing a request.
     */
    protected int threadsTimeout = Constants.DEFAULT_TIMEOUT;

    
    /**
     * Is the <code>ByteBuffer</code> used by the <code>ReadTask</code> use
     * direct <code>ByteBuffer</code> or not.
     */
    protected boolean useDirectByteBuffer = false;
    
  
    /**
     * Monitoring object used to store information.
     */
    protected RequestGroupInfo globalRequestProcessor= new RequestGroupInfo();
    
    
    /**
     * Keep-alive stats
     */
    private KeepAliveStats keepAliveStats = new KeepAliveStats();


    /**
     * If <code>true</code>, display the NIO configuration information.
     */
    protected boolean displayConfiguration = false;
    
    
    /**
     * Is monitoring already started.
     */
    protected boolean isMonitoringEnabled = false;
    

    /**
     * The current number of simulatenous connection.
     */
    protected int currentConnectionNumber;


    /**
     * Is this Selector currently in Wating mode?
     */
    protected volatile boolean isWaiting = false;
    

    /**
     * The input request buffer size.
     */
    protected int requestBufferSize = Constants.DEFAULT_REQUEST_BUFFER_SIZE;
    
    
    /**
     * Create view <code>ByteBuffer</code> from another <code>ByteBuffer</code>
     */
    protected boolean useByteBufferView = false;
    

    /*
     * Number of seconds before idle keep-alive connections expire
     */
    protected int keepAliveTimeoutInSeconds = Constants.DEFAULT_TIMEOUT;

    
    /**
     * Number of seconds before idle keep-alive connections expire
     */
    private int kaTimeout = Constants.DEFAULT_TIMEOUT * 1000;
    
    
    /**
     * Recycle the <code>Task</code> after running them
     */
    protected boolean recycleTasks = Constants.DEFAULT_RECYCLE;
    
    
    /**
     * The <code>Selector</code> timeout value. By default, it is set to 60000
     * miliseconds (as in the j2se 1.5 ORB).
     */
    protected static int selectorTimeout = 1000;


    /**
     * Maximum pending connection before refusing requests.
     */
    protected int maxQueueSizeInBytes = Constants.DEFAULT_QUEUE_SIZE;


    /**
     * The <code>Algorithm</code> used to predict the end of the NIO stream
     */
    protected Class algorithmClass;
    
    
    /**
     * The <code>Algorithm</code> used to parse the NIO stream.
     */
    protected String algorithmClassName = DEFAULT_ALGORITHM;
    
    
    /**
     * The default NIO stream algorithm.
     */
    public final static String DEFAULT_ALGORITHM =
        com.sun.enterprise.web.connector.grizzly.algorithms.
            NoParsingAlgorithm.class.getName();

    
    /**
     * Server socket backlog.
     */
    protected int ssBackLog = 4096;
    
    
    /**
     * Next time the exprireKeys() will delete keys.
     */    
    private long nextKeysExpiration = 0;
    
    
    /**
     * The default response-type
     */
    protected String defaultResponseType = Constants.DEFAULT_RESPONSE_TYPE;


    /**
     * The forced request-type
     */
    protected String forcedRequestType = Constants.FORCED_REQUEST_TYPE;
    
    
    /**
     * The root folder where application are deployed
     */
    protected static String rootFolder = "";
    
    
    // ----------------------------------------------------- Collections --//
    
    
    /**
     * List of <code>SelectionKey</code> event to register next time the 
     * <code>Selector</code> wakeups. This is needed since there a bug
     * in j2se 1.4.x that prevent registering selector event if the call
     * is done on another thread.
     */
    private ConcurrentLinkedQueue<SelectionKey> keysToEnable =
        new ConcurrentLinkedQueue<SelectionKey>();
         
    
    // ---------------------------------------------------- Object pools --//


    /**
     * <code>ConcurrentLinkedQueue</code> used as an object pool.
     * If the list becomes empty, new <code>ProcessorTask</code> will be
     * automatically added to the list.
     */
    protected ConcurrentLinkedQueue<ProcessorTask> processorTasks =
        new ConcurrentLinkedQueue<ProcessorTask>();
              
    
    /**
     * <code>ConcurrentLinkedQueue</code> used as an object pool.
     * If the list becomes empty, new <code>ReadTask</code> will be
     * automatically added to the list.
     */
    protected ConcurrentLinkedQueue<ReadTask> readTasks =
        new ConcurrentLinkedQueue<ReadTask>();

    
    /**
     * List of active <code>ProcessorTask</code>.
     */
    protected ConcurrentLinkedQueue<ProcessorTask> activeProcessorTasks =
        new ConcurrentLinkedQueue<ProcessorTask>();
    
    // -----------------------------------------  Multi-Selector supports --//

    /**
     * The number of <code>SelectorReadThread</code>
     */
    protected int multiSelectorsCount = 0;

    
    /**
     * The <code>Selector</code> used to register OP_READ
     */    
    protected MultiSelectorThread[] readThreads;
    
    
    /**
     * The current <code>readThreads</code> used to process OP_READ.
     */
    int curReadThread;

    
    /**
     * The logger used by the grizzly classes.
     */
    protected static Logger logger = Logger.getLogger("GRIZZLY");
    
    
    /**
     * Flag to disable setting a different time-out on uploads.
     */
    protected boolean disableUploadTimeout = true;    
    
    
    /**
     * Maximum timeout on uploads. 5 minutes as in Apache HTTPD server.
     */
    protected int uploadTimeout = 30000;
            
            
    // -----------------------------------------  Keep-Alive subsystems --//
    
     
    /**
     * Keep-Alive subsystem. If a client opens a socket but never close it,
     * the <code>SelectionKey</code> will stay forever in the 
     * <code>Selector</code> keys, and this will eventualy produce a 
     * memory leak.
     */
    protected KeepAlivePipeline keepAlivePipeline;
    
    
    // ------------------------------------------------- FileCache support --//
   
    
    /**
     * The FileCacheFactory associated with this Selector
     */ 
    protected FileCacheFactory fileCacheFactory;
    
        /**
     * Timeout before remove the static resource from the cache.
     */
    protected int secondsMaxAge = -1;
    
    
    /**
     * The maximum entries in the <code>fileCache</code>
     */
    protected int maxCacheEntries = 1024;
    
 
    /**
     * The maximum size of a cached resources.
     */
    protected long minEntrySize = 2048;
            
               
    /**
     * The maximum size of a cached resources.
     */
    protected long maxEntrySize = 537600;
    
    
    /**
     * The maximum cached bytes
     */
    protected long maxLargeFileCacheSize = 10485760;
 
    
    /**
     * The maximum cached bytes
     */
    protected long maxSmallFileCacheSize = 1048576;
    
    
    /**
     * Is the FileCache enabled.
     */
    protected boolean isFileCacheEnabled = true;
    
    
    /**
     * Is the large FileCache enabled.
     */
    protected boolean isLargeFileCacheEnabled = true;    
    
    // --------------------------------------------- Asynch supports -----//
    
    /**
     * Is asynchronous mode enabled?
     */
    protected boolean asyncExecution = false;
    
    
    /**
     * When the asynchronous mode is enabled, the execution of this object
     * will be delegated to the <code>AsyncHandler</code>
     */
    protected AsyncHandler asyncHandler;
    
    
    /**
     * Is the DEFAULT_ALGORITHM used.
     */
    protected static boolean defaultAlgorithmInstalled = true;
    
    
    /**
     * The JMX Management class.
     */
    private Management jmxManagement = null;
    
    
    /**
     * The Classloader used to load instance of StreamAlgorithm.
     */
    private ClassLoader classLoader;
    
    
    /**
     * Grizzly own debug flag.
     */
    protected boolean enableNioLogging = false;
    
    
    /**
     * Static list of current instance of this class.
     */
    private final static ConcurrentHashMap<Integer,SelectorThread> 
            selectorThreads = new ConcurrentHashMap<Integer,SelectorThread>();
    
    
    /**
     * Banned SelectionKey registration.
     */
    protected ConcurrentLinkedQueue<SelectionKey> bannedKeys =
        new ConcurrentLinkedQueue<SelectionKey>();    
    
    
    // ---------------------------------------------------- Constructor --//
    
    
    /**
     * Create the <code>Selector</code> object. Each instance of this class
     * will listen to a specific port.
     */
    public SelectorThread(){
    }
    
    // ------------------------------------------------------ Selector hook --/
    
    
    /**
     * Return the <code>SelectorThread</code> which listen on port, or null
     * if there is no <code>SelectorThread</code>.
     */
    public final static SelectorThread getSelector(int port){
        return selectorThreads.get(port);
    }
    
    
    /**
     * Return an <code>Enumeration</code> of the active 
     * <code>SelectorThread</code>s
     */
    public final static Enumeration<SelectorThread> getSelectors(){
        return selectorThreads.elements();
    }
    
    
    // ----------------------------------------------------------------------/
    
   /**
     * Enable all registered interestOps. Due a a NIO bug, all interestOps
     * invokation needs to occurs on the same thread as the selector thread.
     */
    public void enableSelectionKeys(){
        SelectionKey selectionKey;
        int size = keysToEnable.size();
        long currentTime = 0L;
        if (size > 0){
            currentTime = (Long)System.currentTimeMillis();
        }

        for (int i=0; i < size; i++) {
            selectionKey = keysToEnable.poll();
            
            // If the SelectionKey is used for continuation, do not allow
            // the key to be registered.
            if (asyncExecution && !bannedKeys.isEmpty() 
                    && bannedKeys.remove(selectionKey)){
                continue;
            }
            
            if (!selectionKey.isValid() || !keepAlivePipeline.trap(selectionKey)){
                cancelKey(selectionKey);
                continue;
            }

            selectionKey.interestOps(
                    selectionKey.interestOps() | SelectionKey.OP_READ);

            if (selectionKey.attachment() == null)
                selectionKey.attach(currentTime);
        } 
    } 
    
    
    /**
     * Add a <code>SelectionKey</code> to the banned list of SelectionKeys. 
     * A SelectionKey is banned when new registration aren't allowed on the 
     * Selector.
     */
    public void addBannedSelectionKey(SelectionKey key){
        bannedKeys.offer(key);
    }
    
    
    /**
     * Register a <code>SelectionKey</code> to this <code>Selector</code>
     * running of this thread.
     */
    public void registerKey(SelectionKey key){
        if (key == null) return;
        
        if (keepAlivePipeline.dropConnection()) {
            cancelKey(key);
            return;
        }
        
        if (defaultAlgorithmInstalled){
            key.attach(null);
        }
        
        if (enableNioLogging){
            logger.log(Level.INFO,
                    "Registering SocketChannel for keep alive " +  
                    key.channel());
        }         
        // add SelectionKey & Op to list of Ops to enable
        keysToEnable.add(key);
        // tell the Selector Thread there's some ops to enable
        selector.wakeup();
        // wakeup() will force the SelectorThread to bail out
        // of select() to process your registered request
    } 

   // -------------------------------------------------------------- Init // 


    /**
     * initialized the endpoint by creating the <code>ServerScoketChannel</code>
     * and by initializing the server socket.
     */
    public void initEndpoint() throws IOException, InstantiationException {
        SelectorThreadConfig.configure(this);
        
        initFileCacheFactory();
        initAlgorithm();
        initPipeline();
        initMonitoringLevel();
        
        setName("SelectorThread-" + port);
        
        try{
            // Create the socket listener
            serverSocketChannel = ServerSocketChannel.open();
            selector = Selector.open();

            serverSocket = serverSocketChannel.socket();
            serverSocket.setReuseAddress(reuseAddress);
            if ( inet == null)
                serverSocket.bind(new InetSocketAddress(port),ssBackLog);
            else
                serverSocket.bind(new InetSocketAddress(inet,port),ssBackLog);

            serverSocketChannel.configureBlocking(false);
            serverSocketChannel.register(selector, SelectionKey.OP_ACCEPT);
        } catch (SocketException ex){
            throw new BindException(ex.getMessage() + ": " + port);
        }
        
        serverSocket.setSoTimeout(serverTimeout);
        
        if ( multiSelectorsCount > 1 ){
            readThreads = new MultiSelectorThread[multiSelectorsCount];
            initMultiSelectors();
        } 
        initProcessorTask(maxProcessorWorkerThreads);
        initReadTask(minReadQueueLength);                
        SelectorFactory.maxSelectors = maxProcessorWorkerThreads;

        initialized = true;           
        logger.log(Level.FINE,"Initializing Grizzly Non-Blocking Mode");                     
    }
     
    
    /**
     * Create a new <code>Pipeline</code> instance using the 
     * <code>pipelineClassName</code> value.
     */
    protected Pipeline newPipeline(int maxThreads,
                                   int minThreads,
                                   String name, 
                                   int port,
                                   int priority){
        
        Class className = null;                               
        Pipeline pipeline = null;                               
        try{           
            if ( classLoader == null ){
                className = Class.forName(pipelineClassName);
            } else {
                className = classLoader.loadClass(pipelineClassName);
            }
            pipeline = (Pipeline)className.newInstance();
        } catch (ClassNotFoundException ex){
            logger.log(Level.WARNING,
                       "Unable to load Pipeline: " + pipelineClassName);
            pipeline = new LinkedListPipeline();
        } catch (InstantiationException ex){
            logger.log(Level.WARNING,
                       "Unable to instantiate Pipeline: "
                       + pipelineClassName);
            pipeline = new LinkedListPipeline();
        } catch (IllegalAccessException ex){
            logger.log(Level.WARNING,
                       "Unable to instantiate Pipeline: "
                       + pipelineClassName);
            pipeline = new LinkedListPipeline();
        }
        
        if (logger.isLoggable(Level.FINE)){
            logger.log(Level.FINE,
                       "http-listener " + port + " uses pipeline: "
                       + pipeline.getClass().getName());
        }
        
        pipeline.setMaxThreads(maxThreads);
        pipeline.setMinThreads(minThreads);    
        pipeline.setName(name);
        pipeline.setPort(port);
        pipeline.setPriority(priority);
        pipeline.setQueueSizeInBytes(maxQueueSizeInBytes);
        pipeline.setThreadsIncrement(threadsIncrement);
        pipeline.setThreadsTimeout(threadsTimeout);
        
        return pipeline;
    }
    
    
    /**
     * Initialize the fileCacheFactory associated with this instance
     */
    protected void initFileCacheFactory(){        
        if (asyncExecution){
            isFileCacheEnabled = false;
            isLargeFileCacheEnabled = false;
        }
        
        fileCacheFactory = FileCacheFactory.getFactory(port);
        fileCacheFactory.setIsEnabled(isFileCacheEnabled);
        fileCacheFactory.setLargeFileCacheEnabled(isLargeFileCacheEnabled);
        fileCacheFactory.setSecondsMaxAge(secondsMaxAge);
        fileCacheFactory.setMaxCacheEntries(maxCacheEntries);
        fileCacheFactory.setMinEntrySize(minEntrySize);
        fileCacheFactory.setMaxEntrySize(maxEntrySize);
        fileCacheFactory.setMaxLargeCacheSize(maxLargeFileCacheSize);
        fileCacheFactory.setMaxSmallCacheSize(maxSmallFileCacheSize);         
        fileCacheFactory.setIsMonitoringEnabled(isMonitoringEnabled);
        fileCacheFactory.setHeaderBBSize(requestBufferSize);
    }
       
    
    /**
     * Injects <code>PipelineStatistic</code> into every
     * <code>Pipeline</code>, for monitoring purposes.
     */
    protected void enablePipelineStats(){
        pipelineStat.start();

        processorPipeline.setPipelineStatistic(pipelineStat);       
        pipelineStat.setProcessorPipeline(processorPipeline);

        if (keepAlivePipeline != null){
            keepAlivePipeline.setKeepAliveStats(keepAliveStats);
        }
    }
    

    /**
     * Removes <code>PipelineStatistic</code> from every
     * <code>Pipeline</code>, when monitoring has been turned off.
     */
    protected void disablePipelineStats(){
        pipelineStat.stop();
        
        processorPipeline.setPipelineStatistic(null);
        pipelineStat.setProcessorPipeline(null);

        if (keepAlivePipeline != null){
            keepAlivePipeline.setKeepAliveStats(null);
        }

    }

    
    /**
     * Load using reflection the <code>Algorithm</code> class.
     */
    protected void initAlgorithm(){
        try{    
            if (classLoader == null){
                algorithmClass = Class.forName(algorithmClassName);
            } else {
                algorithmClass = classLoader.loadClass(algorithmClassName);
            }
            logger.log(Level.FINE,
                       "Using Algorithm: " + algorithmClassName);   
        } catch (ClassNotFoundException ex){
            logger.log(Level.FINE,
                       "Unable to load Algorithm: " + algorithmClassName);        
        }  finally {
            if ( algorithmClass == null ){
                algorithmClass = NoParsingAlgorithm.class;
            }
        }

        defaultAlgorithmInstalled = 
                algorithmClassName.equals(DEFAULT_ALGORITHM) ? true:false;
    }
    
    
    /**
     * Initialize the keep-alive mechanism.
     */
    protected void initKeepAlivePipeline(){
        keepAlivePipeline = new KeepAlivePipeline();
        keepAlivePipeline.setMaxKeepAliveRequests(maxKeepAliveRequests);
        keepAlivePipeline
            .setKeepAliveTimeoutInSeconds(keepAliveTimeoutInSeconds);
        keepAlivePipeline.setPort(port);
        keepAlivePipeline.setThreadsTimeout(threadsTimeout);

        keepAliveStats.setMaxConnections(maxKeepAliveRequests);
        keepAliveStats.setSecondsTimeouts(keepAliveTimeoutInSeconds);        
    }
    
    
    /**
     * Init the <code>Pipeline</code>s used by the <code>WorkerThread</code>s.
     */
    protected void initPipeline(){     
                
        selectorThreads.put(port,this);
        
        initKeepAlivePipeline();
        
        processorPipeline = newPipeline(maxProcessorWorkerThreads, 
                                        minWorkerThreads, "http",
                                        port,Thread.MAX_PRIORITY);  
        processorPipeline.initPipeline();

        if ( maxReadWorkerThreads == 0){
            maxReadWorkerThreads = -1;
            logger.log(Level.WARNING,
                       "http-listener " + port + 
                       " is security-enabled and needs at least 2 threads");
        }
        
        // Only creates the pipeline if the max > 0, and the async mechanism
        // must not be enabled.
        if ( maxReadWorkerThreads > 0 && !asyncExecution){                        
            readPipeline = newPipeline(maxReadWorkerThreads, 
                                       minWorkerThreads, "read", 
                                       port,Thread.NORM_PRIORITY);
            readPipeline.initPipeline();
        } else {
            readPipeline = (maxReadWorkerThreads == 0 ? null:processorPipeline);
        }
    }
    
    
    /**
     * Create a pool of <code>ReadTask</code>
     */
    protected void initReadTask(int size){         
        ReadTask task;
        for (int i=0; i < size; i++){
            task = newReadTask();             
            readTasks.offer(task);    
        }
    }
    
    
    /**
     * Return a new <code>ReadTask</code> instance
     */
    protected ReadTask newReadTask(){
        StreamAlgorithm streamAlgorithm = null;
        
        try{
            streamAlgorithm = (StreamAlgorithm)algorithmClass.newInstance();
        } catch (InstantiationException ex){
            logger.log(Level.WARNING,
                       "Unable to instantiate Algorithm: "+ algorithmClassName);
        } catch (IllegalAccessException ex){
            logger.log(Level.WARNING,
                       "Unable to instantiate Algorithm: " + algorithmClassName);
        } finally {
            if ( streamAlgorithm == null)
                streamAlgorithm = new NoParsingAlgorithm();
        }       
        streamAlgorithm.setPort(port);
        
        ReadTask task = null;
        
        /**
         * For performance reason, we need to avoid calling newInstance() when
         * the default configuration is used.
         */
        if ( !defaultAlgorithmInstalled ) {
            try{
                task = (ReadTask)streamAlgorithm.getReadTask(this).newInstance();
            } catch (InstantiationException ex){
                logger.log(Level.WARNING,
                           "Unable to instantiate Algorithm: "
                        + algorithmClassName);
            } catch (IllegalAccessException ex){
                logger.log(Level.WARNING,
                           "Unable to instantiate Algorithm: " 
                        + algorithmClassName);
            } finally {
                if ( task == null)
                    task = new DefaultReadTask();
            }  
        } else if ( maxReadWorkerThreads > 0 || asyncExecution ){
            task = new AsyncReadTask();
        } else {
            task = new DefaultReadTask();
        }
        task.setSelectorThread(this);     
        task.setPipeline(readPipeline);  
        task.setRecycle(recycleTasks);
        task.initialize(streamAlgorithm, useDirectByteBuffer,useByteBufferView);
       
        return task;
    }


    /**
     * Initialize <code>SelectorReadThread</code> used to process
     * OP_READ operations.
     */
    protected void initMultiSelectors() throws IOException, 
                                                 InstantiationException {
        for (int i = 0; i < readThreads.length; i++) {
            readThreads[i] = new SelectorReadThread();
            ((SelectorReadThread)readThreads[i]).countName = i;
            configureReadThread((SelectorReadThread)readThreads[i]);
        }
        curReadThread = 0;
    }
    
    
    protected void configureReadThread(SelectorThread multiSelector)
            throws IOException, InstantiationException {
        multiSelector.setMaxThreads(maxProcessorWorkerThreads);
        multiSelector.setBufferSize(requestBufferSize);
        multiSelector.setMaxKeepAliveRequests(maxKeepAliveRequests);
        multiSelector
                .setKeepAliveTimeoutInSeconds(keepAliveTimeoutInSeconds);
        multiSelector.maxQueueSizeInBytes = maxQueueSizeInBytes;
        multiSelector.fileCacheFactory = fileCacheFactory;     
        multiSelector.maxReadWorkerThreads = maxReadWorkerThreads;
        multiSelector.defaultResponseType = defaultResponseType;
        multiSelector.forcedRequestType = forcedRequestType;          
        multiSelector.minReadQueueLength = minReadQueueLength;
        multiSelector.maxHttpHeaderSize = maxHttpHeaderSize;
        multiSelector.isMonitoringEnabled = isMonitoringEnabled();
        multiSelector.pipelineStat = pipelineStat;
        multiSelector.globalRequestProcessor = globalRequestProcessor;

        if ( asyncExecution ) {
            multiSelector.asyncExecution = asyncExecution;
            multiSelector.asyncHandler = asyncHandler;
        }

        multiSelector.threadsIncrement = threadsIncrement;
        multiSelector.setPort(port);
        multiSelector.setAdapter(adapter);

        multiSelector.processorPipeline = processorPipeline;
        multiSelector.readPipeline = readPipeline;            
        multiSelector.readTasks = readTasks;
        multiSelector.processorTasks = processorTasks;   
        multiSelector.keepAlivePipeline = keepAlivePipeline;
        multiSelector.domain = domain;
        multiSelector.bufferResponse = bufferResponse;

        multiSelector.initEndpoint();
        multiSelector.start();
    }


    /**
     * Return an instance of <code>SelectorReadThread</code> to use
     * for registering OP_READ
     */
    private synchronized MultiSelectorThread getSelectorReadThread() {
        if (curReadThread == readThreads.length)
            curReadThread = 0;
        return readThreads[curReadThread++];
    }
    
    
    /**
     * Create a pool of <code>ProcessorTask</code>
     */
    protected void initProcessorTask(int size){
        for (int i=0; i < size; i++){           
            processorTasks.offer(newProcessorTask(false));
        }
    }  


    /**
     * Initialize <code>ProcessorTask</code>
     */
    protected void rampUpProcessorTask(){
        Iterator<ProcessorTask> iterator = processorTasks.iterator();
        while (iterator.hasNext()) {
            iterator.next().initialize();
        }
    }  
    

    /**
     * Create <code>ProcessorTask</code> objects and configure it to be ready
     * to proceed request.
     */
    protected ProcessorTask newProcessorTask(boolean initialize){                                                      
        DefaultProcessorTask task = 
                new DefaultProcessorTask(initialize, bufferResponse);
        return configureProcessorTask(task);       
    }
    
    
    protected ProcessorTask configureProcessorTask(DefaultProcessorTask task){
        task.setAdapter(adapter);
        task.setMaxHttpHeaderSize(maxHttpHeaderSize);
        task.setBufferSize(requestBufferSize);
        task.setSelectorThread(this);               
        task.setRecycle(recycleTasks);
        task.setDefaultResponseType(defaultResponseType);
        task.setForcedRequestType(forcedRequestType);
        task.setMaxPostSize(maxPostSize);
        task.setTimeout(uploadTimeout);
        task.setDisableUploadTimeout(disableUploadTimeout);
 
        if ( keepAlivePipeline.dropConnection() ) {
            task.setDropConnection(true);
        }
        
        // Asynch extentions
        if ( asyncExecution ) {
            task.setEnableAsyncExecution(asyncExecution);
            task.setAsyncHandler(asyncHandler);          
        }
                
        task.setPipeline(processorPipeline);         
        configureCompression(task);
        
        return (ProcessorTask)task;        
    }
 
    
    /**
     * Reconfigure Grizzly Asynchronous Request Processing(ARP) internal 
     * objects.
     */
    protected void reconfigureAsyncExecution(){
        for(ProcessorTask task :processorTasks){
            if (task instanceof DefaultProcessorTask) {
                ((DefaultProcessorTask)task)
                    .setEnableAsyncExecution(asyncExecution);
                ((DefaultProcessorTask)task).setAsyncHandler(asyncHandler);  
            }
        }
        
        readTasks.clear();
        initReadTask(minReadQueueLength);   
    }
    
 
    /**
     * Return a <code>ProcessorTask</code> from the pool. If the pool is empty,
     * create a new instance.
     */
    public ProcessorTask getProcessorTask(){
        ProcessorTask processorTask = null;
        if (recycleTasks) {
            processorTask = processorTasks.poll();
        }
        
        if (processorTask == null){
            processorTask = newProcessorTask(false);
        } 
        
        if ( isMonitoringEnabled() ){
           activeProcessorTasks.offer(processorTask); 
        }

        
        return processorTask;
    }
        
    
    /**
     * Return a <code>ReadTask</code> from the pool. If the pool is empty,
     * create a new instance.
     */
    public ReadTask getReadTask(SelectionKey key) throws IOException{
        ReadTask task = null;
        if ( recycleTasks ) {
            task = readTasks.poll();
        }
        
        if (task == null){
            task = newReadTask(); 
        }           

        task.setSelectionKey(key);
        return task;
    }
 
    
    // --------------------------------------------------------- Thread run --/
    
    
    /**
     * Start the endpoint (this)
     */
    public void run(){
        try{
            startEndpoint();
        } catch (Exception ex){
            logger.log(Level.SEVERE,"selectorThread.errorOnRequest", ex);
        }
    }

    
    // ------------------------------------------------------------Start ----/
    
    
    /**
     * Start the Acceptor Thread and wait for incoming connection, in a non
     * blocking mode.
     */
    public void startEndpoint() throws IOException, InstantiationException {
        running = true;
        
        kaTimeout = keepAliveTimeoutInSeconds * 1000;
        rampUpProcessorTask();
        registerComponents();
 
        displayConfiguration();

        startPipelines();
        startListener();
    }
    
    
    /**
     * Starts the <code>Pipeline</code> used by this <code>Selector</code>
     */
    protected void startPipelines(){
        if (readPipeline != null){
            readPipeline.startPipeline();
        }

        processorPipeline.startPipeline();        
    }

    
    /**
     * Stop the <code>Pipeline</code> used by this <code>Selector</code>
     */
    protected void stopPipelines(){
        if ( keepAlivePipeline != null )
            keepAlivePipeline.stopPipeline();        

        if (readPipeline != null){
            readPipeline.stopPipeline();
        }
        
        processorPipeline.stopPipeline();

    }
    
    /**
     * Start a non blocking <code>Selector</code> object.
     */
    protected void startListener(){
        synchronized(lock){
            while (running && selector.isOpen()) {
                doSelect();
            }       

            try{
                closeActiveConnections();
                stopPipelines();
                stopSelectorReadThread();

                try{
                    if ( serverSocket != null )
                        serverSocket.close();
                } catch (Throwable ex){
                    logger.log(Level.SEVERE,
                            "selectorThread.closeSocketException",ex);
                }

                try{
                    if ( serverSocketChannel != null)
                        serverSocketChannel.close();
                } catch (Throwable ex){
                    logger.log(Level.SEVERE,
                            "selectorThread.closeSocketException",ex);
                }

                try{
                    if ( selector != null)
                        selector.close();
                } catch (Throwable ex){
                    logger.log(Level.SEVERE,
                            "selectorThread.closeSocketException",ex);
                }
                unregisterComponents();
            } catch (Throwable t){
                logger.log(Level.SEVERE,"selectorThread.stopException",t);
            } 
        }
    }
    
    
    /**
     * Execute a <code>Selector.select()</code> operation.
     */
    protected void doSelect(){
        SelectionKey key = null;
        Set readyKeys;
        Iterator<SelectionKey> iterator;
        int selectorState; 

        try{
            selectorState = 0;
            enableSelectionKeys();                

            try{                
                selectorState = selector.select(selectorTimeout);
            } catch (CancelledKeyException ex){
                ;
            }

            if (!running) return;

            readyKeys = selector.selectedKeys();
            iterator = readyKeys.iterator();
            while (iterator.hasNext()) {
                key = iterator.next();
                iterator.remove();
                if (key.isValid()) {
                    handleConnection(key);
                } else {
                    cancelKey(key);
                }
            }
            
            expireIdleKeys();
            
            if (selectorState <= 0){
                selector.selectedKeys().clear();
                return;
            }
        } catch (Throwable t){
            if (key != null && key.isValid()){
                logger.log(Level.SEVERE,"selectorThread.errorOnRequest",t);
            } else {
                logger.log(Level.FINE,"selectorThread.errorOnRequest",t);
            }

            if ( key != null ){
                key.attach(null);
                key.cancel();
            }
        }
    }
    
    
    /**
     * Cancel keep-alive connections.
     */
    protected void expireIdleKeys(){
        if ( keepAliveTimeoutInSeconds <= 0 || !selector.isOpen()) return;
        long current = System.currentTimeMillis();

        if (current < nextKeysExpiration) {
            return;
        }
        nextKeysExpiration = current + kaTimeout;
        
        Set<SelectionKey> readyKeys = selector.keys();
        if (readyKeys.isEmpty()){
            return;
        }
        Iterator<SelectionKey> iterator = readyKeys.iterator();
        SelectionKey key;
        while (running && iterator.hasNext()) {
            key = iterator.next();          
            // Keep-alive expired
            Object attachment = key.attachment();
            if (attachment != null) {
                
                if ( !defaultAlgorithmInstalled 
                        && !(attachment instanceof Long)) { 
                    continue; 
                }
                    
                try{
                    long expire = (Long)attachment;
                    if (current - expire >= kaTimeout) {
                        if (enableNioLogging){
                            logger.log(Level.INFO,
                                    "Keep-Alive expired for SocketChannel " + 
                                    key.channel());
                        }          
                        cancelKey(key);
                    } else if (expire + kaTimeout < nextKeysExpiration){
                        nextKeysExpiration = expire + kaTimeout;
                    }
                } catch (ClassCastException ex){                            
                    if (logger.isLoggable(Level.FINEST)){
                        logger.log(Level.FINEST,
                                   "Invalid SelectionKey attachment",ex);
                    }
                }
            }
        }                    
    }  

    
    /**
     * Handle an incoming operation on the channel. It is always an ACCEPT or
     * a READ.
     */
    protected void handleConnection(SelectionKey key) throws IOException,
                                                           InterruptedException{
                                                    
        Task task = null;
        if ((key.readyOps() & SelectionKey.OP_ACCEPT) == SelectionKey.OP_ACCEPT){
            handleAccept(key);
            return;
        } else if ((key.readyOps() & SelectionKey.OP_READ) 
                                                    == SelectionKey.OP_READ) {
            task = handleRead(key);
        } 
 
        if ( ((SocketChannel)key.channel()).isOpen() ) {
            task.execute();
        } else {
            cancelKey(key);
        }
    }
        
      
    
    /**
     * Handle OP_ACCEPT
     */
    protected void handleAccept(SelectionKey key) throws IOException{
        ServerSocketChannel server = (ServerSocketChannel) key.channel();
        SocketChannel channel = server.accept();

        if (enableNioLogging){
            logger.log(Level.INFO,"Handling OP_ACCEPT on SocketChannel " + 
                    channel);
        }
        
        if (channel != null) {
            if ( multiSelectorsCount > 1 ) {
                MultiSelectorThread srt = getSelectorReadThread();
                srt.addChannel(channel);
            } else {
                channel.configureBlocking(false);
                SelectionKey readKey = 
                        channel.register(selector, SelectionKey.OP_READ);
                setSocketOptions(((SocketChannel)readKey.channel()).socket());
                readKey.attach(System.currentTimeMillis());
            }

            if (isMonitoringEnabled()) {
                getRequestGroupInfo().increaseCountOpenConnections();
                pipelineStat.incrementTotalAcceptCount();
            }
        }
    }
    
    
    /**
     * Handle OP_READ
     */ 
    protected ReadTask handleRead(SelectionKey key) throws IOException{                   
        // disable OP_READ on key before doing anything else 
        key.interestOps(key.interestOps() & (~SelectionKey.OP_READ));
        
        if (enableNioLogging){
            logger.log(Level.INFO,"Handling OP_READ on SocketChannel " + 
                    key.channel());
        }      
        
        Object attach = key.attachment();
        if (!defaultAlgorithmInstalled) {
            if (key.isValid() && attach != null && attach instanceof ReadTask){
                key.attach(null);
                return (ReadTask)attach;
            }
        } 
        return getReadTask(key);
    }
    
    
    /**
     * Cancel the current <code>SelectionKey</code>
     */
    public void cancelKey(SelectionKey key){
        if (key == null){
            return;
        }

        keepAlivePipeline.untrap(key);       
        if (!processorPipeline.expireKey(key)){
            return;
        }

        if (enableNioLogging){
            logger.log(Level.INFO,"Closing SocketChannel " + 
                    key.channel());
        }

        Socket socket = ((SocketChannel)key.channel()).socket();
        try{
            if (!socket.isInputShutdown()){
                socket.shutdownInput();
            }
        } catch (IOException ex){
            ;
        }
        
        try{
            if (!socket.isOutputShutdown()){
                socket.shutdownOutput();
            }
        } catch (IOException ex){
            ;
        }

        try{
            if (!socket.isClosed()){
                socket.close();
            }
        } catch (IOException ex){
            ;
        } finally {
            try{
                // This is not needed but just to make sure the JDK isn't 
                // leaking any fd.
                if (key.channel().isOpen()){
                    key.channel().close();
                }
            } catch (IOException ex){
                logger.log(Level.FINEST,"selectorThread.unableToCloseKey", key);
            }
            if (isMonitoringEnabled()) {
                getRequestGroupInfo().decreaseCountOpenConnections();
            }
        }

        
        // Set the attachement to null so the Selector.java handleConnection
        // stop processing this key.
        key.attach(null);
        key.cancel();
        key = null;
    }
    
    
    /**
     * Returns the <code>Task</code> object to the pool.
     */
    public void returnTask(Task task){
        // Returns the object to the pool.
        if (task != null) {
            if (task.getType() == Task.PROCESSOR_TASK){
                                
                if ( isMonitoringEnabled() ){
                   activeProcessorTasks.remove(((DefaultProcessorTask)task));
                }  
                
                processorTasks.offer((DefaultProcessorTask)task);
            } else if (task.getType() == Task.READ_TASK){
                readTasks.offer((ReadTask)task);               
            }
        }
    }
    

    /**
     * Wakes up the <code>Selector</code> associated with this thread.
     */
    public void wakeup(){
        selector.wakeup();
    }

    
    /**
     * Clear all cached <code>Tasks</code> 
     */
    protected void clearTasks(){
        processorTasks.clear();
        readTasks.clear();
    }

    
    /**
     * Cancel the <code>threadID</code> execution. Return <code>true</code>
     * if it is successful.
     *
     * @param id the thread name to cancel
     */
    public boolean cancelThreadExecution(long cancelThreadID){
   
        if ( multiSelectorsCount > 1 ){
            boolean cancelled = false;
            for (MultiSelectorThread readSelector : readThreads) {
                cancelled = ((SelectorReadThread)readSelector).
                        cancelThreadExecution(cancelThreadID);   
                if (cancelled) return true;
            }       
            return false;
        }
                       
        if (activeProcessorTasks.size() == 0) return false;
        
        Iterator<ProcessorTask> iterator = activeProcessorTasks.iterator();
        ProcessorTask processorTask;
        long threadID;
        while( iterator.hasNext() ){
            processorTask = iterator.next();
            threadID = processorTask.getWorkerThreadID();
            if (threadID == cancelThreadID){
                processorTask.cancelTask("Request cancelled.","500");
                logger.log(Level.WARNING,
                        "Thread Request Cancelled: " + threadID);     
                return processorTask.getPipeline().interruptThread(threadID);
            }
        }
        return false;
    }

    
    /**
     * Stop the extra <code>SelectorReadThread</code>
     */
    private void stopSelectorReadThread(){
        if ( readThreads != null ){
            for (int i = 0; i < readThreads.length; i++) {
                readThreads[i].stopEndpoint();       
            }
        }
    }
    
    
    private void closeActiveConnections(){
        Set<SelectionKey> readyKeys = selector.keys();
        if (readyKeys.isEmpty()){
            return;
        }
        Iterator<SelectionKey> iterator = readyKeys.iterator();
        SelectionKey key;
        while (iterator.hasNext()) {
            key = iterator.next();
            if (key.channel() instanceof SocketChannel){
                cancelKey(key);
            } else {
                key.cancel();
            }
        }        
    }
    
    
    // ---------------------------------------------------Endpoint Lifecycle --/
 
    public void stopEndpoint() {
        if (!running) return;
        running = false;
        synchronized(lock){
            // Wait for the main thread to stop.
            clearTasks();
        }
    }
    
    // ------------------------------------------------------Public methods--/
 

    public void setMaxThreads(int maxThreads) {
        if ( maxThreads == 1 ) {
            maxProcessorWorkerThreads = 5;
        } else {
            maxProcessorWorkerThreads = maxThreads;
        }
    }

    public int getMaxThreads() {
        return maxProcessorWorkerThreads;
    }

    public void setMaxSpareThreads(int maxThreads) {
    }

    public int getMaxSpareThreads() {
        return maxProcessorWorkerThreads;
    }

    public void setMinSpareThreads(int minSpareThreads) {
        this.minSpareThreads = minSpareThreads;
    }

    public int getMinSpareThreads() {
        return  minSpareThreads;
    }


    public int getPort() {
        return port;
    }

    public void setPort(int port ) {
        this.port=port;
    }

    public InetAddress getAddress() {
        return inet;
    }

    public void setAddress(InetAddress inet) {
        this.inet=inet;
    }


    public boolean isRunning() {
        return running;
    }
 
    
    /**
     * Provides the count of request threads that are currently
     * being processed by the container
     *
     * @return The count of busy threads 
     */
    public int getCurrentBusyProcessorThreads() {
        int busy = 0;
        
        // multi selector support
        if (multiSelectorsCount > 1) {
            for (MultiSelectorThread readSelector : readThreads) {
                busy += ((SelectorReadThread)readSelector)
                                .getCurrentBusyProcessorThreads();   
            }

        } else {
            busy = processorPipeline.getCurrentThreadsBusy();
        }

        return busy;  
    }


    /**
     * Sets the timeout in ms of the server sockets created by this
     * server. This method allows the developer to make servers
     * more or less responsive to having their server sockets
     * shut down.
     *
     * <p>By default this value is 1000ms.
     */
    public void setServerTimeout(int timeout) {
        this.serverTimeout = timeout;
    }

    public boolean getTcpNoDelay() {
        return tcpNoDelay;
    }
    
    public void setTcpNoDelay( boolean b ) {
        tcpNoDelay=b;
    }

    public int getSoLinger() {
        return linger;
    }
    
    public void setSoLinger( int i ) {
        linger=i;
    }

    public int getSoTimeout() {
        return socketTimeout;
    }
    
    public void setSoTimeout( int i ) {
        socketTimeout=i;
    }
    
    public int getServerSoTimeout() {
        return serverTimeout;
    }  
    
    public void setServerSoTimeout( int i ) {
        serverTimeout=i;
    }
 
    // ------------------------------------------------------ Connector Methods


    /**
     * Get the maximum pending connection this <code>Pipeline</code>
     * can handle.
     */
    public int getQueueSizeInBytes(){
        return maxQueueSizeInBytes;
    }
    
    
    
    public int getMaxKeepAliveRequests() {
        return maxKeepAliveRequests;
    }
    
    
    /** 
     * Set the maximum number of Keep-Alive requests that we will honor.
     */
    public void setMaxKeepAliveRequests(int mkar) {
        maxKeepAliveRequests = mkar;
    }
    

    /** 
     * Sets the number of seconds before a keep-alive connection that has
     * been idle times out and is closed.
     *
     * @param timeout Keep-alive timeout in number of seconds
     */    
    public void setKeepAliveTimeoutInSeconds(int timeout) {
        keepAliveTimeoutInSeconds = timeout;
        keepAliveStats.setSecondsTimeouts(timeout);
    }


    /** 
     * Gets the number of seconds before a keep-alive connection that has
     * been idle times out and is closed.
     *
     * @return Keep-alive timeout in number of seconds
     */    
    public int getKeepAliveTimeoutInSeconds() {
        return keepAliveTimeoutInSeconds;
    }


    /** 
     * Sets the number of keep-alive threads.
     *
     * @param threadCount Number of keep-alive threads
     */    
    public void setKeepAliveThreadCount(int threadCount) {
        keepAlivePipeline.setMaxThreads(threadCount);
    }


    /**
     * Set the associated adapter.
     * 
     * @param adapter the new adapter
     */
    public void setAdapter(Adapter adapter) {
        this.adapter = adapter;
    }


    /**
     * Get the associated adapter.
     * 
     * @return the associated adapter
     */
    public Adapter getAdapter() {
        return adapter;
    }
    
    
    protected void setSocketOptions(Socket socket){
        try{
            if(linger >= 0 ) {
                socket.setSoLinger( true, linger);
            }
        } catch (SocketException ex){
            logger.log(Level.WARNING,
                        "setSoLinger exception ",ex);
        }
        
        try{
            if( tcpNoDelay )
                socket.setTcpNoDelay(tcpNoDelay);
        } catch (SocketException ex){
            logger.log(Level.WARNING,
                        "setTcpNoDelay exception ",ex);
        }
        
        try{
            if ( maxReadWorkerThreads != 0)
                socket.setReuseAddress(reuseAddress);
        } catch (SocketException ex){
            logger.log(Level.WARNING,
                        "setReuseAddress exception ",ex);
        }   
        
        try{
            if(oOBInline){
                socket.setOOBInline(oOBInline);
            }
        } catch (SocketException ex){
            logger.log(Level.WARNING,
                        "setOOBInline exception ",ex);
        }        
        
    }
    
    // ------------------------------- JMX and Monnitoring support --------//
    
    public ObjectName getObjectName() {
        return oname;
    }

    public String getDomain() {
        return domain;
    }

    public ObjectName preRegister(MBeanServer server,
                                  ObjectName name) throws Exception {
        oname=name;
        mserver=server;
        domain=name.getDomain();
        return name;
    }

    public void postRegister(Boolean registrationDone) {
        // Do nothing
    }

    public void preDeregister() throws Exception {
        // Do nothing
    }

    public void postDeregister() {
        // Do nothing
    }  


    /**
     * Register JMX components.
     */
    protected void registerComponents(){

        if( this.domain != null  && jmxManagement != null) {

            try {
                globalRequestProcessorName = new ObjectName(
                    domain + ":type=GlobalRequestProcessor,name=http" + port);
                jmxManagement.registerComponent(globalRequestProcessor,
                                                globalRequestProcessorName,
                                                null);
 
                keepAliveMbeanName = new ObjectName(
                    domain + ":type=PWCKeepAlive,name=http" + port);
                jmxManagement.registerComponent(keepAliveStats,
                                                keepAliveMbeanName,
                                                null);

                pwcConnectionQueueMbeanName = new ObjectName(
                    domain + ":type=PWCConnectionQueue,name=http" + port);
                jmxManagement.registerComponent(pipelineStat,
                                                pwcConnectionQueueMbeanName,
                                                null);
                
                pwcFileCacheMbeanName = new ObjectName(
                    domain + ":type=PWCFileCache,name=http" + port);
                jmxManagement.registerComponent(fileCacheFactory,
                                                pwcFileCacheMbeanName,
                                                null);                
            } catch (Exception ex) {
                logger.log(Level.WARNING,
                           "selectorThread.mbeanRegistrationException",
                           new Object[]{new Integer(port),ex});
            }
        }

    }
    
    
    /**
     * Unregister components.
     **/
    protected void unregisterComponents(){

        if (this.domain != null && jmxManagement != null) {
            try {
                if (globalRequestProcessorName != null) {
                    jmxManagement.unregisterComponent(globalRequestProcessorName);
                }
                if (keepAliveMbeanName != null) {
                    jmxManagement.unregisterComponent(keepAliveMbeanName);
                }
                if (pwcConnectionQueueMbeanName != null) {
                    jmxManagement.unregisterComponent(pwcConnectionQueueMbeanName);
                }
                if (pwcFileCacheMbeanName != null) {
                    jmxManagement.unregisterComponent(pwcFileCacheMbeanName);
                }                    
            } catch (Exception ex) {
                logger.log(Level.WARNING,
                           "mbeanDeregistrationException",
                           new Object[]{new Integer(port),ex});
            }
        }
    }

    
    /**
     * Enable gathering of monitoring datas.
     */
    public void enableMonitoring(){
        isMonitoringEnabled = true;
        enablePipelineStats();      
        if ( readThreads != null ) {
            for (int i = 0; i < readThreads.length; i++) {
                ((SelectorReadThread)readThreads[i]).isMonitoringEnabled = true;
            }
        }
        fileCacheFactory.setIsMonitoringEnabled(isMonitoringEnabled);
    }
    
    
    /**
     * Disable gathering of monitoring datas. 
     */
    public void disableMonitoring(){
        disablePipelineStats();  
        if ( readThreads != null ) {
            for (int i = 0; i < readThreads.length; i++) {
                ((SelectorReadThread)readThreads[i]).isMonitoringEnabled = false;
            }
        }
        fileCacheFactory.setIsMonitoringEnabled(isMonitoringEnabled);        
    }

    
    /**
     * Returns <code>true</code> if monitoring has been enabled, 
     * <code>false</code> otherwise.
     */
    public boolean isMonitoringEnabled() {
        return isMonitoringEnabled;
    }

    
    public RequestGroupInfo getRequestGroupInfo() {
        return globalRequestProcessor;
    }


    public KeepAliveStats getKeepAliveStats() {
        return keepAliveStats;
    }


    /*
     * Initializes the web container monitoring level from the domain.xml.
     */
    protected void initMonitoringLevel() {
        pipelineStat = new PipelineStatistic(port);
        pipelineStat.setQueueSizeInBytes(maxQueueSizeInBytes);
    }
 
    
    public int getMaxHttpHeaderSize() {
        return maxHttpHeaderSize;
    }
    
    public void setMaxHttpHeaderSize(int maxHttpHeaderSize) {
        this.maxHttpHeaderSize = maxHttpHeaderSize;
    }
    
        
    /**
     * The minimun threads created at startup.
     */ 
    public void setMinThreads(int minWorkerThreads){
        this.minWorkerThreads = minWorkerThreads;
    }


    /**
     * Set the request input buffer size
     */
    public void setBufferSize(int requestBufferSize){
        this.requestBufferSize = requestBufferSize;
    }
    

    /**
     * Return the request input buffer size
     */
    public int getBufferSize(){
        return requestBufferSize;
    }
    
    
    public Selector getSelector(){
        return selector;
    }

    /************************* PWCThreadPool Stats *************************/

    public int getCountThreadsStats() {

        int ret = processorPipeline.getCurrentThreadCount();

        if (readPipeline != null
                && readPipeline != processorPipeline) {
            ret += readPipeline.getCurrentThreadCount();
        }

        return ret;
    }


    public int getCountThreadsIdleStats() {

        int ret = processorPipeline.getWaitingThread();

        if (readPipeline != null
                && readPipeline != processorPipeline) {
            ret += readPipeline.getWaitingThread();
        }

        return ret;
    }


    /************************* HTTPListener Stats *************************/

    public int getCurrentThreadCountStats() {

        int ret = processorPipeline.getCurrentThreadCount();

        if (readPipeline != null
                && readPipeline != processorPipeline) {
            ret += readPipeline.getCurrentThreadCount();
        }

        return ret;
    }


    public int getCurrentThreadsBusyStats() {

        int ret = processorPipeline.getCurrentThreadsBusy();

        if (readPipeline != null
                && readPipeline != processorPipeline) {
            ret += readPipeline.getCurrentThreadsBusy();
        }
 
        return ret;
    }

    public int getMaxSpareThreadsStats() {

        int ret = processorPipeline.getMaxSpareThreads();
 
        if (readPipeline != null
                && readPipeline != processorPipeline) {
            ret += readPipeline.getMaxSpareThreads();
        }

        return ret;
    }


    public int getMinSpareThreadsStats() {

        int ret = processorPipeline.getMinSpareThreads();

        if (readPipeline != null
                && readPipeline != processorPipeline) {
            ret += readPipeline.getMinSpareThreads();
        }

        return ret;
    }


    public int getMaxThreadsStats() {

        int ret = processorPipeline.getMaxThreads();

        if (readPipeline != null
                && readPipeline != processorPipeline) {
            ret += readPipeline.getMaxThreads();
        }
        
        return ret;
    }


    //------------------------------------------------- FileCache config -----/

    
    /**
     * Remove a context path from the <code>FileCache</code>.
     */
    public void removeCacheEntry(String contextPath){  
        ConcurrentHashMap<String,FileCacheEntry> 
                cachedEntries = fileCacheFactory.getCache();
        
        if ( cachedEntries == null){
            return;
        }
        
        Iterator<String> iterator = cachedEntries.keySet().iterator();
        String cachedPath;
        while (iterator.hasNext()){
            cachedPath = iterator.next();
            if ( cachedPath.startsWith(contextPath) ){
                cachedEntries.remove(cachedPath).run();
            }            
        }
    }
   
    
    /**
     * The timeout in seconds before remove a <code>FileCacheEntry</code>
     * from the <code>fileCache</code>
     */
    public void setSecondsMaxAge(int sMaxAges){
        secondsMaxAge = sMaxAges;
    }
    
    
    /**
     * Set the maximum entries this cache can contains.
     */
    public void setMaxCacheEntries(int mEntries){
        maxCacheEntries = mEntries;
    }

    
    /**
     * Return the maximum entries this cache can contains.
     */    
    public int getMaxCacheEntries(){
        return maxCacheEntries;
    }
    
    
    /**
     * Set the maximum size a <code>FileCacheEntry</code> can have.
     */
    public void setMinEntrySize(long mSize){
        minEntrySize = mSize;
    }
    
    
    /**
     * Get the maximum size a <code>FileCacheEntry</code> can have.
     */
    public long getMinEntrySize(){
        return minEntrySize;
    }
     
    
    /**
     * Set the maximum size a <code>FileCacheEntry</code> can have.
     */
    public void setMaxEntrySize(long mEntrySize){
        maxEntrySize = mEntrySize;
    }
    
    
    /**
     * Get the maximum size a <code>FileCacheEntry</code> can have.
     */
    public long getMaxEntrySize(){
        return maxEntrySize;
    }
    
    
    /**
     * Set the maximum cache size
     */ 
    public void setMaxLargeCacheSize(long mCacheSize){
        maxLargeFileCacheSize = mCacheSize;
    }

    
    /**
     * Get the maximum cache size
     */ 
    public long getMaxLargeCacheSize(){
        return maxLargeFileCacheSize;
    }
    
    
    /**
     * Set the maximum cache size
     */ 
    public void setMaxSmallCacheSize(long mCacheSize){
        maxSmallFileCacheSize = mCacheSize;
    }
    
    
    /**
     * Get the maximum cache size
     */ 
    public long getMaxSmallCacheSize(){
        return maxSmallFileCacheSize;
    }    

    
    /**
     * Is the fileCache enabled.
     */
    public boolean isFileCacheEnabled(){
        return isFileCacheEnabled;
    }

    
    /**
     * Is the file caching mechanism enabled.
     */
    public void setFileCacheIsEnabled(boolean isFileCacheEnabled){
        this.isFileCacheEnabled = isFileCacheEnabled;
    }
   
    
    /**
     * Is the large file cache support enabled.
     */
    public void setLargeFileCacheEnabled(boolean isLargeEnabled){
        this.isLargeFileCacheEnabled = isLargeEnabled;
    }
   
    
    /**
     * Is the large file cache support enabled.
     */
    public boolean getLargeFileCacheEnabled(){
        return isLargeFileCacheEnabled;
    }    

    // --------------------------------------------------------------------//
    
    /**
     * Enable the <code>AsyncHandler</code> used when asynchronous
     */
    public void setEnableAsyncExecution(boolean asyncExecution){
        this.asyncExecution = asyncExecution;     
        if (running){
            reconfigureAsyncExecution();
        }
    }
    
       
    /**
     * Return true when asynchronous execution is 
     * enabled.
     */    
    public boolean getEnableAsyncExecution(){
        return asyncExecution;
    }
    
    
    /**
     * Set the <code>AsyncHandler</code> used when asynchronous execution is 
     * enabled.
     */
    public void setAsyncHandler(AsyncHandler asyncHandler){
        this.asyncHandler = asyncHandler;     
    }
    
       
    /**
     * Return the <code>AsyncHandler</code> used when asynchronous execution is 
     * enabled.
     */    
    public AsyncHandler getAsyncHandler(){
        return asyncHandler;
    }
    
    
    /**
     * Set the logger used by this instance.
     */
    public static void setLogger(Logger l){
        if ( l != null )
            logger = l;
    }

    
    /**
     * Return the logger used by the Grizzly classes.
     */
    public static Logger logger(){
        return logger;
    }
    
    
    /**
     * Set the document root folder
     */
    public static void setWebAppRootPath(String rf){
        rootFolder = rf;
    }
    
    
    /**
     * Return the folder's root where application are deployed.
     */
    public static String getWebAppRootPath(){
        return rootFolder;
    }
    
    
    public int getMaxReadWorkerThreads(){
        return maxReadWorkerThreads;
    }
    
    
    /**
     * Return the <code>Pipeline</code> used to handle OP_READ.
     */
    public Pipeline getReadPipeline(){
        return readPipeline;
    }
    
    // ------------------------------------------------------ Compression ---//


    protected void configureCompression(DefaultProcessorTask processorTask){
        processorTask.setCompression(compression);
        processorTask.addNoCompressionUserAgent(noCompressionUserAgents);
        processorTask.addCompressableMimeType(compressableMimeTypes);
        processorTask.setCompressionMinSize(compressionMinSize);
        processorTask.addRestrictedUserAgent(restrictedUserAgents);
    }


    // ------------------------------------------------------ Debug ---------//
    
        
    /**
     * Display the Grizzly configuration parameters.
     */
    private void displayConfiguration(){
       if (displayConfiguration){
            logger.log(Level.INFO,
                    "\n Grizzly configuration for port " 
                    + port 
                    + "\n\t maxThreads: " 
                    + maxProcessorWorkerThreads 
                    + "\n\t minThreads: " 
                    + minWorkerThreads 
                    + "\n\t ByteBuffer size: " 
                    + requestBufferSize 
                    + "\n\t useDirectByteBuffer: "
                    + useDirectByteBuffer                       
                    + "\n\t useByteBufferView: "                        
                    + useByteBufferView                   
                    + "\n\t maxHttpHeaderSize: " 
                    + maxHttpHeaderSize
                    + "\n\t maxKeepAliveRequests: "
                    + maxKeepAliveRequests
                    + "\n\t keepAliveTimeoutInSeconds: "
                    + keepAliveTimeoutInSeconds
                    + "\n\t Static File Cache enabled: "                        
                    + isFileCacheEnabled                    
                    + "\n\t Stream Algorithm : "                        
                    + algorithmClassName        
                    + "\n\t Pipeline : "                        
                    + pipelineClassName                    
                    + "\n\t Round Robin Selector Algorithm enabled: "                        
                    + ( multiSelectorsCount > 1 )
                    + "\n\t Round Robin Selector pool size: "
                    + multiSelectorsCount
                    + "\n\t recycleTasks: "                        
                    + recycleTasks
                    + "\n\t Asynchronous Request Processing enabled: " 
                    + asyncExecution); 
        }
    }

    
    /**
     * Return <code>true</code> if the reponse is buffered.
     */
    public boolean getBufferResponse() {
        return bufferResponse;
    }

    
    /**
     * <code>true</code>if the reponse willk be buffered.
     */
    public void setBufferResponse(boolean bufferResponse) {
        this.bufferResponse = bufferResponse;
    }
    
    
    /**
     * Enable Comet/Poll request support.
     */
    public void enableCometSupport(boolean enableComet){
        if ( enableComet ){
            asyncExecution = true;
            setBufferResponse(false);    
            isFileCacheEnabled = false;
            isLargeFileCacheEnabled = false;
            asyncHandler = new DefaultAsyncHandler();
            asyncHandler.addAsyncFilter(new CometAsyncFilter()); 
            
            SelectorThread.logger()
                .log(Level.INFO,"Enabling Grizzly ARP Comet support.");
            
        } else {
            asyncExecution = false;
        }
    }
    
       
    /**
     * Enable Application Resource Allocation Grizzly Extension.
     */
    public void enableRcmSupport(boolean rcmSupport){
        if (rcmSupport){
            // Avoid dependency on that extension. If the package is renamed
            // an exception will be logged later when the SelectorThread start.
            pipelineClassName = "com.sun.enterprise.web.ara.IsolationPipeline";
        }        
    }
    

    // ----------------------------------------------Setter/Getter-----------//
    
    
    public int getServerTimeout() {
        return serverTimeout;
    }

    public InetAddress getInet() {
        return inet;
    }

    public void setInet(InetAddress inet) {
        this.inet = inet;
    }

    public ServerSocket getServerSocket() {
        return serverSocket;
    }

    public void setServerSocket(ServerSocket serverSocket) {
        this.serverSocket = serverSocket;
    }

    public ServerSocketChannel getServerSocketChannel() {
        return serverSocketChannel;
    }

    public void setServerSocketChannel(ServerSocketChannel serverSocketChannel){
        this.serverSocketChannel = serverSocketChannel;
    }

    public boolean isInitialized() {
        return initialized;
    }

    public void setInitialized(boolean initialized) {
        this.initialized = initialized;
    }

    public void setRunning(boolean running) {
        this.running = running;
    }

    public void setDomain(String domain) {
        this.domain = domain;
    }

    public ObjectName getOname() {
        return oname;
    }

    public void setOname(ObjectName oname) {
        this.oname = oname;
    }

    public ObjectName getGlobalRequestProcessorName() {
        return globalRequestProcessorName;
    }

    public void setGlobalRequestProcessorName
            (ObjectName globalRequestProcessorName) {
        this.globalRequestProcessorName = globalRequestProcessorName;
    }

    public ObjectName getKeepAliveMbeanName() {
        return keepAliveMbeanName;
    }

    public void setKeepAliveMbeanName(ObjectName keepAliveMbeanName) {
        this.keepAliveMbeanName = keepAliveMbeanName;
    }

    public ObjectName getPwcConnectionQueueMbeanName() {
        return pwcConnectionQueueMbeanName;
    }

    public void setPwcConnectionQueueMbeanName
            (ObjectName pwcConnectionQueueMbeanName) {
        this.pwcConnectionQueueMbeanName = pwcConnectionQueueMbeanName;
    }

    public ObjectName getPwcFileCacheMbeanName() {
        return pwcFileCacheMbeanName;
    }

    public void setPwcFileCacheMbeanName(ObjectName pwcFileCacheMbeanName) {
        this.pwcFileCacheMbeanName = pwcFileCacheMbeanName;
    }

    public MBeanServer getMserver() {
        return mserver;
    }

    public void setMserver(MBeanServer mserver) {
        this.mserver = mserver;
    }

    public ObjectName getProcessorWorkerThreadName() {
        return processorWorkerThreadName;
    }

    public void setProcessorWorkerThreadName
            (ObjectName processorWorkerThreadName) {
        this.processorWorkerThreadName = processorWorkerThreadName;
    }

    public boolean isTcpNoDelay() {
        return tcpNoDelay;
    }

    public int getLinger() {
        return linger;
    }

    public void setLinger(int linger) {
        this.linger = linger;
    }

    public int getSocketTimeout() {
        return socketTimeout;
    }

    public void setSocketTimeout(int socketTimeout) {
        this.socketTimeout = socketTimeout;
    }

    public String getCompression() {
        return compression;
    }

    public void setCompression(String compression) {
        this.compression = compression;
    }

    public String getNoCompressionUserAgents() {
        return noCompressionUserAgents;
    }

    public void setNoCompressionUserAgents(String noCompressionUserAgents) {
        this.noCompressionUserAgents = noCompressionUserAgents;
    }

    public String getRestrictedUserAgents() {
        return restrictedUserAgents;
    }

    public void setRestrictedUserAgents(String restrictedUserAgents) {
        this.restrictedUserAgents = restrictedUserAgents;
    }

    public String getCompressableMimeTypes() {
        return compressableMimeTypes;
    }

    public void setCompressableMimeTypes(String compressableMimeTypes) {
        this.compressableMimeTypes = compressableMimeTypes;
    }

    public int getCompressionMinSize() {
        return compressionMinSize;
    }

    public void setCompressionMinSize(int compressionMinSize) {
        this.compressionMinSize = compressionMinSize;
    }

    public boolean isBufferResponse() {
        return bufferResponse;
    }

    public int getMinReadQueueLength() {
        return minReadQueueLength;
    }

    public void setMinReadQueueLength(int minReadQueueLength) {
        this.minReadQueueLength = minReadQueueLength;
    }

    public int getMinProcessorQueueLength() {
        return minProcessorQueueLength;
    }

    public void setMinProcessorQueueLength(int minProcessorQueueLength) {
        this.minProcessorQueueLength = minProcessorQueueLength;
    }

    public void setSelector(Selector selector) {
        this.selector = selector;
    }

    public int getSelectorReadThreadsCount() {
        return multiSelectorsCount;
    }

    public void setSelectorReadThreadsCount(int multiSelectorsCount) {
        this.multiSelectorsCount = multiSelectorsCount;
    }

    public void setReadPipeline(Pipeline readPipeline) {
        this.readPipeline = readPipeline;
    }

    public Pipeline getProcessorPipeline() {
        return processorPipeline;
    }

    public void setProcessorPipeline(Pipeline processorPipeline) {
        this.processorPipeline = processorPipeline;
    }

    public PipelineStatistic getPipelineStat() {
        return pipelineStat;
    }

    public void setPipelineStat(PipelineStatistic pipelineStat) {
        this.pipelineStat = pipelineStat;
    }

    public String getPipelineClassName() {
        return pipelineClassName;
    }

    public void setPipelineClassName(String pipelineClassName) {
        this.pipelineClassName = pipelineClassName;
    }

    public int getMaxProcessorWorkerThreads() {
        return maxProcessorWorkerThreads;
    }

    public void setMaxProcessorWorkerThreads(int maxProcessorWorkerThreads) {
        this.maxProcessorWorkerThreads = maxProcessorWorkerThreads;
    }

    public void setMaxReadWorkerThreads(int maxReadWorkerThreads) {
        this.maxReadWorkerThreads = maxReadWorkerThreads;
    }

    public int getMinWorkerThreads() {
        return minWorkerThreads;
    }

    public void setMinWorkerThreads(int minWorkerThreads) {
        this.minWorkerThreads = minWorkerThreads;
    }

    public int getThreadsIncrement() {
        return threadsIncrement;
    }

    public void setThreadsIncrement(int threadsIncrement) {
        this.threadsIncrement = threadsIncrement;
    }

    public int getThreadsTimeout() {
        return threadsTimeout;
    }

    public void setThreadsTimeout(int threadsTimeout) {
        this.threadsTimeout = threadsTimeout;
    }

    public boolean isUseDirectByteBuffer() {
        return useDirectByteBuffer;
    }

    public void setUseDirectByteBuffer(boolean useDirectByteBuffer) {
        this.useDirectByteBuffer = useDirectByteBuffer;
    }

    public RequestGroupInfo getGlobalRequestProcessor() {
        return globalRequestProcessor;
    }

    public void setGlobalRequestProcessor(RequestGroupInfo globalRequestProcessor) {
        this.globalRequestProcessor = globalRequestProcessor;
    }

    public void setKeepAliveStats(KeepAliveStats keepAliveStats) {
        this.keepAliveStats = keepAliveStats;
    }

    public boolean isDisplayConfiguration() {
        return displayConfiguration;
    }

    public void setDisplayConfiguration(boolean displayConfiguration) {
        this.displayConfiguration = displayConfiguration;
    }

    public boolean isIsMonitoringEnabled() {
        return isMonitoringEnabled;
    }

    public void setIsMonitoringEnabled(boolean isMonitoringEnabled) {
        this.isMonitoringEnabled = isMonitoringEnabled;
    }

    public int getCurrentConnectionNumber() {
        return currentConnectionNumber;
    }

    public void setCurrentConnectionNumber(int currentConnectionNumber) {
        this.currentConnectionNumber = currentConnectionNumber;
    }

    public void setIsWaiting(boolean isWaiting) {
        this.isWaiting = isWaiting;
    }

    public boolean isUseByteBufferView() {
        return useByteBufferView;
    }

    public void setUseByteBufferView(boolean useByteBufferView) {
        this.useByteBufferView = useByteBufferView;
    }

    public int getKaTimeout() {
        return kaTimeout;
    }

    public void setKaTimeout(int kaTimeout) {
        this.kaTimeout = kaTimeout;
    }

    public boolean isRecycleTasks() {
        return recycleTasks;
    }

    public void setRecycleTasks(boolean recycleTasks) {
        this.recycleTasks = recycleTasks;
    }

    public static int getSelectorTimeout() {
        return selectorTimeout;
    }

    public static void setSelectorTimeout(int aSelectorTimeout) {
        selectorTimeout = aSelectorTimeout;
    }

    public int getMaxQueueSizeInBytes() {
        return maxQueueSizeInBytes;
    }

    public void setMaxQueueSizeInBytes(int maxQueueSizeInBytes) {
        this.maxQueueSizeInBytes = maxQueueSizeInBytes;
    }

    public Class getAlgorithmClass() {
        return algorithmClass;
    }

    public void setAlgorithmClass(Class algorithmClass) {
        this.algorithmClass = algorithmClass;
    }

    public String getAlgorithmClassName() {
        return algorithmClassName;
    }

    public void setAlgorithmClassName(String algorithmClassName) {
        this.algorithmClassName = algorithmClassName;
    }

    public int getSsBackLog() {
        return ssBackLog;
    }

    public void setSsBackLog(int ssBackLog) {
        this.ssBackLog = ssBackLog;
    }

    public long getNextKeysExpiration() {
        return nextKeysExpiration;
    }

    public void setNextKeysExpiration(long nextKeysExpiration) {
        this.nextKeysExpiration = nextKeysExpiration;
    }

    public String getDefaultResponseType() {
        return defaultResponseType;
    }

    public void setDefaultResponseType(String defaultResponseType) {
        this.defaultResponseType = defaultResponseType;
    }

    public String getForcedRequestType() {
        return forcedRequestType;
    }

    public void setForcedRequestType(String forcedRequestType) {
        this.forcedRequestType = forcedRequestType;
    }

    public static String getRootFolder() {
        return rootFolder;
    }

    public static void setRootFolder(String aRootFolder) {
        rootFolder = aRootFolder;
    }

    public ConcurrentLinkedQueue<SelectionKey> getKeysToEnable() {
        return keysToEnable;
    }

    public void setKeysToEnable(ConcurrentLinkedQueue<SelectionKey> keysToEnable) {
        this.keysToEnable = keysToEnable;
    }

    public ConcurrentLinkedQueue<ProcessorTask> getProcessorTasks() {
        return processorTasks;
    }

    public void setProcessorTasks(ConcurrentLinkedQueue<ProcessorTask> 
            processorTasks) {
        this.processorTasks = processorTasks;
    }

    public ConcurrentLinkedQueue<ReadTask> getReadTasks() {
        return readTasks;
    }

    public void setReadTasks(ConcurrentLinkedQueue<ReadTask> readTasks) {
        this.readTasks = readTasks;
    }

    public ConcurrentLinkedQueue<ProcessorTask> getActiveProcessorTasks() {
        return activeProcessorTasks;
    }

    public void setActiveProcessorTasks(ConcurrentLinkedQueue<ProcessorTask> 
            activeProcessorTasks) {
        this.activeProcessorTasks = activeProcessorTasks;
    }

    public int getCurReadThread() {
        return curReadThread;
    }

    public void setCurReadThread(int curReadThread) {
        this.curReadThread = curReadThread;
    }

    public static Logger getLogger() {
        return logger;
    }

    public Management getManagement() {
        return jmxManagement;
    }

    public void setManagement(Management jmxManagement) {
        this.jmxManagement = jmxManagement;
    }

    public ClassLoader getClassLoader() {
        return classLoader;
    }

    
    /**
     * Set the <code>ClassLoader</code> used to load configurable
     * classes (Pipeline, StreamAlgorithm).
     */
    public void setClassLoader(ClassLoader classLoader) {
        this.classLoader = classLoader;
    }

    public boolean isEnableNioLogging() {
        return enableNioLogging;
    }

    public void setEnableNioLogging(boolean enableNioLogging) {
        this.enableNioLogging = enableNioLogging;
    }

    public int getMaxPostSize() {
        return maxPostSize;
    }

    public void setMaxPostSize(int maxPostSize) {
        this.maxPostSize = maxPostSize;
    }

    public void setReuseAddress(boolean reuseAddress){
        this.reuseAddress = reuseAddress;
    }

    public boolean getReuseAddress(){
        return reuseAddress;
    }

    public KeepAlivePipeline getKeepAlivePipeline() {
        return keepAlivePipeline;
    }

    public void setKeepAlivePipeline(KeepAlivePipeline keepAlivePipeline) {
        this.keepAlivePipeline = keepAlivePipeline;
    }

    
    /**
     * Set the flag to control upload time-outs.
     */
    public void setDisableUploadTimeout(boolean isDisabled) {
        disableUploadTimeout = isDisabled;
    }

    
    /**
     * Get the flag that controls upload time-outs.
     */
    public boolean getDisableUploadTimeout() {
        return disableUploadTimeout;
    }
    
    
    /**
     * Set the upload timeout.
     */
    public void setUploadTimeout(int uploadTimeout) {
        this.uploadTimeout = uploadTimeout ;
    }

    
    /**
     * Get the upload timeout.
     */
    public int getTimeout() {
        return uploadTimeout;
    }
}