FileDocCategorySizeDatePackage
JamesSpoolManager.javaAPI DocApache James 2.3.120946Fri Jan 12 12:56:26 GMT 2007org.apache.james.transport

JamesSpoolManager.java

/****************************************************************
 * Licensed to the Apache Software Foundation (ASF) under one   *
 * or more contributor license agreements.  See the NOTICE file *
 * distributed with this work for additional information        *
 * regarding copyright ownership.  The ASF licenses this file   *
 * to you under the Apache License, Version 2.0 (the            *
 * "License"); you may not use this file except in compliance   *
 * with the License.  You may obtain a copy of the License at   *
 *                                                              *
 *   http://www.apache.org/licenses/LICENSE-2.0                 *
 *                                                              *
 * Unless required by applicable law or agreed to in writing,   *
 * software distributed under the License is distributed on an  *
 * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY       *
 * KIND, either express or implied.  See the License for the    *
 * specific language governing permissions and limitations      *
 * under the License.                                           *
 ****************************************************************/

package org.apache.james.transport;

import org.apache.avalon.framework.activity.Disposable;
import org.apache.avalon.framework.activity.Initializable;
import org.apache.avalon.framework.configuration.Configurable;
import org.apache.avalon.framework.configuration.Configuration;
import org.apache.avalon.framework.configuration.ConfigurationException;
import org.apache.avalon.framework.container.ContainerUtil;
import org.apache.avalon.framework.logger.AbstractLogEnabled;
import org.apache.avalon.framework.service.DefaultServiceManager;
import org.apache.avalon.framework.service.ServiceException;
import org.apache.avalon.framework.service.ServiceManager;
import org.apache.avalon.framework.service.Serviceable;
import org.apache.james.services.MailetLoader;
import org.apache.james.services.MatcherLoader;
import org.apache.james.services.SpoolRepository;
import org.apache.mailet.Mail;
import org.apache.mailet.Mailet;
import org.apache.mailet.MailetException;
import org.apache.mailet.Matcher;

import javax.mail.MessagingException;

import java.util.Collection;
import java.util.HashMap;
import java.util.Iterator;

/**
 * Manages the mail spool.  This class is responsible for retrieving
 * messages from the spool, directing messages to the appropriate
 * processor, and removing them from the spool when processing is
 * complete.
 *
 * @version CVS $Revision: 494012 $ $Date: 2007-01-08 11:23:58 +0100 (Mo, 08 Jan 2007) $
 */
