FileDocCategorySizeDatePackage
ServerConnection.javaAPI DocApache James 2.3.121480Fri Jan 12 12:56:34 GMT 2007org.apache.james.util.connection

ServerConnection

public class ServerConnection extends org.apache.avalon.framework.logger.AbstractLogEnabled implements org.apache.avalon.framework.activity.Initializable, Runnable
Represents a single server socket managed by a connection manager. The connection manager will spawn a single ServerConnection for each server socket that the connection manager is managing.

Fields Summary
private static int
POLLING_INTERVAL
This is a hack to deal with the fact that there appears to be no platform-independent way to break out of a ServerSocket accept() call. On some platforms closing either the ServerSocket itself, or its associated InputStream, causes the accept method to exit. Unfortunately, this behavior is not consistent across platforms. The deal with this, we introduce a polling loop of 20 seconds for the server socket. This introduces a cost across platforms, but is necessary to maintain cross-platform functionality.
private ServerSocket
serverSocket
The server socket which this connection is managing
private org.apache.avalon.cornerstone.services.connection.ConnectionHandlerFactory
handlerFactory
The connection handler factory that generates connection handlers to manage client connections to this server socket
private org.apache.avalon.excalibur.pool.Pool
runnerPool
The pool that produces ClientConnectionRunners
private org.apache.avalon.excalibur.pool.ObjectFactory
theRunnerFactory
The factory used to provide ClientConnectionRunner objects
private org.apache.excalibur.thread.ThreadPool
connThreadPool
The thread pool used to spawn individual threads used to manage each client connection.
private int
socketTimeout
The timeout for client sockets spawned off this connection.
private int
maxOpenConn
The maximum number of open client connections that this server connection will allow.
private final ArrayList
clientConnectionRunners
A collection of client connection runners.
private Thread
serverConnectionThread
The thread used to manage this server connection.
Constructors Summary
public ServerConnection(ServerSocket serverSocket, org.apache.avalon.cornerstone.services.connection.ConnectionHandlerFactory handlerFactory, org.apache.excalibur.thread.ThreadPool threadPool, int timeout, int maxOpenConn)
The sole constructor for a ServerConnection.

param
serverSocket the ServerSocket associated with this ServerConnection
param
handlerFactory the factory that generates ConnectionHandlers for the client connections spawned off this ServerConnection
param
threadPool the ThreadPool used to obtain handler threads
param
timeout the client idle timeout for this ServerConnection's client connections
param
maxOpenConn the maximum number of open client connections allowed for this ServerConnection


                                                                                                                
      
                             
                             
                             
                              
        this.serverSocket = serverSocket;
        this.handlerFactory = handlerFactory;
        connThreadPool = threadPool;
        socketTimeout = timeout;
        this.maxOpenConn = maxOpenConn;
    
Methods Summary
private org.apache.james.util.connection.ServerConnection$ClientConnectionRunneraddClientConnectionRunner()
Returns a ClientConnectionRunner in the set managed by this ServerConnection object.

param
clientConnectionRunner the ClientConnectionRunner to be added

        synchronized (clientConnectionRunners) {
            ClientConnectionRunner clientConnectionRunner = (ClientConnectionRunner)runnerPool.get();
            clientConnectionRunners.add(clientConnectionRunner);
            if (getLogger().isDebugEnabled()) {
                getLogger().debug("Adding one connection for a total of " + clientConnectionRunners.size());
            }
            return clientConnectionRunner;
        }
    
public voiddispose()
The dispose operation is called by the owning ConnectionManager at the end of its lifecycle. Cleans up the server connection, forcing everything to finish.

        if (getLogger().isDebugEnabled()) {
            getLogger().debug("Disposing server connection..." + this.toString());
        }
        synchronized( this ) {
            if( null != serverConnectionThread ) {
                // Execution of this block means that the run() method
                // hasn't finished yet.  So we interrupt the thread
                // to terminate run() and wait for the run() method
                // to finish.  The notifyAll() at the end of run() will
                // wake this thread and allow dispose() to end.
                Thread thread = serverConnectionThread;
                serverConnectionThread = null;
                thread.interrupt();
                try {
                    serverSocket.close();
                } catch (IOException ie) {
                    // Ignored - we're doing this to break out of the
                    // accept.  This minimizes the time required to
                    // shutdown the server.  Unfortunately, this is
                    // not guaranteed to work on all platforms.  See
                    // the comments for POLLING_INTERVAL
                }
                try {
                    if (POLLING_INTERVAL > 0) {
                        wait(2L*POLLING_INTERVAL);
                    } else {
                        wait();
                    }
                } catch (InterruptedException ie) {
                    // Expected - just complete dispose()
                }
            }
            ContainerUtil.dispose(runnerPool);
            runnerPool = null;
        }

        getLogger().debug("Closed server connection - cleaning up clients - " + this.toString());

        synchronized (clientConnectionRunners) {
            Iterator runnerIterator = clientConnectionRunners.iterator();
            while( runnerIterator.hasNext() ) {
                ClientConnectionRunner runner = (ClientConnectionRunner)runnerIterator.next();
                runner.dispose();
                runner = null;
            }
            clientConnectionRunners.clear();
        }

        getLogger().debug("Cleaned up clients - " + this.toString());

    
