FileDocCategorySizeDatePackage
PoolTcpEndpoint.javaAPI DocApache Tomcat 6.0.1420332Fri Jul 20 04:20:32 BST 2007org.apache.tomcat.util.net

PoolTcpEndpoint

public class PoolTcpEndpoint extends Object implements Runnable
Handle incoming TCP connections. This class implements a simple server model: one listener thread accepts on a socket and creates a new worker thread for each incoming connection. More advanced Endpoints will reuse the threads, use queues, etc.
author
James Duncan Davidson [duncan@eng.sun.com]
author
Jason Hunter [jch@eng.sun.com]
author
James Todd [gonzo@eng.sun.com]
author
Costin@eng.sun.com
author
Gal Shachor [shachor@il.ibm.com]
author
Yoav Shapira

Fields Summary
static org.apache.juli.logging.Log
log
private org.apache.tomcat.util.res.StringManager
sm
private static final int
BACKLOG
private static final int
TIMEOUT
private final Object
threadSync
private int
backlog
private int
serverTimeout
private InetAddress
inet
private int
port
private ServerSocketFactory
factory
private ServerSocket
serverSocket
private volatile boolean
running
private volatile boolean
paused
private boolean
initialized
private boolean
reinitializing
static final int
debug
protected boolean
tcpNoDelay
protected int
linger
protected int
socketTimeout
private boolean
lf
TcpConnectionHandler
handler
org.apache.tomcat.util.threads.ThreadPoolRunnable
listener
org.apache.tomcat.util.threads.ThreadPool
tp
private Thread
thread
private Stack
workerThreads
private int
curThreads
private int
maxThreads
private Vector
created
Constructors Summary
public PoolTcpEndpoint()

        // Default construction: the endpoint gets a freshly created ThreadPool.
        this.tp = new ThreadPool();
    
public PoolTcpEndpoint(org.apache.tomcat.util.threads.ThreadPool tp)

        // Use the caller-supplied thread pool instead of creating one.
        this.tp = tp;
    
Methods Summary
java.net.SocketacceptSocket()

        // Accepts one connection from the server socket and returns it.
        // Returns null when the endpoint is not running, when there is no
        // server socket, or when the accept did not yield a usable socket.
        // Side effects on an IOException while running: the server socket is
        // closed and re-created, and ThreadDeath may be thrown (see below).
        if( !running || serverSocket==null ) return null;

        Socket accepted = null;

    	try {
            // Without a factory, accept directly; otherwise let the factory
            // perform the accept on the server socket.
            if(factory==null) {
                accepted = serverSocket.accept();
            } else {
                accepted = factory.acceptSocket(serverSocket);
            }
            if (null == accepted) {
                log.warn(sm.getString("endpoint.warn.nullSocket"));
            } else {
                // The endpoint may have been stopped while we were blocked in
                // accept(): drop the connection instead of handing it out.
                if (!running) {
                    accepted.close();  // rude, but unlikely!
                    accepted = null;
                } else if (factory != null) {
                    factory.initSocket( accepted );
                }
            }
        }
        catch(InterruptedIOException iioe) {
            // normal part -- should happen regularly so
            // that the endpoint can release if the server
            // is shutdown.
            // Nothing is logged; control falls through to the final return
            // with whatever 'accepted' holds at this point.
        }
        catch (AccessControlException ace) {
            // When using the Java SecurityManager this exception
            // can be thrown if you are restricting access to the
            // socket with SocketPermission's.
            // Log the unauthorized access and continue
            String msg = sm.getString("endpoint.warn.security",
                                      serverSocket, ace);
            log.warn(msg);
        }
        catch (IOException e) {

            String msg = null;

            // Only report the failure while running; an IOException after a
            // stop is expected because the server socket gets closed.
            if (running) {
                msg = sm.getString("endpoint.err.nonfatal",
                        serverSocket, e);
                log.error(msg, e);
            }

            // Best-effort close of a socket that was accepted before the
            // failure; after this block 'accepted' is always null.
            if (accepted != null) {
                try {
                    accepted.close();
                } catch(Throwable ex) {
                    msg = sm.getString("endpoint.err.nonfatal",
                                       accepted, ex);
                    log.warn(msg, ex);
                }
                accepted = null;
            }

            if( ! running ) return null;
            // The flag is set outside the lock and tested/cleared inside it.
            // NOTE(review): 'reinitializing' is a plain (non-volatile) field
            // written here without holding threadSync -- confirm this is the
            // intended way to collapse concurrent reinit requests.
            reinitializing = true;
            // Restart endpoint when getting an IOException during accept
            synchronized (threadSync) {
                if (reinitializing) {
                    reinitializing = false;
                    // 1) Attempt to close server socket
                    closeServerSocket();
                    initialized = false;
                    // 2) Reinit endpoint (recreate server socket)
                    try {
                        msg = sm.getString("endpoint.warn.reinit");
                        log.warn(msg);
                        initEndpoint();
                    } catch (Throwable t) {
                        msg = sm.getString("endpoint.err.nonfatal",
                                           serverSocket, t);
                        log.error(msg, t);
                    }
                    // 3) If failed, attempt to restart endpoint
                    // (initEndpoint() sets 'initialized' to true only when it
                    // completes without throwing).
                    if (!initialized) {
                        msg = sm.getString("endpoint.warn.restart");
                        log.warn(msg);
                        try {
                            stopEndpoint();
                            initEndpoint();
                            startEndpoint();
                        } catch (Throwable t) {
                            msg = sm.getString("endpoint.err.fatal",
                                               serverSocket, t);
                            log.error(msg, t);
                        }
                        // Current thread is now invalid: kill it
                        // (thrown whether or not the restart above succeeded).
                        throw new ThreadDeath();
                    }
                }
            }

        }

        return accepted;
    