public class JamesSpoolManager
    extends AbstractLogEnabled
    implements Serviceable, Configurable, Initializable, Runnable, Disposable {

    /**
     * System component manager
     */
    private DefaultServiceManager compMgr;

    /**
     * The configuration object used by this spool manager.
     */
    private Configuration conf;

    /**
     * The spool that this manager will process
     */
    private SpoolRepository spool;

    /**
     * The map of processor names to processors
     */
    private HashMap processors;

    /**
     * The number of threads used to move mail through the spool.
     */
    private int numThreads;

    /**
     * The ThreadPool containing worker threads.
     *
     * This used to be used, but for threads that lived the entire
     * lifespan of the application.  Currently commented out.  In
     * the future, we could use a thread pool to run short-lived
     * workers, so that we have a smaller number of readers that
     * accept a message from the spool, and dispatch to a pool of
     * worker threads that process the message.
     */
    // private ThreadPool workerPool;

    /**
     * The ThreadManager from which the thread pool is obtained.
     */
    // private ThreadManager threadManager;

    /**
     * Number of active threads
     */
    private int numActive;

    /**
     * Spool threads are active
     */
    private boolean active;

    /**
     * Spool threads
     */
    private Collection spoolThreads;

    /**
     * @see org.apache.avalon.framework.service.Serviceable#service(ServiceManager)
     */
    public void service(ServiceManager comp) throws ServiceException {
        // threadManager = (ThreadManager) comp.lookup(ThreadManager.ROLE);
        compMgr = new DefaultServiceManager(comp);
    }

    /**
     * @see org.apache.avalon.framework.configuration.Configurable#configure(Configuration)
     */
    public void configure(Configuration conf) throws ConfigurationException {
        this.conf = conf;
        numThreads = conf.getChild("threads").getValueAsInteger(1);
    }

    /**
     * @see org.apache.avalon.framework.activity.Initializable#initialize()
     */
    public void initialize() throws Exception {

        getLogger().info("JamesSpoolManager init...");
        spool = (SpoolRepository) compMgr.lookup(SpoolRepository.ROLE);

        MailetLoader mailetLoader
        = (MailetLoader) compMgr.lookup(MailetLoader.ROLE);
        MatcherLoader matchLoader
        = (MatcherLoader) compMgr.lookup(MatcherLoader.ROLE);

        //A processor is a Collection of
        processors = new HashMap();

        final Configuration[] processorConfs = conf.getChildren( "processor" );
        for ( int i = 0; i < processorConfs.length; i++ )
        {
            Configuration processorConf = processorConfs[i];
            String processorName = processorConf.getAttribute("name");
            try {
                LinearProcessor processor = new LinearProcessor();
                setupLogger(processor, processorName);
                processor.setSpool(spool);
                processor.initialize();
                processors.put(processorName, processor);

                final Configuration[] mailetConfs
                    = processorConf.getChildren( "mailet" );
                // Loop through the mailet configuration, load
                // all of the matcher and mailets, and add
                // them to the processor.
                for ( int j = 0; j < mailetConfs.length; j++ )
                {
                    Configuration c = mailetConfs[j];
                    String mailetClassName = c.getAttribute("class");
                    String matcherName = c.getAttribute("match");
                    Mailet mailet = null;
                    Matcher matcher = null;
                    try {
                        matcher = matchLoader.getMatcher(matcherName);
                        //The matcher itself should log that it's been inited.
                        if (getLogger().isInfoEnabled()) {
                            StringBuffer infoBuffer =
                                new StringBuffer(64)
                                        .append("Matcher ")
                                        .append(matcherName)
                                        .append(" instantiated.");
                            getLogger().info(infoBuffer.toString());
                        }
                    } catch (MessagingException ex) {
                        // **** Do better job printing out exception
                        if (getLogger().isErrorEnabled()) {
                            StringBuffer errorBuffer =
                                new StringBuffer(256)
                                        .append("Unable to init matcher ")
                                        .append(matcherName)
                                        .append(": ")
                                        .append(ex.toString());
                            getLogger().error( errorBuffer.toString(), ex );
                if (ex.getNextException() != null) {
                getLogger().error( "Caused by nested exception: ", ex.getNextException());
                }
                        }
                        System.err.println("Unable to init matcher " + matcherName);
                        System.err.println("Check spool manager logs for more details.");
                        //System.exit(1);
                        throw ex;
                    }
                    try {
                        mailet = mailetLoader.getMailet(mailetClassName, c);
                        if (getLogger().isInfoEnabled()) {
                            StringBuffer infoBuffer =
                                new StringBuffer(64)
                                        .append("Mailet ")
                                        .append(mailetClassName)
                                        .append(" instantiated.");
                            getLogger().info(infoBuffer.toString());
                        }
                    } catch (MessagingException ex) {
                        // **** Do better job printing out exception
                        if (getLogger().isErrorEnabled()) {
                            StringBuffer errorBuffer =
                                new StringBuffer(256)
                                        .append("Unable to init mailet ")
                                        .append(mailetClassName)
                                        .append(": ")
                                        .append(ex.toString());
                            getLogger().error( errorBuffer.toString(), ex );
                if (ex.getNextException() != null) {
                getLogger().error( "Caused by nested exception: ", ex.getNextException());
                }
                        }
                        System.err.println("Unable to init mailet " + mailetClassName);
                        System.err.println("Check spool manager logs for more details.");
                        //System.exit(1);
                        throw ex;
                    }
                    //Add this pair to the processor
                    processor.add(matcher, mailet);
                }

                // Close the processor matcher/mailet lists.
                //
                // Please note that this is critical to the proper operation
                // of the LinearProcessor code.  The processor will not be
                // able to service mails until this call is made.
                processor.closeProcessorLists();

                if (getLogger().isInfoEnabled()) {
                    StringBuffer infoBuffer =
                        new StringBuffer(64)
                                .append("Processor ")
                                .append(processorName)
                                .append(" instantiated.");
                    getLogger().info(infoBuffer.toString());
                }
            } catch (Exception ex) {
                if (getLogger().isErrorEnabled()) {
                    StringBuffer errorBuffer =
                       new StringBuffer(256)
                               .append("Unable to init processor ")
                               .append(processorName)
                               .append(": ")
                               .append(ex.toString());
                    getLogger().error( errorBuffer.toString(), ex );
                }
                throw ex;
            }
        }
        if (getLogger().isInfoEnabled()) {
            StringBuffer infoBuffer =
                new StringBuffer(64)
                    .append("Spooler Manager uses ")
                    .append(numThreads)
                    .append(" Thread(s)");
            getLogger().info(infoBuffer.toString());
        }

        active = true;
        numActive = 0;
        spoolThreads = new java.util.ArrayList(numThreads);
        for ( int i = 0 ; i < numThreads ; i++ ) {
            Thread reader = new Thread(this, "Spool Thread #" + i);
            spoolThreads.add(reader);
            reader.start();
        }
    }

    /**
     * This routinely checks the message spool for messages, and processes
     * them as necessary
     */
    public void run() {

        if (getLogger().isInfoEnabled())
        {
            getLogger().info("Run JamesSpoolManager: "
                             + Thread.currentThread().getName());
            getLogger().info("Spool=" + spool.getClass().getName());
        }

        numActive++;
        while(active) {
            String key = null;
            try {
                Mail mail = (Mail)spool.accept();
                key = mail.getName();
                if (getLogger().isDebugEnabled()) {
                    StringBuffer debugBuffer =
                        new StringBuffer(64)
                                .append("==== Begin processing mail ")
                                .append(mail.getName())
                                .append("====");
                    getLogger().debug(debugBuffer.toString());
                }
                process(mail);
                // Only remove an email from the spool is processing is
                // complete, or if it has no recipients
                if ((Mail.GHOST.equals(mail.getState())) ||
                    (mail.getRecipients() == null) ||
                    (mail.getRecipients().size() == 0)) {
                    ContainerUtil.dispose(mail);
                    spool.remove(key);
                    if (getLogger().isDebugEnabled()) {
                        StringBuffer debugBuffer =
                            new StringBuffer(64)
                                    .append("==== Removed from spool mail ")
                                    .append(key)
                                    .append("====");
                        getLogger().debug(debugBuffer.toString());
                    }
                }
                else {
                    // spool.remove() has a side-effect!  It unlocks the
                    // message so that other threads can work on it!  If
                    // we don't remove it, we must unlock it!
                    spool.store(mail);
                    ContainerUtil.dispose(mail);
                    spool.unlock(key);
                    // Do not notify: we simply updated the current mail
                    // and we are able to reprocess it now.
                }
                mail = null;
            } catch (InterruptedException ie) {
                getLogger().info("Interrupted JamesSpoolManager: " + Thread.currentThread().getName());
            } catch (Throwable e) {
                if (getLogger().isErrorEnabled()) {
                    getLogger().error("Exception processing " + key + " in JamesSpoolManager.run "
                                      + e.getMessage(), e);
                }
                /* Move the mail to ERROR state?  If we do, it could be
                 * deleted if an error occurs in the ERROR processor.
                 * Perhaps the answer is to resolve that issue by
                 * having a special state for messages that are not to
                 * be processed, but aren't to be deleted?  The message
                 * would already be in the spool, but would not be
                 * touched again.
                if (mail != null) {
                    try {
                        mail.setState(Mail.ERROR);
                        spool.store(mail);
                    }
                }
                */
            }
        }
        if (getLogger().isInfoEnabled())
        {
            getLogger().info("Stop JamesSpoolManager: " + Thread.currentThread().getName());
        }
        numActive--;
    }

    /**
     * Process this mail message by the appropriate processor as designated
     * in the state of the Mail object.
     *
     * @param mail the mail message to be processed
     */
    protected void process(Mail mail) {
        while (true) {
            String processorName = mail.getState();
            if (processorName.equals(Mail.GHOST)) {
                //This message should disappear
                return;
            }
            try {
                LinearProcessor processor
                    = (LinearProcessor)processors.get(processorName);
                if (processor == null) {
                    StringBuffer exceptionMessageBuffer =
                        new StringBuffer(128)
                            .append("Unable to find processor ")
                            .append(processorName)
                            .append(" requested for processing of ")
                            .append(mail.getName());
                    String exceptionMessage = exceptionMessageBuffer.toString();
                    getLogger().debug(exceptionMessage);
                    mail.setState(Mail.ERROR);
                    throw new MailetException(exceptionMessage);
                }
                StringBuffer logMessageBuffer = null;
                if (getLogger().isDebugEnabled()) {
                    logMessageBuffer =
                        new StringBuffer(64)
                                .append("Processing ")
                                .append(mail.getName())
                                .append(" through ")
                                .append(processorName);
                    getLogger().debug(logMessageBuffer.toString());
                }
                processor.service(mail);
                if (getLogger().isDebugEnabled()) {
                    logMessageBuffer =
                        new StringBuffer(128)
                                .append("Processed ")
                                .append(mail.getName())
                                .append(" through ")
                                .append(processorName);
                    getLogger().debug(logMessageBuffer.toString());
                    getLogger().debug("Result was " + mail.getState());
                }
                return;
            } catch (Throwable e) {
                // This is a strange error situation that shouldn't ordinarily
                // happen
                StringBuffer exceptionBuffer = 
                    new StringBuffer(64)
                            .append("Exception in processor <")
                            .append(processorName)
                            .append(">");
                getLogger().error(exceptionBuffer.toString(), e);
                if (processorName.equals(Mail.ERROR)) {
                    // We got an error on the error processor...
                    // kill the message
                    mail.setState(Mail.GHOST);
                    mail.setErrorMessage(e.getMessage());
                } else {
                    //We got an error... send it to the requested processor
                    if (!(e instanceof MessagingException)) {
                        //We got an error... send it to the error processor
                        mail.setState(Mail.ERROR);
                    }
                    mail.setErrorMessage(e.getMessage());
                }
            }
            if (getLogger().isErrorEnabled()) {
                StringBuffer logMessageBuffer =
                    new StringBuffer(128)
                            .append("An error occurred processing ")
                            .append(mail.getName())
                            .append(" through ")
                            .append(processorName);
                getLogger().error(logMessageBuffer.toString());
                getLogger().error("Result was " + mail.getState());
            }
        }
    }

    /**
     * The dispose operation is called at the end of a components lifecycle.
     * Instances of this class use this method to release and destroy any
     * resources that they own.
     *
     * This implementation shuts down the LinearProcessors managed by this
     * JamesSpoolManager
     *
     * @throws Exception if an error is encountered during shutdown
     */
    public void dispose() {
        getLogger().info("JamesSpoolManager dispose...");
        active = false; // shutdown the threads
        for (Iterator it = spoolThreads.iterator(); it.hasNext(); ) {
            ((Thread) it.next()).interrupt(); // interrupt any waiting accept() calls.
        }

        long stop = System.currentTimeMillis() + 60000;
        // give the spooler threads one minute to terminate gracefully
        while (numActive != 0 && stop > System.currentTimeMillis()) {
            try {
                Thread.sleep(1000);
            } catch (Exception ignored) {}
        }
        getLogger().info("JamesSpoolManager thread shutdown completed.");

        Iterator it = processors.keySet().iterator();
        while (it.hasNext()) {
            String processorName = (String)it.next();
            if (getLogger().isDebugEnabled()) {
                getLogger().debug("Processor " + processorName);
            }
            LinearProcessor processor = (LinearProcessor)processors.get(processorName);
            processor.dispose();
            processors.remove(processor);
        }
    }

}