FileDocCategorySizeDatePackage
JamesSpoolManager.javaAPI DocApache James 2.3.120946Fri Jan 12 12:56:26 GMT 2007org.apache.james.transport

JamesSpoolManager

public class JamesSpoolManager extends org.apache.avalon.framework.logger.AbstractLogEnabled implements org.apache.avalon.framework.activity.Disposable, org.apache.avalon.framework.service.Serviceable, org.apache.avalon.framework.activity.Initializable, Runnable, org.apache.avalon.framework.configuration.Configurable
Manages the mail spool. This class is responsible for retrieving messages from the spool, directing messages to the appropriate processor, and removing them from the spool when processing is complete.
version
CVS $Revision: 494012 $ $Date: 2007-01-08 11:23:58 +0100 (Mo, 08 Jan 2007) $

Fields Summary
private org.apache.avalon.framework.service.DefaultServiceManager
compMgr
System component manager
private org.apache.avalon.framework.configuration.Configuration
conf
The configuration object used by this spool manager.
private org.apache.james.services.SpoolRepository
spool
The spool that this manager will process
private HashMap
processors
The map of processor names to processors
private int
numThreads
The number of threads used to move mail through the spool.
private int
numActive
Number of active threads
private boolean
active
Spool threads are active
private Collection
spoolThreads
Spool threads
Constructors Summary
Methods Summary
public voidconfigure(org.apache.avalon.framework.configuration.Configuration conf)

see
org.apache.avalon.framework.configuration.Configurable#configure(Configuration)

        this.conf = conf;
        numThreads = conf.getChild("threads").getValueAsInteger(1);
    
public voiddispose()
The dispose operation is called at the end of a components lifecycle. Instances of this class use this method to release and destroy any resources that they own. This implementation shuts down the LinearProcessors managed by this JamesSpoolManager

throws
Exception if an error is encountered during shutdown

        getLogger().info("JamesSpoolManager dispose...");
        active = false; // shutdown the threads
        for (Iterator it = spoolThreads.iterator(); it.hasNext(); ) {
            ((Thread) it.next()).interrupt(); // interrupt any waiting accept() calls.
        }

        long stop = System.currentTimeMillis() + 60000;
        // give the spooler threads one minute to terminate gracefully
        while (numActive != 0 && stop > System.currentTimeMillis()) {
            try {
                Thread.sleep(1000);
            } catch (Exception ignored) {}
        }
        getLogger().info("JamesSpoolManager thread shutdown completed.");

        Iterator it = processors.keySet().iterator();
        while (it.hasNext()) {
            String processorName = (String)it.next();
            if (getLogger().isDebugEnabled()) {
                getLogger().debug("Processor " + processorName);
            }
            LinearProcessor processor = (LinearProcessor)processors.get(processorName);
            processor.dispose();
            processors.remove(processor);
        }
    
public voidinitialize()

