FileDocCategorySizeDatePackage
ServerConnection.javaAPI DocApache James 2.3.121480Fri Jan 12 12:56:34 GMT 2007org.apache.james.util.connection

ServerConnection.java

/****************************************************************
 * Licensed to the Apache Software Foundation (ASF) under one   *
 * or more contributor license agreements.  See the NOTICE file *
 * distributed with this work for additional information        *
 * regarding copyright ownership.  The ASF licenses this file   *
 * to you under the Apache License, Version 2.0 (the            *
 * "License"); you may not use this file except in compliance   *
 * with the License.  You may obtain a copy of the License at   *
 *                                                              *
 *   http://www.apache.org/licenses/LICENSE-2.0                 *
 *                                                              *
 * Unless required by applicable law or agreed to in writing,   *
 * software distributed under the License is distributed on an  *
 * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY       *
 * KIND, either express or implied.  See the License for the    *
 * specific language governing permissions and limitations      *
 * under the License.                                           *
 ****************************************************************/

package org.apache.james.util.connection;

import java.io.IOException;
import java.io.InterruptedIOException;
import java.net.ServerSocket;
import java.net.Socket;
import java.net.SocketException;
import java.util.ArrayList;
import java.util.Iterator;

import org.apache.avalon.cornerstone.services.connection.ConnectionHandler;
import org.apache.avalon.cornerstone.services.connection.ConnectionHandlerFactory;
import org.apache.avalon.excalibur.pool.HardResourceLimitingPool;
import org.apache.avalon.excalibur.pool.ObjectFactory;
import org.apache.avalon.excalibur.pool.Pool;
import org.apache.avalon.excalibur.pool.Poolable;
import org.apache.excalibur.thread.ThreadPool ;
import org.apache.avalon.framework.activity.Initializable;
import org.apache.avalon.framework.container.ContainerUtil;
import org.apache.avalon.framework.logger.AbstractLogEnabled;


/**
 * Represents a single server socket managed by a connection manager.
 * The connection manager will spawn a single ServerConnection for each
 * server socket that the connection manager is managing.
 *
 */