protected voidcloseServerSocket()

        // Unless the endpoint is paused, first poke the listening port so a
        // thread blocked in accept() gets released.
        if (!paused) {
            unlockAccept();
        }
        if (serverSocket != null) {
            try {
                serverSocket.close();
            } catch (Exception closeFailure) {
                log.error(sm.getString("endpoint.err.close"), closeFailure);
            }
        }
        // The reference is dropped even when close() reported an exception.
        serverSocket = null;
    
private MasterSlaveWorkerThreadcreateWorkerThread()
Create (or allocate) and return an available processor for use in processing a specific HTTP request, if possible. If the maximum allowed processors have already been created and are in use, return null instead.


        synchronized (workerThreads) {
            if (workerThreads.size() > 0) {
                return ((MasterSlaveWorkerThread) workerThreads.pop());
            }
            if ((maxThreads > 0) && (curThreads < maxThreads)) {
                return (newWorkerThread());
            } else {
                if (maxThreads < 0) {
                    return (newWorkerThread());
                } else {
                    return (null);
                }
            }
        }

    
public java.net.InetAddressgetAddress()

        // Address handed to the socket factory by initEndpoint(); may be null,
        // in which case the server socket is created without an address.
        return this.inet;
    
public intgetBacklog()

        // Backlog value passed to the socket factory when the server socket
        // is created.
        return this.backlog;
    
public TcpConnectionHandlergetConnectionHandler()

        // Handler that processSocket() invokes for every accepted connection.
        return this.handler;
    
public intgetCurrentThreadCount()

        // Number of worker threads created so far by newWorkerThread().
        return this.curThreads;
    
public intgetCurrentThreadsBusy()

        return curThreads - workerThreads.size();
    
public intgetMaxSpareThreads()

        return tp.getMaxSpareThreads();
    
public intgetMaxThreads()

        return tp.getMaxThreads();
    
public intgetMinSpareThreads()

        return tp.getMinSpareThreads();
    
public intgetPort()

        // Port used both for the server socket and for the self-connection
        // made by unlockAccept().
        return this.port;
    
public intgetServerSoTimeout()

        // Timeout that initEndpoint() applies to the server socket when it is
        // non-negative.
        return this.serverTimeout;
    
ServerSocketFactorygetServerSocketFactory()

        // May be null until initEndpoint() installs the default factory or
        // setServerSocketFactory() is called.
        return this.factory;
   