see
org.apache.avalon.framework.activity.Initializable#initialize()


        getLogger().info("JamesSpoolManager init...");
        spool = (SpoolRepository) compMgr.lookup(SpoolRepository.ROLE);

        MailetLoader mailetLoader
        = (MailetLoader) compMgr.lookup(MailetLoader.ROLE);
        MatcherLoader matchLoader
        = (MatcherLoader) compMgr.lookup(MatcherLoader.ROLE);

        //A processor is a Collection of
        processors = new HashMap();

        final Configuration[] processorConfs = conf.getChildren( "processor" );
        for ( int i = 0; i < processorConfs.length; i++ )
        {
            Configuration processorConf = processorConfs[i];
            String processorName = processorConf.getAttribute("name");
            try {
                LinearProcessor processor = new LinearProcessor();
                setupLogger(processor, processorName);
                processor.setSpool(spool);
                processor.initialize();
                processors.put(processorName, processor);

                final Configuration[] mailetConfs
                    = processorConf.getChildren( "mailet" );
                // Loop through the mailet configuration, load
                // all of the matcher and mailets, and add
                // them to the processor.
                for ( int j = 0; j < mailetConfs.length; j++ )
                {
                    Configuration c = mailetConfs[j];
                    String mailetClassName = c.getAttribute("class");
                    String matcherName = c.getAttribute("match");
                    Mailet mailet = null;
                    Matcher matcher = null;
                    try {
                        matcher = matchLoader.getMatcher(matcherName);
                        //The matcher itself should log that it's been inited.
                        if (getLogger().isInfoEnabled()) {
                            StringBuffer infoBuffer =
                                new StringBuffer(64)
                                        .append("Matcher ")
                                        .append(matcherName)
                                        .append(" instantiated.");
                            getLogger().info(infoBuffer.toString());
                        }
                    } catch (MessagingException ex) {
                        // **** Do better job printing out exception
                        if (getLogger().isErrorEnabled()) {
                            StringBuffer errorBuffer =
                                new StringBuffer(256)
                                        .append("Unable to init matcher ")
                                        .append(matcherName)
                                        .append(": ")
                                        .append(ex.toString());
                            getLogger().error( errorBuffer.toString(), ex );
                if (ex.getNextException() != null) {
                getLogger().error( "Caused by nested exception: ", ex.getNextException());
                }
                        }
                        System.err.println("Unable to init matcher " + matcherName);
                        System.err.println("Check spool manager logs for more details.");
                        //System.exit(1);
                        throw ex;
                    }
                    try {
                        mailet = mailetLoader.getMailet(mailetClassName, c);
                        if (getLogger().isInfoEnabled()) {
                            StringBuffer infoBuffer =
                                new StringBuffer(64)
                                        .append("Mailet ")
                                        .append(mailetClassName)
                                        .append(" instantiated.");
                            getLogger().info(infoBuffer.toString());
                        }
                    } catch (MessagingException ex) {
                        // **** Do better job printing out exception
                        if (getLogger().isErrorEnabled()) {
                            StringBuffer errorBuffer =
                                new StringBuffer(256)
                                        .append("Unable to init mailet ")
                                        .append(mailetClassName)
                                        .append(": ")
                                        .append(ex.toString());
                            getLogger().error( errorBuffer.toString(), ex );
                if (ex.getNextException() != null) {
                getLogger().error( "Caused by nested exception: ", ex.getNextException());
                }
                        }
                        System.err.println("Unable to init mailet " + mailetClassName);
                        System.err.println("Check spool manager logs for more details.");
                        //System.exit(1);
                        throw ex;
                    }
                    //Add this pair to the processor
                    processor.add(matcher, mailet);
                }

                // Close the processor matcher/mailet lists.
                //
                // Please note that this is critical to the proper operation
                // of the LinearProcessor code.  The processor will not be
                // able to service mails until this call is made.
                processor.closeProcessorLists();

                if (getLogger().isInfoEnabled()) {
                    StringBuffer infoBuffer =
                        new StringBuffer(64)
                                .append("Processor ")
                                .append(processorName)
                                .append(" instantiated.");
                    getLogger().info(infoBuffer.toString());
                }
            } catch (Exception ex) {
                if (getLogger().isErrorEnabled()) {
                    StringBuffer errorBuffer =
                       new StringBuffer(256)
                               .append("Unable to init processor ")
                               .append(processorName)
                               .append(": ")
                               .append(ex.toString());
                    getLogger().error( errorBuffer.toString(), ex );
                }
                throw ex;
            }
        }
        if (getLogger().isInfoEnabled()) {
            StringBuffer infoBuffer =
                new StringBuffer(64)
                    .append("Spooler Manager uses ")
                    .append(numThreads)
                    .append(" Thread(s)");
            getLogger().info(infoBuffer.toString());
        }

        active = true;
        numActive = 0;
        spoolThreads = new java.util.ArrayList(numThreads);
        for ( int i = 0 ; i < numThreads ; i++ ) {
            Thread reader = new Thread(this, "Spool Thread #" + i);
            spoolThreads.add(reader);
            reader.start();
        }
    
protected voidprocess(org.apache.mailet.Mail mail)
Process this mail message by the appropriate processor as designated in the state of the Mail object.

