FileDocCategorySizeDatePackage
JDBCSpoolRepository.javaAPI DocApache James 2.3.113626Fri Jan 12 12:56:24 GMT 2007org.apache.james.mailrepository

JDBCSpoolRepository.java

/****************************************************************
 * Licensed to the Apache Software Foundation (ASF) under one   *
 * or more contributor license agreements.  See the NOTICE file *
 * distributed with this work for additional information        *
 * regarding copyright ownership.  The ASF licenses this file   *
 * to you under the Apache License, Version 2.0 (the            *
 * "License"); you may not use this file except in compliance   *
 * with the License.  You may obtain a copy of the License at   *
 *                                                              *
 *   http://www.apache.org/licenses/LICENSE-2.0                 *
 *                                                              *
 * Unless required by applicable law or agreed to in writing,   *
 * software distributed under the License is distributed on an  *
 * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY       *
 * KIND, either express or implied.  See the License for the    *
 * specific language governing permissions and limitations      *
 * under the License.                                           *
 ****************************************************************/



package org.apache.james.mailrepository;

import org.apache.avalon.framework.configuration.Configuration;
import org.apache.avalon.framework.configuration.ConfigurationException;

import org.apache.james.services.SpoolRepository;
import org.apache.mailet.Mail;

import java.sql.Connection;
import java.sql.PreparedStatement;
import java.sql.ResultSet;
import java.sql.SQLException;
import java.util.LinkedList;

/**
 * Implementation of a SpoolRepository on a database.
 *
 * <p>Requires a configuration element in the .conf.xml file of the form:
 *  <br><repository destinationURL="town://path"
 *  <br>            type="MAIL"
 *  <br>            model="SYNCHRONOUS"/>
 *  <br>            <driver>sun.jdbc.odbc.JdbcOdbcDriver</conn>
 *  <br>            <conn>jdbc:odbc:LocalDB</conn>
 *  <br>            <table>Message</table>
 *  <br></repository>
 * <p>destinationURL specifies..(Serge??)
 * <br>Type can be SPOOL or MAIL
 * <br>Model is currently not used and may be dropped
 * <br>conn is the location of the ...(Serge)
 * <br>table is the name of the table in the Database to be used
 *
 * <p>Requires a logger called MailRepository.
 *
 * <p>Approach for spool manager:
 *
 * PendingMessage inner class
 *
 * accept() is called....
 * checks whether needs to load PendingMessages()
 * tries to get a message()
 * if none, wait 60
 *
 * accept(long) is called
 * checks whether needs to load PendingMessages
 * tries to get a message(long)
 * if none, wait accordingly
 *
 * sync checkswhetherneedstoloadPendingMessages()
 * if pending messages has messages in immediate process, return immediately
 * if run query in last WAIT_LIMIT time, return immediately
 * query and build 2 vectors of Pending messages.
 *  Ones that need immediate processing
 *  Ones that are delayed.  put them in time order
 * return
 *
 * get_a_message()
 * loop through immediate messages.
 *  - remove top message
 *  - try to lock.  if successful, return.  otherwise loop.
 * if nothing, return null
 *
 * get_a_message(long)
 * try get_a_message()
 * check top message in pending.  if ready, then remove, try to lock, return if lock.
 * return null.
 *
 *
 * @version 1.0.0, 24/04/1999
 */
public class JDBCSpoolRepository extends JDBCMailRepository implements SpoolRepository {

    /**
     * How long a thread should sleep when there are no messages to process.
     */
    private static int WAIT_LIMIT = 60000;
    /**
     * How long we have to wait before reloading the list of pending messages
     */
    private static int LOAD_TIME_MININUM = 1000;
    /**
     * A queue in memory of messages that need processing
     */
    private LinkedList pendingMessages = new LinkedList();
    /**
     * When the queue was last read
     */
    private long pendingMessagesLoadTime = 0;
    /**
     * Maximum size of the pendingMessages queue
     */
    private int maxPendingMessages = 0;

    /**
     * @see org.apache.avalon.framework.configuration.Configurable#configure(Configuration)
     */
    public void configure(Configuration conf) throws ConfigurationException {
        super.configure(conf);
        maxPendingMessages = conf.getChild("maxcache").getValueAsInteger(1000);
    }

    /**
     * Return a message to process.  This is a message in the spool that is not locked.
     */
    public synchronized Mail accept() throws InterruptedException {
        return accept(new SpoolRepository.AcceptFilter () {
            public boolean accept (String _, String __, long ___, String ____) {
                return true;
            }

            public long getWaitTime () {
                return 0;
            }
        });
    }

    /**
     * Return a message that's ready to process.  If a message is of type "error"
     * then check the last updated time, and don't try it until the long 'delay' parameter
     * milliseconds has passed.
     */
    public synchronized Mail accept(final long delay) throws InterruptedException {
        return accept (new SpoolRepository.AcceptFilter () {
            long sleepUntil = 0;
                
                public boolean accept (String key, String state, long lastUpdated, String errorMessage) {
                    if (Mail.ERROR.equals(state)) {
                        //if it's an error message, test the time
                        long processingTime = delay + lastUpdated;
                        if (processingTime < System.currentTimeMillis()) {
                            //It's time to process
                            return true;
                        } else {
                            //We don't process this, but we want to possibly reduce the amount of time
                            //  we sleep so we wake when this message is ready.
                            if (sleepUntil == 0 || processingTime < sleepUntil) {
                                sleepUntil = processingTime;
                            }
                            return false;
                        }
                    } else {
                        return true;
                    }
                }
                

                public long getWaitTime () {
                    if (sleepUntil == 0) {
                        // in AvalonSpoolRepository we return 0: why do we change sleepUntil?
                        // sleepUntil = System.currentTimeMillis();
                        return 0;
                    }
                    long waitTime = sleepUntil - System.currentTimeMillis();
                    sleepUntil = 0;
                    return waitTime <= 0 ? 1 : waitTime;
                }
                
            });
    }