public intgetSoLinger()

        // Linger value; setSocketOptions() applies it only when it is >= 0.
        return this.linger;
    
public intgetSoTimeout()

        // Per-connection timeout; setSocketOptions() applies it only when it
        // is > 0.
        return this.socketTimeout;
    
public java.lang.StringgetStrategy()

        if (lf) {
            return "lf";
        } else {
            return "ms";
        }
    
public booleangetTcpNoDelay()

        // When true, setSocketOptions() enables TCP_NODELAY on accepted
        // sockets.
        return this.tcpNoDelay;
    
public intgetThreadPriority()

      return tp.getThreadPriority();
    
public voidinitEndpoint()

        // Prepares the endpoint for accepting connections: makes sure a socket
        // factory and a server socket exist, applies the server timeout, and
        // marks the endpoint initialized. Exceptions from the factory or the
        // socket propagate to the caller, and 'initialized' is then left
        // untouched.

        // Fall back to the default factory when none was configured.
        if (factory == null) {
            factory = ServerSocketFactory.getDefault();
        }

        // Only create a server socket when none is present (one may have been
        // injected through setServerSocket()).
        if (serverSocket == null) {
            try {
                if (inet == null) {
                    serverSocket = factory.createSocket(port, backlog);
                } else {
                    serverSocket = factory.createSocket(port, backlog, inet);
                }
            } catch (BindException be) {
                // Add the port to the message, but keep the original exception
                // as the cause so its stack trace is not lost.
                BindException withPort = new BindException(be.getMessage() + ":" + port);
                withPort.initCause(be);
                throw withPort;
            }
        }

        // A negative timeout means "leave the socket's timeout alone".
        if (serverTimeout >= 0) {
            serverSocket.setSoTimeout(serverTimeout);
        }
        initialized = true;
    
public booleanisPaused()

        // True between pauseEndpoint() and the next resumeEndpoint() or
        // startEndpoint().
        return this.paused;
    
public booleanisRunning()

        // Set by startEndpoint() and cleared by stopEndpoint().
        return this.running;
    
private MasterSlaveWorkerThreadnewWorkerThread()
Create and return a new processor suitable for processing HTTP requests and returning the corresponding responses.


        MasterSlaveWorkerThread workerThread = 
            new MasterSlaveWorkerThread(this, tp.getName() + "-" + (++curThreads));
        workerThread.start();
        created.addElement(workerThread);
        return (workerThread);

    
public voidpauseEndpoint()

        // Nothing to do unless the endpoint is running and not yet paused.
        if (!running || paused) {
            return;
        }
        paused = true;
        // Release a listener that is currently blocked in accept() so it can
        // notice the paused flag.
        unlockAccept();
    
voidprocessSocket(java.net.Socket s, TcpConnection con, java.lang.Object[] threadData)

        // Runs one accepted connection through its three phases (socket
        // options, factory handshake, connection handler). On any failure the
        // socket is closed; the connection object is always recycled.
        // 'phase' records how far we got so failures can be logged at the
        // right level.
        int phase = 1;
        try {

            // Phase 1: linger / no-delay / timeout
            setSocketOptions(s);

            // Phase 2: handshake through the socket factory, if there is one
            phase = 2;
            if (getServerSocketFactory() != null) {
                getServerSocketFactory().handshake(s);
            }

            // Phase 3: bind the connection object and run the handler
            phase = 3;
            con.setEndpoint(this);
            con.setSocket(s);
            getConnectionHandler().processConnection(con, threadData);

        } catch (SocketException socketFailure) {
            // Socket-level failures are only worth a debug entry.
            log.debug(sm.getString("endpoint.err.socket", s.getInetAddress()),
                    socketFailure);
            try {
                s.close();
            } catch (IOException ignored) {
                // best-effort close
            }
        } catch (Throwable failure) {
            // Handshake failures (phase 2) are logged at debug level only;
            // anything else is reported as an error.
            if (phase != 2) {
                log.error(sm.getString("endpoint.err.unexpected"), failure);
            } else if (log.isDebugEnabled()) {
                log.debug(sm.getString("endpoint.err.handshake"), failure);
            }
            try {
                s.close();
            } catch (IOException ignored) {
                // best-effort close
            }
        } finally {
            if (con != null) {
                con.recycle();
            }
        }
    
