FileDocCategorySizeDatePackage
JdbcDataSource.javaAPI DocApache James 2.3.124847Fri Jan 12 12:56:34 GMT 2007org.apache.james.util.mordred

JdbcDataSource.java

/****************************************************************
 * Licensed to the Apache Software Foundation (ASF) under one   *
 * or more contributor license agreements.  See the NOTICE file *
 * distributed with this work for additional information        *
 * regarding copyright ownership.  The ASF licenses this file   *
 * to you under the Apache License, Version 2.0 (the            *
 * "License"); you may not use this file except in compliance   *
 * with the License.  You may obtain a copy of the License at   *
 *                                                              *
 *   http://www.apache.org/licenses/LICENSE-2.0                 *
 *                                                              *
 * Unless required by applicable law or agreed to in writing,   *
 * software distributed under the License is distributed on an  *
 * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY       *
 * KIND, either express or implied.  See the License for the    *
 * specific language governing permissions and limitations      *
 * under the License.                                           *
 ****************************************************************/

package org.apache.james.util.mordred;

import org.apache.avalon.excalibur.datasource.DataSourceComponent;
import org.apache.avalon.framework.activity.Disposable;
import org.apache.avalon.framework.configuration.Configurable;
import org.apache.avalon.framework.configuration.Configuration;
import org.apache.avalon.framework.configuration.ConfigurationException;
import org.apache.avalon.framework.logger.AbstractLogEnabled;

import java.io.PrintWriter;
import java.io.StringWriter;
import java.sql.Connection;
import java.sql.SQLException;
import java.util.ArrayList;


/**
 * <p>
 * This is a <b>reliable</b> DataSource implementation, based on the pooling logic written for <a
 * href="http://share.whichever.com/">Town</a> and the configuration found in Avalon's excalibur
 * code.
 * </p>
 *
 * <p>
 * This uses the normal <code>java.sql.Connection</code> object and
 * <code>java.sql.DriverManager</code>.  The Configuration is like this:
 * <pre>
 *   <jdbc>
 *     <pool-controller min="<i>5</i>" max="<i>10</i>" connection-class="<i>my.overrided.ConnectionClass</i>">
 *       <keep-alive>select 1</keep-alive>
 *     </pool-controller>
 *     <driver><i>com.database.jdbc.JdbcDriver</i></driver>
 *     <dburl><i>jdbc:driver://host/mydb</i></dburl>
 *     <user><i>username</i></user>
 *     <password><i>password</i></password>
 *   </jdbc>
 * </pre>
 * </p>
 *
 * @version CVS $Revision: 494012 $
 * @since 4.0
 */
