ServerConnectionpublic class ServerConnection extends org.apache.avalon.framework.logger.AbstractLogEnabled implements org.apache.avalon.framework.activity.Initializable, RunnableRepresents a single server socket managed by a connection manager.
The connection manager will spawn a single ServerConnection for each
server socket that the connection manager is managing. |
Fields Summary |
---|
private static int | POLLING_INTERVALThis is a hack to deal with the fact that there appears to be
no platform-independent way to break out of a ServerSocket
accept() call. On some platforms closing either the ServerSocket
itself, or its associated InputStream, causes the accept
method to exit. Unfortunately, this behavior is not consistent
across platforms. The deal with this, we introduce a polling
loop of 20 seconds for the server socket. This introduces a
cost across platforms, but is necessary to maintain cross-platform
functionality. | private ServerSocket | serverSocketThe server socket which this connection is managing | private org.apache.avalon.cornerstone.services.connection.ConnectionHandlerFactory | handlerFactoryThe connection handler factory that generates connection
handlers to manage client connections to this server socket | private org.apache.avalon.excalibur.pool.Pool | runnerPoolThe pool that produces ClientConnectionRunners | private org.apache.avalon.excalibur.pool.ObjectFactory | theRunnerFactoryThe factory used to provide ClientConnectionRunner objects | private org.apache.excalibur.thread.ThreadPool | connThreadPoolThe thread pool used to spawn individual threads used to manage each
client connection. | private int | socketTimeoutThe timeout for client sockets spawned off this connection. | private int | maxOpenConnThe maximum number of open client connections that this server
connection will allow. | private final ArrayList | clientConnectionRunnersA collection of client connection runners. | private Thread | serverConnectionThreadThe thread used to manage this server connection. |
Methods Summary |
---|
private org.apache.james.util.connection.ServerConnection$ClientConnectionRunner | addClientConnectionRunner()Returns a ClientConnectionRunner in the set managed by this ServerConnection object.
synchronized (clientConnectionRunners) {
ClientConnectionRunner clientConnectionRunner = (ClientConnectionRunner)runnerPool.get();
clientConnectionRunners.add(clientConnectionRunner);
if (getLogger().isDebugEnabled()) {
getLogger().debug("Adding one connection for a total of " + clientConnectionRunners.size());
}
return clientConnectionRunner;
}
| public void | dispose()The dispose operation is called by the owning ConnectionManager
at the end of its lifecycle. Cleans up the server connection, forcing
everything to finish.
if (getLogger().isDebugEnabled()) {
getLogger().debug("Disposing server connection..." + this.toString());
}
synchronized( this ) {
if( null != serverConnectionThread ) {
// Execution of this block means that the run() method
// hasn't finished yet. So we interrupt the thread
// to terminate run() and wait for the run() method
// to finish. The notifyAll() at the end of run() will
// wake this thread and allow dispose() to end.
Thread thread = serverConnectionThread;
serverConnectionThread = null;
thread.interrupt();
try {
serverSocket.close();
} catch (IOException ie) {
// Ignored - we're doing this to break out of the
// accept. This minimizes the time required to
// shutdown the server. Unfortunately, this is
// not guaranteed to work on all platforms. See
// the comments for POLLING_INTERVAL
}
try {
if (POLLING_INTERVAL > 0) {
wait(2L*POLLING_INTERVAL);
} else {
wait();
}
} catch (InterruptedException ie) {
// Expected - just complete dispose()
}
}
ContainerUtil.dispose(runnerPool);
runnerPool = null;
}
getLogger().debug("Closed server connection - cleaning up clients - " + this.toString());
synchronized (clientConnectionRunners) {
Iterator runnerIterator = clientConnectionRunners.iterator();
while( runnerIterator.hasNext() ) {
ClientConnectionRunner runner = (ClientConnectionRunner)runnerIterator.next();
runner.dispose();
runner = null;
}
clientConnectionRunners.clear();
}
getLogger().debug("Cleaned up clients - " + this.toString());
| public void | initialize()
runnerPool = new HardResourceLimitingPool(theRunnerFactory, 5, maxOpenConn);
ContainerUtil.enableLogging(runnerPool,getLogger());
ContainerUtil.initialize(runnerPool);
| private void | removeClientConnectionRunner(org.apache.james.util.connection.ServerConnection$ClientConnectionRunner clientConnectionRunner)Removes a ClientConnectionRunner from the set managed by this ServerConnection object.
/*
* checking runnerPool avoids 'dead-lock' when service is disposing :
* (dispose() calls dispose all runners)
* but runner is 'running' and cleans up on exit
* this situation will result in a dead-lock on 'clientConnectionRunners'
*/
if( runnerPool == null ) {
getLogger().info("ServerConnection.removeClientConnectionRunner - dispose has been called - so just return : " + clientConnectionRunner );
return;
}
synchronized (clientConnectionRunners) {
if (clientConnectionRunners.remove(clientConnectionRunner)) {
if (getLogger().isDebugEnabled()) {
getLogger().debug("Releasing one connection, leaving a total of " + clientConnectionRunners.size());
}
runnerPool.put(clientConnectionRunner);
}
}
synchronized (this) { notify(); } // match the wait(...) in the run() inner loop before accept().
| public void | run()Provides the body for the thread of execution for a ServerConnection.
Connections made to the server socket are passed to an appropriate,
newly created, ClientConnectionRunner
serverConnectionThread = Thread.currentThread();
int ioExceptionCount = 0;
try {
serverSocket.setSoTimeout(POLLING_INTERVAL);
} catch (SocketException se) {
// Ignored - for the moment
}
if ((getLogger().isDebugEnabled()) && (serverConnectionThread != null)) {
StringBuffer debugBuffer =
new StringBuffer(128)
.append(serverConnectionThread.getName())
.append(" is listening on ")
.append(serverSocket.toString());
getLogger().debug(debugBuffer.toString());
}
while( !Thread.currentThread().interrupted() && null != serverConnectionThread ) {
try {
Socket clientSocket = null;
try {
while (maxOpenConn > 0 && clientConnectionRunners.size() >= maxOpenConn) {
getLogger().warn("Maximum number of open connections (" + clientConnectionRunners.size() + ") in use.");
synchronized (this) { wait(10000); }
}
clientSocket = serverSocket.accept();
} catch( InterruptedIOException iioe ) {
// This exception is expected upon ServerConnection shutdown.
// See the POLLING_INTERVAL comment
continue;
} catch( IOException se ) {
if (ioExceptionCount > 0) {
getLogger().error( "Fatal exception while listening on server socket. Terminating connection.", se );
break;
} else {
continue;
}
} catch( SecurityException se ) {
getLogger().error( "Fatal exception while listening on server socket. Terminating connection.", se );
break;
}
ClientConnectionRunner runner = null;
synchronized (clientConnectionRunners) {
if ((maxOpenConn > 0) && (clientConnectionRunners.size() >= maxOpenConn)) {
if (getLogger().isWarnEnabled()) {
getLogger().warn("Maximum number of open connections exceeded - refusing connection. Current number of connections is " + clientConnectionRunners.size());
if (getLogger().isWarnEnabled()) {
Iterator runnerIterator = clientConnectionRunners.iterator();
getLogger().info("Connections: ");
while( runnerIterator.hasNext() ) {
getLogger().info(" " + ((ClientConnectionRunner)runnerIterator.next()).toString());
}
}
}
try {
clientSocket.close();
} catch (IOException ignored) {
// We ignore this exception, as we already have an error condition.
}
continue;
} else {
clientSocket.setSoTimeout(socketTimeout);
runner = addClientConnectionRunner();
runner.setSocket(clientSocket);
}
}
setupLogger( runner );
try {
connThreadPool.execute( runner );
} catch (Exception e) {
// This error indicates that the underlying thread pool
// is out of threads. For robustness, we catch this and
// cleanup
getLogger().error("Internal error - insufficient threads available to service request. " +
Thread.activeCount() + " threads in service request pool.", e);
try {
clientSocket.close();
} catch (IOException ignored) {
// We ignore this exception, as we already have an error condition.
}
// In this case, the thread will not remove the client connection runner,
// so we must.
removeClientConnectionRunner(runner);
}
} catch( IOException ioe ) {
getLogger().error( "Exception accepting connection", ioe );
} catch( Throwable e ) {
getLogger().error( "Exception executing client connection runner: " + e.getMessage(), e );
}
}
synchronized( this ) {
serverConnectionThread = null;
Thread.currentThread().interrupted();
notifyAll();
}
|
|