param
mail the mail message to be processed

        while (true) {
            String processorName = mail.getState();
            if (processorName.equals(Mail.GHOST)) {
                //This message should disappear
                return;
            }
            try {
                LinearProcessor processor
                    = (LinearProcessor)processors.get(processorName);
                if (processor == null) {
                    StringBuffer exceptionMessageBuffer =
                        new StringBuffer(128)
                            .append("Unable to find processor ")
                            .append(processorName)
                            .append(" requested for processing of ")
                            .append(mail.getName());
                    String exceptionMessage = exceptionMessageBuffer.toString();
                    getLogger().debug(exceptionMessage);
                    mail.setState(Mail.ERROR);
                    throw new MailetException(exceptionMessage);
                }
                StringBuffer logMessageBuffer = null;
                if (getLogger().isDebugEnabled()) {
                    logMessageBuffer =
                        new StringBuffer(64)
                                .append("Processing ")
                                .append(mail.getName())
                                .append(" through ")
                                .append(processorName);
                    getLogger().debug(logMessageBuffer.toString());
                }
                processor.service(mail);
                if (getLogger().isDebugEnabled()) {
                    logMessageBuffer =
                        new StringBuffer(128)
                                .append("Processed ")
                                .append(mail.getName())
                                .append(" through ")
                                .append(processorName);
                    getLogger().debug(logMessageBuffer.toString());
                    getLogger().debug("Result was " + mail.getState());
                }
                return;
            } catch (Throwable e) {
                // This is a strange error situation that shouldn't ordinarily
                // happen
                StringBuffer exceptionBuffer = 
                    new StringBuffer(64)
                            .append("Exception in processor <")
                            .append(processorName)
                            .append(">");
                getLogger().error(exceptionBuffer.toString(), e);
                if (processorName.equals(Mail.ERROR)) {
                    // We got an error on the error processor...
                    // kill the message
                    mail.setState(Mail.GHOST);
                    mail.setErrorMessage(e.getMessage());
                } else {
                    //We got an error... send it to the requested processor
                    if (!(e instanceof MessagingException)) {
                        //We got an error... send it to the error processor
                        mail.setState(Mail.ERROR);
                    }
                    mail.setErrorMessage(e.getMessage());
                }
            }
            if (getLogger().isErrorEnabled()) {
                StringBuffer logMessageBuffer =
                    new StringBuffer(128)
                            .append("An error occurred processing ")
                            .append(mail.getName())
                            .append(" through ")
                            .append(processorName);
                getLogger().error(logMessageBuffer.toString());
                getLogger().error("Result was " + mail.getState());
            }
        }
    
public voidrun()
This routinely checks the message spool for messages, and processes them as necessary


        if (getLogger().isInfoEnabled())
        {
            getLogger().info("Run JamesSpoolManager: "
                             + Thread.currentThread().getName());
            getLogger().info("Spool=" + spool.getClass().getName());
        }

        numActive++;
        while(active) {
            String key = null;
            try {
                Mail mail = (Mail)spool.accept();
                key = mail.getName();
                if (getLogger().isDebugEnabled()) {
                    StringBuffer debugBuffer =
                        new StringBuffer(64)
                                .append("==== Begin processing mail ")
                                .append(mail.getName())
                                .append("====");
                    getLogger().debug(debugBuffer.toString());
                }
                process(mail);
                // Only remove an email from the spool is processing is
                // complete, or if it has no recipients
                if ((Mail.GHOST.equals(mail.getState())) ||
                    (mail.getRecipients() == null) ||
                    (mail.getRecipients().size() == 0)) {
                    ContainerUtil.dispose(mail);
                    spool.remove(key);
                    if (getLogger().isDebugEnabled()) {
                        StringBuffer debugBuffer =
                            new StringBuffer(64)
                                    .append("==== Removed from spool mail ")
                                    .append(key)
                                    .append("====");
                        getLogger().debug(debugBuffer.toString());
                    }
                }
                else {
                    // spool.remove() has a side-effect!  It unlocks the
                    // message so that other threads can work on it!  If
                    // we don't remove it, we must unlock it!
                    spool.store(mail);
                    ContainerUtil.dispose(mail);
                    spool.unlock(key);
                    // Do not notify: we simply updated the current mail
                    // and we are able to reprocess it now.
                }
                mail = null;
            } catch (InterruptedException ie) {
                getLogger().info("Interrupted JamesSpoolManager: " + Thread.currentThread().getName());
            } catch (Throwable e) {
                if (getLogger().isErrorEnabled()) {
                    getLogger().error("Exception processing " + key + " in JamesSpoolManager.run "
                                      + e.getMessage(), e);
                }
                /* Move the mail to ERROR state?  If we do, it could be
                 * deleted if an error occurs in the ERROR processor.
                 * Perhaps the answer is to resolve that issue by
                 * having a special state for messages that are not to
                 * be processed, but aren't to be deleted?  The message
                 * would already be in the spool, but would not be
                 * touched again.
                if (mail != null) {
                    try {
                        mail.setState(Mail.ERROR);
                        spool.store(mail);
                    }
                }
                */
            }
        }
        if (getLogger().isInfoEnabled())
        {
            getLogger().info("Stop JamesSpoolManager: " + Thread.currentThread().getName());
        }
        numActive--;
    
public voidservice(org.apache.avalon.framework.service.ServiceManager comp)

see
org.apache.avalon.framework.service.Serviceable#service(ServiceManager)

        // threadManager = (ThreadManager) comp.lookup(ThreadManager.ROLE);
        compMgr = new DefaultServiceManager(comp);