public class JdbcDataSource extends AbstractLogEnabled
    implements Configurable,
               Runnable,
               Disposable,
               DataSourceComponent {
    // The limit that an active connection can be running
    public static final long ACTIVE_CONN_TIME_LIMIT = 60000; // (one minute)
    public static final long ACTIVE_CONN_HARD_TIME_LIMIT = 2*ACTIVE_CONN_TIME_LIMIT;
    // How long before you kill off a connection due to inactivity
    public static final long CONN_IDLE_LIMIT        = 600000; // (10 minutes)
    private static final boolean DEEP_DEBUG         = false;
    private static int total_served                 = 0;
    // This is a temporary variable used to track how many active threads
    // are in createConnection().  This is to prevent to many connections
    // from being opened at once.
    private int connCreationsInProgress             = 0;
    // The error message is the conn pooler cannot serve connections for whatever reason
    private String connErrorMessage                 = null;
    // the last time a connection was created...
    private long connLastCreated                    = 0;
    // connection number for like of this broker
    private int connectionCount;
    // Driver class
    private String jdbcDriver;
    // Password to login to database
    private String jdbcPassword;
    // Server to connect to database (this really is the jdbc URL)
    private String jdbcURL;
    // Username to login to database
    private String jdbcUsername;
    // Maximum number of connections to have open at any point
    private int maxConn                             = 0;
    // collection of connection objects
    private ArrayList pool;
    // Thread that checks for dead/aged connections and removes them from pool
    private Thread reaper;
    // Flag to indicate whether reaper thread should run
    private boolean reaperActive                    = false;
    // a SQL command to execute to see if the connection is still ok
    private String verifyConnSQL;

    /**
     * Implements the ConnDefinition behavior when a connection is needed. Checks the pool of
     * connections to see if there is one available.  If there is not and we are below the max
     * number of connections limit, it tries to create another connection.  It retries this 10
     * times until a connection is available or can be created
     *
     * @return java.sql.Connection
     * @throws SQLException Document throws!
     */
    public Connection getConnection() throws SQLException {
        //If the conn definition has a fatal connection problem, need to return that error
        if(connErrorMessage != null) {
            throw new SQLException(connErrorMessage);
        }
        //Look through our list of open connections right now, starting from beginning.
        //If we find one, book it.
        int count                                   = total_served++;
        if(DEEP_DEBUG) {
            StringBuffer deepDebugBuffer =
                new StringBuffer(128)
                        .append((new java.util.Date()).toString())
                        .append(" trying to get a connection (")
                        .append(count)
                        .append(")");
            System.out.println(deepDebugBuffer.toString());
        }
        for(int attempts = 1; attempts <= 100; attempts++) {
            synchronized(pool) {
                for(int i = 0; i < pool.size(); i++) {
                    PoolConnEntry entry = (PoolConnEntry)pool.get(i);
                    //Set the appropriate flags to make this connection
                    //marked as in use
                    try {
                        if(entry.lock()) {
                            if(DEEP_DEBUG) {
                                StringBuffer deepDebugBuffer =
                                    new StringBuffer(128)
                                            .append((new java.util.Date()).toString())
                                            .append(" return a connection (")
                                            .append(count)
                                            .append(")");
                                System.out.println(deepDebugBuffer.toString());
                            }
                            return entry;
                        }
                    } catch(SQLException se) {
                        //Somehow a closed connection appeared in our pool.
                        //Remove it immediately.
                        finalizeEntry(entry);
                        continue;
                    }
                    //we were unable to get a lock on this entry.. so continue through list
                } //loop through existing connections
                //If we have 0, create another
                if(DEEP_DEBUG) {
                    System.out.println(pool.size());
                }
                try {
                    if(pool.size() == 0) {
                        //create a connection
                        PoolConnEntry entry = createConn();
                        if(entry != null) {
                            if(DEEP_DEBUG) {
                                StringBuffer deepDebugBuffer =
                                    new StringBuffer(128)
                                            .append((new java.util.Date()).toString())
                                            .append(" returning new connection (")
                                            .append(count)
                                            .append(")");
                                System.out.println(deepDebugBuffer.toString());
                            }
                            return entry;
                        }
                        //looks like a connection was already created
                    } else {
                        //Since we didn't find one, and we have < max connections, then consider whether
                        //  we create another
                        //if we've hit the 3rd attempt without getting a connection,
                        //  let's create another to anticipate a slow down
                        if((attempts == 2) && (pool.size() < maxConn || maxConn == 0)) {
                            PoolConnEntry entry = createConn();
                            if(entry != null) {
                                if(DEEP_DEBUG) {
                                    StringBuffer deepDebugBuffer =
                                        new StringBuffer(32)
                                                .append(" returning new connection (")
                                                .append(count)
                                                .append(")");
                                    System.out.println(deepDebugBuffer.toString());
                                }
                                return entry;
                            } else {
                                attempts = 1;
                            }
                        }
                    }
                } catch(SQLException sqle) {
                    //Ignore... error creating the connection
                    StringWriter sout = new StringWriter();
                    PrintWriter pout  = new PrintWriter(sout, true);
                    pout.println("Error creating connection: ");
                    sqle.printStackTrace(pout);
                    if (getLogger().isErrorEnabled()) {
                        getLogger().error(sout.toString());
                    }
                }
            }
            //otherwise sleep 50ms 10 times, then create a connection
            try {
                Thread.currentThread().sleep(50);
            } catch(InterruptedException ie) {
            }
        }
        // Give up... no connections available
        throw new SQLException("Giving up... no connections available.");
    }

    /**
     * @see org.apache.avalon.framework.configuration.Configurable#configure(Configuration)
     */
    public void configure(final Configuration configuration)
                   throws ConfigurationException {
        try {
            jdbcDriver    = configuration.getChild("driver").getValue(null);
            jdbcURL       = configuration.getChild("dburl").getValue(null);
            jdbcUsername  = configuration.getChild("user").getValue(null);
            jdbcPassword  = configuration.getChild("password").getValue(null);
            maxConn       = configuration.getChild("max").getValueAsInteger(2);
            //logfilename?
            verifyConnSQL = configuration.getChild("keep-alive").getValue(null);
            //Not support from Town: logfilename
            //Not supporting from Excalibur: pool-controller, min, auto-commit, oradb, connection-class
            if(jdbcDriver == null) {
                throw new ConfigurationException("You need to specify a valid driver, e.g., <driver>my.class</driver>");
            }
            try {
                if (getLogger().isDebugEnabled()) {
                    getLogger().debug("Loading new driver: " + jdbcDriver);
                }
                // TODO: Figure out why this breaks when we change the Class.forName to
                //       a loadClass method call on the class loader.
                // DO NOT MESS WITH THIS UNLESS YOU ARE WILLING TO TEST
                // AND FIX THE PROBLEMS!
                Class.forName(jdbcDriver, true, Thread.currentThread().getContextClassLoader());
                // These variations do NOT work:
                // getClass().getClassLoader().loadClass(jdbcDriver);                    -- DON'T USE -- BROKEN!!
                // Thread.currentThread().getContextClassLoader().loadClass(jdbcDriver); -- DON'T USE -- BROKEN!!
            } catch(ClassNotFoundException cnfe) {
                StringBuffer exceptionBuffer =
                    new StringBuffer(128)
                            .append("'")
                            .append(jdbcDriver)
                            .append("' could not be found in classloader.  Please specify a valid JDBC driver");
                String exceptionMessage = exceptionBuffer.toString();
                getLogger().error(exceptionMessage);
                throw new ConfigurationException(exceptionMessage);
            }
            if(jdbcURL == null) {
                throw new ConfigurationException("You need to specify a valid JDBC connection string, e.g., <dburl>jdbc:driver:database</dburl>");
            }
            if(maxConn < 0) {
                throw new ConfigurationException("Maximum number of connections specified must be at least 1 (0 means no limit).");
            }
            if (getLogger().isDebugEnabled()) {
                getLogger().debug("Starting connection pooler");
                getLogger().debug("driver = " + jdbcDriver);
                getLogger().debug("dburl = " + jdbcURL);
                getLogger().debug("username = " + jdbcUsername);
                //We don't show the password
                getLogger().debug("max connections = " + maxConn);
            }
            pool         = new ArrayList();
            reaperActive = true;
            reaper       = new Thread(this);
            reaper.setDaemon(true);
            reaper.start();
        } catch(ConfigurationException ce) {
            //Let this pass through...
            throw ce;
        }
         catch(Exception e) {
            throw new ConfigurationException("Error configuring JdbcDataSource", e);
        }
    }

    /**
     * The dispose operation is called at the end of a components lifecycle.
     * Cleans up all JDBC connections.
     *
     * @throws Exception if an error is encountered during shutdown
     */
    public void dispose() {
        // Stop the background monitoring thread
        if(reaper != null) {
            reaperActive = false;
            //In case it's sleeping, help it quit faster
            reaper.interrupt();
            reaper = null;
        }
        // The various entries will finalize themselves once the reference
        // is removed, so no need to do it here
    }

    /**
     * Implements the ConnDefinition behavior when something bad has happened to a connection. If a
     * sql command was provided in the properties file, it will run this and attempt to determine
     * whether the connection is still valid.  If it is, it recycles this connection back into the
     * pool.  If it is not, it closes the connection.
     *
     * @param entry the connection that had problems
     * @deprecated - No longer used in the new approach.
     */
    public void killConnection(PoolConnEntry entry) {
        if(entry != null) {
            // if we were provided SQL to test the connection with, we will use
            // this and possibly just release the connection after clearing warnings
            if(verifyConnSQL != null) {
                try {
                    // Test this connection
                    java.sql.Statement stmt = null;
                    try {
                        stmt = entry.createStatement();
                        stmt.execute(verifyConnSQL);
                    } finally {
                        try {
                            if (stmt != null) {
                                stmt.close();
                            }
                        } catch (SQLException sqle) {
                            // Failure to close ignored on test connection
                        }
                    }
                    // Passed test... recycle the entry
                    entry.unlock();
                } catch(SQLException e1) {
                    // Failed test... close the entry
                    finalizeEntry(entry);
                }
            } else {
                // No SQL was provided... we have to kill this entry to be sure
                finalizeEntry(entry);
            }
            return;
        } else {
            if (getLogger().isWarnEnabled()) {
                getLogger().warn("----> Could not find connection to kill!!!");
            }
            return;
        }
    }

    /**
     * Implements the ConnDefinition behavior when a connection is no longer needed. This resets
     * flags on the wrapper of the connection to allow others to use this connection.
     *
     * @param entry
     */
    public void releaseConnection(PoolConnEntry entry) {
        //PoolConnEntry entry = findEntry(conn);
        if(entry != null) {
            entry.unlock();
            return;
        } else {
            if (getLogger().isWarnEnabled()) {
                getLogger().warn("----> Could not find the connection to free!!!");
            }
            return;
        }
    }

    /**
     * Background thread that checks if there are fewer connections open than minConn specifies,
     * and checks whether connections have been checked out for too long, killing them.
     */
    public void run() {
        try {
            while(reaperActive) {
                synchronized(pool) {
                    for(int i = 0; i < pool.size(); i++) try {
                        PoolConnEntry entry = (PoolConnEntry)pool.get(i);
                        long age            = System.currentTimeMillis() - entry.getLastActivity();
                        synchronized(entry) {
                            if((entry.getStatus() == PoolConnEntry.ACTIVE) &&
                               (age > ACTIVE_CONN_HARD_TIME_LIMIT)) {
                                StringBuffer logBuffer =
                                    new StringBuffer(128)
                                            .append(" ***** connection ")
                                            .append(entry.getId())
                                            .append(" is way too old: ")
                                            .append(age)
                                            .append(" > ")
                                            .append(ACTIVE_CONN_HARD_TIME_LIMIT)
                                            .append(" and will be closed.");
                                getLogger().info(logBuffer.toString());
                                // This connection is way too old...
                                // kill it no matter what
                                finalizeEntry(entry);
                                continue;
                            }
                            if((entry.getStatus() == PoolConnEntry.ACTIVE) &&
                               (age > ACTIVE_CONN_TIME_LIMIT)) {
                                StringBuffer logBuffer =
                                    new StringBuffer(128)
                                            .append(" ***** connection ")
                                            .append(entry.getId())
                                            .append(" is way too old: ")
                                            .append(age)
                                            .append(" > ")
                                            .append(ACTIVE_CONN_TIME_LIMIT);
                                getLogger().info(logBuffer.toString());
                                // This connection is way too old...
                                // just log it for now.
                                continue;
                            }
                            if((entry.getStatus() == PoolConnEntry.AVAILABLE) &&
                               (age > CONN_IDLE_LIMIT)) {
                                //We've got a connection that's too old... kill it
                                finalizeEntry(entry);
                                continue;
                            }
                        }
                    }
                    catch (Throwable ex)
                    {
                        StringWriter sout = new StringWriter();
                        PrintWriter pout = new PrintWriter(sout, true);
                        pout.println("Reaper Error: ");
                        ex.printStackTrace(pout);
                        if (getLogger().isErrorEnabled()) {
                            getLogger().error(sout.toString());
                        }
                    }
                }
                try {
                    // Check for activity every 5 seconds
                    Thread.sleep(5000L);
                } catch(InterruptedException ex) {
                }
            }
        } finally {
            Thread.currentThread().interrupted();
        }
    }

    protected void debug(String message) {
        getLogger().debug(message);
    }

    protected void info(String message) {
        getLogger().info(message);
    }

    /*
     * This is a real hack, but oh well for now
     */
    protected void warn(String message) {
        getLogger().warn(message);
    }

    /**
     * Creates a new connection as per these properties, adds it to the pool, and logs the creation.
     *
     * @return PoolConnEntry the new connection wrapped as an entry
     * @throws SQLException
     */
    private PoolConnEntry createConn() throws SQLException {
        PoolConnEntry entry = null;
        synchronized(pool) {
            if(connCreationsInProgress > 0) {
                //We are already creating one in another place
                return null;
            }
            long now = System.currentTimeMillis();
            if((now - connLastCreated) < (1000 * pool.size())) {
                //We don't want to scale up too quickly...
                if(DEEP_DEBUG) {
                    System.err.println("We don't want to scale up too quickly");
                }
                return null;
            }
            if((maxConn == 0) || (pool.size() < maxConn)) {
                connCreationsInProgress++;
                connLastCreated = now;
            } else {
                // We've already hit a limit... fail silently
                if (getLogger().isDebugEnabled())
                {
                    StringBuffer logBuffer =
                        new StringBuffer(128)
                                .append("Connection limit hit... ")
                                .append(pool.size())
                                .append(" in pool and ")
                                .append(connCreationsInProgress)
                                .append(" + on the way.");
                    getLogger().debug(logBuffer.toString());
                }
                return null;
            }
            try {
                entry = new PoolConnEntry(this,
                                          java.sql.DriverManager.getConnection(jdbcURL, jdbcUsername,
                                                                               jdbcPassword),
                                          ++connectionCount);
                if (getLogger().isDebugEnabled())
                {
                    getLogger().debug("Opening connection " + entry);
                }
                entry.lock();
                pool.add(entry);
                return entry;
            } catch(SQLException sqle) {
                //Shouldn't ever happen, but it did, just return null.
                // Exception from DriverManager.getConnection() - log it, and return null
                StringWriter sout = new StringWriter();
                PrintWriter pout = new PrintWriter(sout, true);
                pout.println("Error creating connection: ");
                sqle.printStackTrace(pout);
                if (getLogger().isErrorEnabled()) {
                    getLogger().error(sout.toString());
                }
                return null;
            } finally {
                    connCreationsInProgress--;
            }
        }
    }

    /**
     * Closes a connection and removes it from the pool.
     *
     * @param entry entry
     */
    private void finalizeEntry(PoolConnEntry entry) {
        synchronized(pool) {
            try {
                entry.finalize();
            } catch(Exception fe) {
            }
            pool.remove(entry);
        }
    }
}