    /**
     * Returns an arbitrarily selected mail deposited in this Repository for
     * which the supplied filter's accept method returns true.
     * Usage: RemoteDeliverySpool calls accept(filter) with some a filter which determines
     * based on number of retries if the mail is ready for processing.
     * If no message is ready the method will block until one is, the amount of time to block is
     * determined by calling the filters getWaitTime method.
     *
     * @return  the mail
     */
    public synchronized Mail accept(SpoolRepository.AcceptFilter filter) throws InterruptedException {
        while (!Thread.currentThread().isInterrupted()) {
            //Loop through until we are either out of pending messages or have a message
            // that we can lock
            PendingMessage next = null;
            while ((next = getNextPendingMessage(filter)) != null && !Thread.currentThread().isInterrupted()) {
                //Check whether this is time to expire
                
                // boolean shouldProcess = filter.accept (next.key, next.state, next.lastUpdated, next.errorMessage);
                
                if (/*shouldProcess && */ lock(next.key)) {
                    try {
                        Mail mail = retrieve(next.key);
                        // Retrieve can return null if the mail is no longer on the spool
                        // (i.e. another thread has gotten to it first).
                        // In this case we simply continue to the next key
                        if (mail == null) {
                            unlock(next.key);
                            continue;
                        }
                        return mail;
                    } catch (javax.mail.MessagingException e) {
                        unlock(next.key);
                        getLogger().error("Exception during retrieve -- skipping item " + next.key, e);
                    }
                }
            }
            //Nothing to do... sleep!
            long wait_time = filter.getWaitTime();
            if (wait_time <= 0) {
                wait_time = WAIT_LIMIT;
            }
            try {
                wait (wait_time);
            } catch (InterruptedException ex) {
                throw ex;
            }
        }
        throw new InterruptedException();
    }

    /**
     * Needs to override this method and reset the time to load to zero.
     * This will force a reload of the pending messages queue once that
     * is empty... a message that gets added will sit here until that queue
     * time has passed and the list is then reloaded.
     */
    public void store(Mail mc) throws javax.mail.MessagingException {
        pendingMessagesLoadTime = 0;
        super.store(mc);
    }

    /**
     * If not empty, gets the next pending message.  Otherwise checks
     * checks the last time pending messages was loaded and load if
     * it's been more than 1 second (should be configurable).
     */
    private PendingMessage getNextPendingMessage(SpoolRepository.AcceptFilter filter) {
        synchronized (pendingMessages) {
            if (pendingMessages.size() == 0 && pendingMessagesLoadTime < System.currentTimeMillis()) {
                // pendingMessagesLoadTime = LOAD_TIME_MININUM + System.currentTimeMillis();
                loadPendingMessages(filter);
                pendingMessagesLoadTime = Math.max(filter.getWaitTime(), LOAD_TIME_MININUM) + System.currentTimeMillis();
            }

            if (pendingMessages.size() == 0) {
                return null;
            } else {
                return (PendingMessage)pendingMessages.removeFirst();
            }
        }
    }

    /**
     * Retrieves the pending messages that are in the database
     */
    private void loadPendingMessages(SpoolRepository.AcceptFilter filter) {
        //Loads a vector with PendingMessage objects
        synchronized (pendingMessages) {
            pendingMessages.clear();

            Connection conn = null;
            PreparedStatement listMessages = null;
            ResultSet rsListMessages = null;
            try {
                conn = datasource.getConnection();
                listMessages =
                    conn.prepareStatement(sqlQueries.getSqlString("listMessagesSQL", true));
                listMessages.setString(1, repositoryName);
                // Too simplistic.  When filtering, we may need to see
                // more than just maxPendingMessages to load the
                // cache, so just hope that the driver and server use
                // cursors properly.
                // --> listMessages.setMaxRows(maxPendingMessages);
                rsListMessages = listMessages.executeQuery();
                // Continue to have it loop through the list of messages until we hit
                // a possible message, or we retrieve maxPendingMessages messages.
                // This maxPendingMessages cap is to avoid loading thousands or
                // hundreds of thousands of messages when the spool is enourmous.
                while (rsListMessages.next() && pendingMessages.size() < maxPendingMessages && !Thread.currentThread().isInterrupted()) {
                    String key = rsListMessages.getString(1);
                    String state = rsListMessages.getString(2);
                    long lastUpdated = rsListMessages.getTimestamp(3).getTime();
                    String errorMessage = rsListMessages.getString(4);
                    if (filter.accept(key, state, lastUpdated, errorMessage)) {
                        pendingMessages.add(new PendingMessage(key, state, lastUpdated, errorMessage));
                    }
                }
            } catch (SQLException sqle) {
                //Log it and avoid reloading for a bit
                getLogger().error("Error retrieving pending messages", sqle);
                pendingMessagesLoadTime = LOAD_TIME_MININUM * 10 + System.currentTimeMillis();
            } finally {
                theJDBCUtil.closeJDBCResultSet(rsListMessages);
                theJDBCUtil.closeJDBCStatement(listMessages);
                theJDBCUtil.closeJDBCConnection(conn);
            }
        }
    }

    /**
     * Simple class to hold basic information about a message in the spool
     */
    class PendingMessage {
        protected String key;
        protected String state;
        protected long lastUpdated;
        protected String errorMessage;

        public PendingMessage(String key, String state, long lastUpdated, String errorMessage) {
            this.key = key;
            this.state = state;
            this.lastUpdated = lastUpdated;
            this.errorMessage = errorMessage;
        }
    }
}