voidrecycleWorkerThread(MasterSlaveWorkerThread workerThread)
Return the given worker thread to the idle stack so that createWorkerThread() can hand it out again.

param
workerThread The worker thread to be recycled

        // NOTE(review): no explicit lock is taken here, unlike in
        // createWorkerThread(); presumably this relies on java.util.Stack's
        // own synchronization -- confirm.
        workerThreads.push(workerThread);
    
public voidresumeEndpoint()

        // Resuming only has an effect on a running endpoint.
        if (!running) {
            return;
        }
        paused = false;
    
public voidrun()
The background thread that listens for incoming TCP/IP connections and hands them off to an appropriate processor.


        // Loop until we receive a shutdown command
        while (running) {

            // Idle while the endpoint is paused, but keep watching 'running':
            // stopEndpoint() clears 'running' without clearing 'paused', so
            // waiting on 'paused' alone would leave this thread sleeping
            // forever after a stop-while-paused (and a later startEndpoint()
            // would then revive it next to the newly created listener).
            while (paused && running) {
                try {
                    Thread.sleep(1000);
                } catch (InterruptedException e) {
                    // Ignore
                }
            }
            if (!running) {
                // Stopped while paused: do not allocate a worker or accept.
                break;
            }

            // Allocate a new worker thread
            MasterSlaveWorkerThread workerThread = createWorkerThread();
            if (workerThread == null) {
                try {
                    // Wait a little for load to go down: as a result, 
                    // no accept will be made until the concurrency is
                    // lower than the specified maxThreads, and current
                    // connections will wait for a little bit instead of
                    // failing right away.
                    Thread.sleep(100);
                } catch (InterruptedException e) {
                    // Ignore
                }
                continue;
            }
            
            // Accept the next incoming connection from the server socket
            Socket socket = acceptSocket();

            // Hand this socket off to an appropriate processor
            // NOTE(review): acceptSocket() can return null; the worker is
            // still handed that null -- confirm assign() copes with it.
            workerThread.assign(socket);

            // The processor will recycle itself when it finishes

        }

        // Wake up anything waiting on threadSync for this thread to finish
        synchronized (threadSync) {
            threadSync.notifyAll();
        }


    
public voidsetAddress(java.net.InetAddress inet)

        // Takes effect the next time initEndpoint() creates the server
        // socket; null means "create the socket without an address".
        this.inet = inet;
    
public voidsetBacklog(int backlog)
Allows the server developer to specify the backlog that should be used for server sockets. By default, this value is 100.

        // Zero and negative values are ignored; the current backlog is kept.
        if (backlog <= 0) {
            return;
        }
        this.backlog = backlog;
    
public voidsetConnectionHandler(TcpConnectionHandler handler)

        // Handler that processSocket() will invoke for accepted connections.
        this.handler = handler;
    
public voidsetMaxSpareThreads(int maxThreads)

        // Forwarded to the thread pool; zero and negative values are ignored.
        if (maxThreads <= 0) {
            return;
        }
        tp.setMaxSpareThreads(maxThreads);
    
public voidsetMaxThreads(int maxThreads)

        // Forwarded to the thread pool; zero and negative values are ignored.
        // The 'maxThreads' field itself is only refreshed by startEndpoint().
        if (maxThreads <= 0) {
            return;
        }
        tp.setMaxThreads(maxThreads);
    
public voidsetMinSpareThreads(int minThreads)

        // Forwarded to the thread pool; zero and negative values are ignored.
        if (minThreads <= 0) {
            return;
        }
        tp.setMinSpareThreads(minThreads);
    
public voidsetPort(int port)

        // Stored as given (no range check); used when the server socket is
        // created and by unlockAccept().
        this.port = port;
    
public voidsetServerSoTimeout(int i)

        // Writes the same field as setServerTimeout(); applied to the server
        // socket by initEndpoint() when non-negative.
        this.serverTimeout = i;
    