public voidinitialize()

see
org.apache.avalon.framework.activity.Initializable#initialize()

        runnerPool = new HardResourceLimitingPool(theRunnerFactory, 5, maxOpenConn);
        ContainerUtil.enableLogging(runnerPool,getLogger());
        ContainerUtil.initialize(runnerPool);
    
private voidremoveClientConnectionRunner(org.apache.james.util.connection.ServerConnection$ClientConnectionRunner clientConnectionRunner)
Removes a ClientConnectionRunner from the set managed by this ServerConnection object.

param
clientConnectionRunner the ClientConnectionRunner to be removed


       /*
        * checking runnerPool avoids 'dead-lock' when service is disposing :
        * (dispose() calls dispose all runners)
        * but runner is 'running' and cleans up on exit
        * this situation will result in a dead-lock on 'clientConnectionRunners'
        */
        if( runnerPool == null ) {
            getLogger().info("ServerConnection.removeClientConnectionRunner - dispose has been called - so just return : " + clientConnectionRunner );
            return;
        }
        
        synchronized (clientConnectionRunners) {
            if (clientConnectionRunners.remove(clientConnectionRunner)) {
                if (getLogger().isDebugEnabled()) {
                    getLogger().debug("Releasing one connection, leaving a total of " + clientConnectionRunners.size());
                }
                runnerPool.put(clientConnectionRunner);
            }
        }

        synchronized (this) { notify(); } // match the wait(...) in the run() inner loop before accept().
    
public voidrun()
Provides the body for the thread of execution for a ServerConnection. Connections made to the server socket are passed to an appropriate, newly created, ClientConnectionRunner

        serverConnectionThread = Thread.currentThread();

        int ioExceptionCount = 0;
        try {
            serverSocket.setSoTimeout(POLLING_INTERVAL);
        } catch (SocketException se) {
            // Ignored - for the moment
        }

        if ((getLogger().isDebugEnabled()) && (serverConnectionThread != null)) {
            StringBuffer debugBuffer =
                new StringBuffer(128)
                    .append(serverConnectionThread.getName())
                    .append(" is listening on ")
                    .append(serverSocket.toString());
            getLogger().debug(debugBuffer.toString());
        }
        while( !Thread.currentThread().interrupted() && null != serverConnectionThread ) {
            try {
                Socket clientSocket = null;
                try {
                    while (maxOpenConn > 0 && clientConnectionRunners.size() >= maxOpenConn) {
                        getLogger().warn("Maximum number of open connections (" +  clientConnectionRunners.size() + ") in use.");
                        synchronized (this) { wait(10000); }
                    }

                    clientSocket = serverSocket.accept();

                } catch( InterruptedIOException iioe ) {
                    // This exception is expected upon ServerConnection shutdown.
                    // See the POLLING_INTERVAL comment
                    continue;
                } catch( IOException se ) {
                    if (ioExceptionCount > 0) {
                        getLogger().error( "Fatal exception while listening on server socket.  Terminating connection.", se );
                        break;
                    } else {
                        continue;
                    }
                } catch( SecurityException se ) {
                    getLogger().error( "Fatal exception while listening on server socket.  Terminating connection.", se );
                    break;
                }
                ClientConnectionRunner runner = null;
                synchronized (clientConnectionRunners) {
                    if ((maxOpenConn > 0) && (clientConnectionRunners.size() >= maxOpenConn)) {
                        if (getLogger().isWarnEnabled()) {
                           getLogger().warn("Maximum number of open connections exceeded - refusing connection.  Current number of connections is " + clientConnectionRunners.size());
                           if (getLogger().isWarnEnabled()) {
                               Iterator runnerIterator = clientConnectionRunners.iterator();
                               getLogger().info("Connections: ");
                               while( runnerIterator.hasNext() ) {
                                   getLogger().info("    " + ((ClientConnectionRunner)runnerIterator.next()).toString());
                               }
                           }
                        }
                        try {
                            clientSocket.close();
                        } catch (IOException ignored) {
                            // We ignore this exception, as we already have an error condition.
                        }
                        continue;
                    } else {
                        clientSocket.setSoTimeout(socketTimeout);
                        runner = addClientConnectionRunner();
                        runner.setSocket(clientSocket);
                    }
                }
                setupLogger( runner );
                try {
                    connThreadPool.execute( runner );
                } catch (Exception e) {
                    // This error indicates that the underlying thread pool
                    // is out of threads.  For robustness, we catch this and
                    // cleanup
                    getLogger().error("Internal error - insufficient threads available to service request.  " +
                                      Thread.activeCount() + " threads in service request pool.", e);
                    try {
                        clientSocket.close();
                    } catch (IOException ignored) {
                        // We ignore this exception, as we already have an error condition.
                    }
                    // In this case, the thread will not remove the client connection runner,
                    // so we must.
                    removeClientConnectionRunner(runner);
                }
            } catch( IOException ioe ) {
                getLogger().error( "Exception accepting connection", ioe );
            } catch( Throwable e ) {
                getLogger().error( "Exception executing client connection runner: " + e.getMessage(), e );
            }
        }
        synchronized( this ) {
            serverConnectionThread = null;
            Thread.currentThread().interrupted();
            notifyAll();
        }