public class ServerConnection extends AbstractLogEnabled
    implements Initializable, Runnable {

    /**
     * This is a hack to deal with the fact that there appears to be
     * no platform-independent way to break out of a ServerSocket
     * accept() call.  On some platforms closing either the ServerSocket
     * itself, or its associated InputStream, causes the accept
     * method to exit.  Unfortunately, this behavior is not consistent
     * across platforms.  The deal with this, we introduce a polling
     * loop of 20 seconds for the server socket.  This introduces a
     * cost across platforms, but is necessary to maintain cross-platform
     * functionality.
     */
    private static int POLLING_INTERVAL = 20*1000;

    /**
     * The server socket which this connection is managing
     */
    private ServerSocket serverSocket;

    /**
     * The connection handler factory that generates connection
     * handlers to manage client connections to this server socket
     */
    private ConnectionHandlerFactory handlerFactory;

    /**
     * The pool that produces ClientConnectionRunners
     */
    private Pool runnerPool;

    /**
     * The factory used to provide ClientConnectionRunner objects
     */
    private ObjectFactory theRunnerFactory = new ClientConnectionRunnerFactory();

    /**
     * The thread pool used to spawn individual threads used to manage each
     * client connection.
     */
    private ThreadPool connThreadPool;

    /**
     * The timeout for client sockets spawned off this connection.
     */
    private int socketTimeout;

    /**
     * The maximum number of open client connections that this server
     * connection will allow.
     */
    private int maxOpenConn;

    /**
     * A collection of client connection runners.
     */
    private final ArrayList clientConnectionRunners = new ArrayList();

    /**
     * The thread used to manage this server connection.
     */
    private Thread serverConnectionThread;

    /**
     * The sole constructor for a ServerConnection.
     *
     * @param serverSocket the ServerSocket associated with this ServerConnection
     * @param handlerFactory the factory that generates ConnectionHandlers for the client
     *                       connections spawned off this ServerConnection
     * @param threadPool the ThreadPool used to obtain handler threads
     * @param timeout the client idle timeout for this ServerConnection's client connections
     * @param maxOpenConn the maximum number of open client connections allowed for this
     *                    ServerConnection
     */
    public ServerConnection(ServerSocket serverSocket,
                            ConnectionHandlerFactory handlerFactory,
                            ThreadPool threadPool,
                            int timeout,
                            int maxOpenConn) {
        this.serverSocket = serverSocket;
        this.handlerFactory = handlerFactory;
        connThreadPool = threadPool;
        socketTimeout = timeout;
        this.maxOpenConn = maxOpenConn;
    }

    /**
     * @see org.apache.avalon.framework.activity.Initializable#initialize()
     */
    public void initialize() throws Exception {
        runnerPool = new HardResourceLimitingPool(theRunnerFactory, 5, maxOpenConn);
        ContainerUtil.enableLogging(runnerPool,getLogger());
        ContainerUtil.initialize(runnerPool);
    }

    /**
     * The dispose operation is called by the owning ConnectionManager
     * at the end of its lifecycle.  Cleans up the server connection, forcing
     * everything to finish.
     */
    public void dispose() {
        if (getLogger().isDebugEnabled()) {
            getLogger().debug("Disposing server connection..." + this.toString());
        }
        synchronized( this ) {
            if( null != serverConnectionThread ) {
                // Execution of this block means that the run() method
                // hasn't finished yet.  So we interrupt the thread
                // to terminate run() and wait for the run() method
                // to finish.  The notifyAll() at the end of run() will
                // wake this thread and allow dispose() to end.
                Thread thread = serverConnectionThread;
                serverConnectionThread = null;
                thread.interrupt();
                try {
                    serverSocket.close();
                } catch (IOException ie) {
                    // Ignored - we're doing this to break out of the
                    // accept.  This minimizes the time required to
                    // shutdown the server.  Unfortunately, this is
                    // not guaranteed to work on all platforms.  See
                    // the comments for POLLING_INTERVAL
                }
                try {
                    if (POLLING_INTERVAL > 0) {
                        wait(2L*POLLING_INTERVAL);
                    } else {
                        wait();
                    }
                } catch (InterruptedException ie) {
                    // Expected - just complete dispose()
                }
            }
            ContainerUtil.dispose(runnerPool);
            runnerPool = null;
        }

        getLogger().debug("Closed server connection - cleaning up clients - " + this.toString());

        synchronized (clientConnectionRunners) {
            Iterator runnerIterator = clientConnectionRunners.iterator();
            while( runnerIterator.hasNext() ) {
                ClientConnectionRunner runner = (ClientConnectionRunner)runnerIterator.next();
                runner.dispose();
                runner = null;
            }
            clientConnectionRunners.clear();
        }

        getLogger().debug("Cleaned up clients - " + this.toString());

    }

    /**
     * Returns a ClientConnectionRunner in the set managed by this ServerConnection object.
     *
     * @param clientConnectionRunner the ClientConnectionRunner to be added
     */
    private ClientConnectionRunner addClientConnectionRunner()
            throws Exception {
        synchronized (clientConnectionRunners) {
            ClientConnectionRunner clientConnectionRunner = (ClientConnectionRunner)runnerPool.get();
            clientConnectionRunners.add(clientConnectionRunner);
            if (getLogger().isDebugEnabled()) {
                getLogger().debug("Adding one connection for a total of " + clientConnectionRunners.size());
            }
            return clientConnectionRunner;
        }
    }

    /**
     * Removes a ClientConnectionRunner from the set managed by this ServerConnection object.
     *
     * @param clientConnectionRunner the ClientConnectionRunner to be removed
     */
    private void removeClientConnectionRunner(ClientConnectionRunner clientConnectionRunner) {

       /*
        * checking runnerPool avoids 'dead-lock' when service is disposing :
        * (dispose() calls dispose all runners)
        * but runner is 'running' and cleans up on exit
        * this situation will result in a dead-lock on 'clientConnectionRunners'
        */
        if( runnerPool == null ) {
            getLogger().info("ServerConnection.removeClientConnectionRunner - dispose has been called - so just return : " + clientConnectionRunner );
            return;
        }
        
        synchronized (clientConnectionRunners) {
            if (clientConnectionRunners.remove(clientConnectionRunner)) {
                if (getLogger().isDebugEnabled()) {
                    getLogger().debug("Releasing one connection, leaving a total of " + clientConnectionRunners.size());
                }
                runnerPool.put(clientConnectionRunner);
            }
        }

        synchronized (this) { notify(); } // match the wait(...) in the run() inner loop before accept().
    }

    /**
     * Provides the body for the thread of execution for a ServerConnection.
     * Connections made to the server socket are passed to an appropriate,
     * newly created, ClientConnectionRunner
     */
    public void run() {
        serverConnectionThread = Thread.currentThread();

        int ioExceptionCount = 0;
        try {
            serverSocket.setSoTimeout(POLLING_INTERVAL);
        } catch (SocketException se) {
            // Ignored - for the moment
        }

        if ((getLogger().isDebugEnabled()) && (serverConnectionThread != null)) {
            StringBuffer debugBuffer =
                new StringBuffer(128)
                    .append(serverConnectionThread.getName())
                    .append(" is listening on ")
                    .append(serverSocket.toString());
            getLogger().debug(debugBuffer.toString());
        }
        while( !Thread.currentThread().interrupted() && null != serverConnectionThread ) {
            try {
                Socket clientSocket = null;
                try {
                    while (maxOpenConn > 0 && clientConnectionRunners.size() >= maxOpenConn) {
                        getLogger().warn("Maximum number of open connections (" +  clientConnectionRunners.size() + ") in use.");
                        synchronized (this) { wait(10000); }
                    }

                    clientSocket = serverSocket.accept();

                } catch( InterruptedIOException iioe ) {
                    // This exception is expected upon ServerConnection shutdown.
                    // See the POLLING_INTERVAL comment
                    continue;
                } catch( IOException se ) {
                    if (ioExceptionCount > 0) {
                        getLogger().error( "Fatal exception while listening on server socket.  Terminating connection.", se );
                        break;
                    } else {
                        continue;
                    }
                } catch( SecurityException se ) {
                    getLogger().error( "Fatal exception while listening on server socket.  Terminating connection.", se );
                    break;
                }
                ClientConnectionRunner runner = null;
                synchronized (clientConnectionRunners) {
                    if ((maxOpenConn > 0) && (clientConnectionRunners.size() >= maxOpenConn)) {
                        if (getLogger().isWarnEnabled()) {
                           getLogger().warn("Maximum number of open connections exceeded - refusing connection.  Current number of connections is " + clientConnectionRunners.size());
                           if (getLogger().isWarnEnabled()) {
                               Iterator runnerIterator = clientConnectionRunners.iterator();
                               getLogger().info("Connections: ");
                               while( runnerIterator.hasNext() ) {
                                   getLogger().info("    " + ((ClientConnectionRunner)runnerIterator.next()).toString());
                               }
                           }
                        }
                        try {
                            clientSocket.close();
                        } catch (IOException ignored) {
                            // We ignore this exception, as we already have an error condition.
                        }
                        continue;
                    } else {
                        clientSocket.setSoTimeout(socketTimeout);
                        runner = addClientConnectionRunner();
                        runner.setSocket(clientSocket);
                    }
                }
                setupLogger( runner );
                try {
                    connThreadPool.execute( runner );
                } catch (Exception e) {
                    // This error indicates that the underlying thread pool
                    // is out of threads.  For robustness, we catch this and
                    // cleanup
                    getLogger().error("Internal error - insufficient threads available to service request.  " +
                                      Thread.activeCount() + " threads in service request pool.", e);
                    try {
                        clientSocket.close();
                    } catch (IOException ignored) {
                        // We ignore this exception, as we already have an error condition.
                    }
                    // In this case, the thread will not remove the client connection runner,
                    // so we must.
                    removeClientConnectionRunner(runner);
                }
            } catch( IOException ioe ) {
                getLogger().error( "Exception accepting connection", ioe );
            } catch( Throwable e ) {
                getLogger().error( "Exception executing client connection runner: " + e.getMessage(), e );
            }
        }
        synchronized( this ) {
            serverConnectionThread = null;
            Thread.currentThread().interrupted();
            notifyAll();
        }
    }

    /**
     * An inner class to provide the actual body of the thread of execution
     * that occurs upon a client connection.
     *
     */
    class ClientConnectionRunner extends AbstractLogEnabled
        implements Poolable, Runnable  {

        /**
         * The Socket that this client connection is using for transport.
         */
        private Socket clientSocket;

        /**
         * The thread of execution associated with this client connection.
         */
        private Thread clientSocketThread;

        /**
         * Returns string for diagnostic logging
         */
        public String toString() {
            return getClass().getName() + " for " + clientSocket + " on " + clientSocketThread;
        }

        public ClientConnectionRunner() {
        }

        /**
         * The dispose operation that terminates the runner.  Should only be
         * called by the ServerConnection that owns the ClientConnectionRunner
         */
        public void dispose() {
            synchronized( this ) {
                if (null != clientSocketThread) {
                    // Execution of this block means that the run() method
                    // hasn't finished yet.  So we interrupt the thread
                    // to terminate run() and wait for the run() method
                    // to finish.  The notifyAll() at the end of run() will
                    // wake this thread and allow dispose() to end.
                    clientSocketThread.interrupt();
                    clientSocketThread = null;
                    try {
                        wait();
                    } catch (InterruptedException ie) {
                        // Expected - return from the method
                    }
                }
            }
        }

        /**
         * Sets the socket for a ClientConnectionRunner.
         *
         * @param socket the client socket associated with this ClientConnectionRunner
         */
        public void setSocket(Socket socket) {
            clientSocket = socket;
        }

        /**
         * Provides the body for the thread of execution dealing with a particular client
         * connection.  An appropriate ConnectionHandler is created, applied, executed,
         * and released.
         */
        public void run() {
            ConnectionHandler handler = null;
            try {
                clientSocketThread = Thread.currentThread();

                handler = ServerConnection.this.handlerFactory.createConnectionHandler();
                String connectionString = null;
                if( getLogger().isDebugEnabled() ) {
                    connectionString = getConnectionString();
                    String message = "Starting " + connectionString;
                    getLogger().debug( message );
                }

                handler.handleConnection(clientSocket);

                if( getLogger().isDebugEnabled() ) {
                    String message = "Ending " + connectionString;
                    getLogger().debug( message );
                }

            } catch( Throwable e ) {
                getLogger().error( "Error handling connection", e );
            } finally {

                // Close the underlying socket
                try {
                    if (clientSocket != null) {
                        clientSocket.close();
                    }
                } catch( IOException ioe ) {
                    getLogger().warn( "Error shutting down connection", ioe );
                }

                clientSocket = null;

                // Null out the thread, notify other threads to encourage
                // a context switch
                synchronized( this ) {
                    clientSocketThread = null;

                    Thread.currentThread().interrupted();

                    // Release the handler and kill the reference to the handler factory
                    //
                    // This needs to be done after the clientSocketThread is nulled out,
                    // otherwise we could trash a reused ClientConnectionRunner
                    if (handler != null) {
                        ServerConnection.this.handlerFactory.releaseConnectionHandler( handler );
                        handler = null;
                    }

                    // Remove this runner from the list of active connections.
                    ServerConnection.this.removeClientConnectionRunner(this);

                    notifyAll();
                }
            }
        }

        /**
         * Helper method to return a formatted string with connection transport information.
         *
         * @return a formatted string
         */
        private String getConnectionString() {
            if (clientSocket == null) {
                return "invalid socket";
            }
            StringBuffer connectionBuffer
                = new StringBuffer(256)
                    .append("connection on ")
                    .append(clientSocket.getLocalAddress().getHostAddress().toString())
                    .append(":")
                    .append(clientSocket.getLocalPort())
                    .append(" from ")
                    .append(clientSocket.getInetAddress().getHostAddress().toString())
                    .append(":")
                    .append(clientSocket.getPort());
            return connectionBuffer.toString();
        }
    }

    /**
     * The factory for producing handlers.
     */
    private class ClientConnectionRunnerFactory
        implements ObjectFactory {

        /**
         * @see org.apache.avalon.excalibur.pool.ObjectFactory#newInstance()
         */
        public Object newInstance() throws Exception {
            return new ClientConnectionRunner();
        }

        /**
         * @see org.apache.avalon.excalibur.pool.ObjectFactory#getCreatedClass()
         */
        public Class getCreatedClass() {
            return ClientConnectionRunner.class;
        }

        /**
         * @see org.apache.avalon.excalibur.pool.ObjectFactory#decommision(Object)
         */
        public void decommission( Object object ) throws Exception {
            return;
        }
    }
}