public voidsetServerSocket(java.net.ServerSocket ss)

        // Injects a ready-made server socket; initEndpoint() creates one only
        // when this is still null.
        this.serverSocket = ss;
    
public voidsetServerSocketFactory(ServerSocketFactory factory)

        // When left null, initEndpoint() falls back to
        // ServerSocketFactory.getDefault().
        this.factory = factory;
    
public voidsetServerTimeout(int timeout)
Sets the timeout in ms of the server sockets created by this server. This method allows the developer to make servers more or less responsive to having their server sockets shut down.

By default this value is 1000ms.

        // Writes the same field as setServerSoTimeout(); a negative value
        // makes initEndpoint() skip setting the timeout.
        serverTimeout = timeout;
    
public voidsetSoLinger(int i)

        // A negative value makes setSocketOptions() leave linger untouched.
        this.linger = i;
    
public voidsetSoTimeout(int i)

        // Zero or negative makes setSocketOptions() leave the timeout
        // untouched.
        this.socketTimeout = i;
    
voidsetSocketOptions(java.net.Socket socket)

        // Applies the configured options to a freshly accepted socket. Each
        // option is set only when its configured value asks for it; otherwise
        // the socket keeps whatever it already has.
        if (linger >= 0) {
            socket.setSoLinger(true, linger);
        }
        if (tcpNoDelay) {
            socket.setTcpNoDelay(tcpNoDelay);
        }
        if (socketTimeout > 0) {
            socket.setSoTimeout(socketTimeout);
        }
    
public voidsetStrategy(java.lang.String strategy)

        if ("ms".equals(strategy)) {
            lf = false;
        } else {
            lf = true;
        }
    
public voidsetTcpNoDelay(boolean b)

        // Read by setSocketOptions() for each accepted socket.
        this.tcpNoDelay = b;
    
public voidsetThreadPriority(int threadPriority)

        // Forwarded to the thread pool without validation.
        this.tp.setThreadPriority(threadPriority);
    
public voidstartEndpoint()

        if (!initialized) {
            initEndpoint();
        }
        if (lf) {
            tp.start();
        }
        running = true;
        paused = false;
        if (lf) {
            listener = new LeaderFollowerWorkerThread(this);
            tp.runIt(listener);
        } else {
            maxThreads = getMaxThreads();
            threadStart();
        }
    
public voidstopEndpoint()

        // Stopping an endpoint that is not running is a no-op.
        if (!running) {
            return;
        }
        if (lf) {
            tp.shutdown();
        }
        // Clear the flag before closing the socket so the accept loop sees
        // the stop when it wakes up.
        running = false;
        if (serverSocket != null) {
            closeServerSocket();
        }
        if (!lf) {
            threadStop();
        }
        // Force the next startEndpoint() to go through initEndpoint() again.
        initialized = false;
    
private voidthreadStart()
Start the background processing thread.

        thread = new Thread(this, tp.getName());
        thread.setPriority(getThreadPriority());
        thread.setDaemon(true);
        thread.start();
    
private voidthreadStop()
Drop the reference to the background processing thread.

        // Only the reference is cleared here; the thread itself ends when
        // run() observes that 'running' is false. Nothing in this method
        // waits for run()'s notification on threadSync.
        this.thread = null;
    
protected voidunlockAccept()

        // Opens and immediately closes a throw-away client connection to our
        // own port so that a thread blocked in accept() returns. Failures are
        // logged at debug level only and never propagated.
        Socket wakeup = null;
        try {
            if (inet != null) {
                wakeup = new Socket(inet, port);
                // setting soLinger to a small value will help shutdown the
                // connection quicker
                // NOTE(review): linger is only set on this branch, not for
                // the 127.0.0.1 connection below -- confirm that is intended.
                wakeup.setSoLinger(true, 0);
            } else {
                wakeup = new Socket("127.0.0.1", port);
            }
        } catch (Exception connectFailure) {
            if (log.isDebugEnabled()) {
                log.debug(sm.getString("endpoint.debug.unlock", "" + port), connectFailure);
            }
        } finally {
            if (wakeup != null) {
                try {
                    wakeup.close();
                } catch (Exception ignored) {
                    // Ignore
                }
            }
        }