JDBCSpoolRepository
public class JDBCSpoolRepository extends JDBCMailRepository implements org.apache.james.services.SpoolRepository
Implementation of a SpoolRepository on a database.
Requires a configuration element in the .conf.xml file of the form:
<repository destinationURL="town://path"
type="MAIL"
model="SYNCHRONOUS">
  <driver>sun.jdbc.odbc.JdbcOdbcDriver</driver>
  <conn>jdbc:odbc:LocalDB</conn>
  <table>Message</table>
</repository>
destinationURL specifies..(Serge??)
type can be SPOOL or MAIL
model is currently not used and may be dropped
conn is the location of the ...(Serge)
table is the name of the table in the Database to be used
Requires a logger called MailRepository.
Approach for spool manager:
PendingMessage inner class
accept() is called....
  checks whether needs to load PendingMessages()
  tries to get a message()
  if none, wait 60
accept(long) is called
  checks whether needs to load PendingMessages
  tries to get a message(long)
  if none, wait accordingly
sync checks whether needs to load PendingMessages()
  if pending messages has messages in immediate process, return immediately
  if run query in last WAIT_LIMIT time, return immediately
  query and build 2 vectors of Pending messages.
    Ones that need immediate processing
    Ones that are delayed. put them in time order
  return
get_a_message()
  loop through immediate messages.
  - remove top message
  - try to lock. if successful, return. otherwise loop.
  if nothing, return null
get_a_message(long)
  try get_a_message()
  check top message in pending. if ready, then remove, try to lock, return if lock.
  return null. |
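The approach notes above can be read as two in-memory collections plus a lock step. The following is a minimal sketch under stated assumptions, not the James implementation: the class `PendingQueueSketch`, its `Pending` holder, the `add` helper, and the in-memory `lock` set are all hypothetical names introduced for illustration, and the current time is passed in as a parameter rather than read from the system clock.

```java
import java.util.ArrayDeque;
import java.util.Comparator;
import java.util.Deque;
import java.util.HashSet;
import java.util.PriorityQueue;
import java.util.Set;

// Hypothetical illustration of the notes above; not the James implementation.
class PendingQueueSketch {
    // Stand-in for the PendingMessage inner class: a key plus the time it becomes ready.
    static final class Pending {
        final String key;
        final long readyAt;
        Pending(String key, long readyAt) { this.key = key; this.readyAt = readyAt; }
    }

    // Messages that need immediate processing, in arrival order.
    private final Deque<Pending> immediate = new ArrayDeque<>();
    // Delayed messages, kept in time order (earliest ready time first).
    private final PriorityQueue<Pending> delayed =
            new PriorityQueue<>(Comparator.comparingLong((Pending p) -> p.readyAt));
    // Assumption: locking is modeled as membership in an in-memory set.
    private final Set<String> locked = new HashSet<>();

    // Sorts a message into the immediate or the delayed collection.
    void add(String key, long readyAt, long now) {
        Pending p = new Pending(key, readyAt);
        if (readyAt <= now) {
            immediate.addLast(p);
        } else {
            delayed.add(p);
        }
    }

    // Returns true only if the key was not already locked.
    boolean lock(String key) { return locked.add(key); }

    // get_a_message(): remove the top immediate message, try to lock it,
    // return it on success, otherwise keep looping; null if nothing is left.
    String getMessage() {
        Pending p;
        while ((p = immediate.pollFirst()) != null) {
            if (lock(p.key)) {
                return p.key;
            }
        }
        return null;
    }

    // get_a_message(long): try the immediate messages first, then check
    // whether the earliest delayed message is ready at time 'now'.
    String getMessage(long now) {
        String key = getMessage();
        if (key != null) {
            return key;
        }
        Pending top = delayed.peek();
        if (top != null && top.readyAt <= now) {
            delayed.poll();
            if (lock(top.key)) {
                return top.key;
            }
        }
        return null;
    }

    public static void main(String[] args) {
        PendingQueueSketch q = new PendingQueueSketch();
        q.add("ready", 0L, 100L);
        q.add("later", 150L, 100L);
        System.out.println(q.getMessage(120L));
        System.out.println(q.getMessage(200L));
    }
}
```

Keeping the delayed messages in a priority queue means only the head needs to be inspected to decide whether anything has become ready, which matches the "check top message in pending" step.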
Fields Summary |
---|
private static int | WAIT_LIMIT: How long a thread should sleep when there are no messages to process. |
private static int | LOAD_TIME_MININUM: How long we have to wait before reloading the list of pending messages. |
private LinkedList | pendingMessages: A queue in memory of messages that need processing. |
private long | pendingMessagesLoadTime: When the queue was last read. |
private int | maxPendingMessages: Maximum size of the pendingMessages queue. |
Methods Summary |
---|
public synchronized org.apache.mailet.Mail | accept()
Return a message to process. This is a message in the spool that is not locked.
return accept(new SpoolRepository.AcceptFilter () {
public boolean accept (String _, String __, long ___, String ____) {
return true;
}
public long getWaitTime () {
return 0;
}
});
| public synchronized org.apache.mailet.Mail | accept(long delay)
Return a message that's ready to process. If a message is in the "error" state,
check its last updated time and don't try it again until 'delay' milliseconds
have passed since that time.
return accept (new SpoolRepository.AcceptFilter () {
long sleepUntil = 0;
public boolean accept (String key, String state, long lastUpdated, String errorMessage) {
if (Mail.ERROR.equals(state)) {
//if it's an error message, test the time
long processingTime = delay + lastUpdated;
if (processingTime < System.currentTimeMillis()) {
//It's time to process
return true;
} else {
//We don't process this, but we want to possibly reduce the amount of time
// we sleep so we wake when this message is ready.
if (sleepUntil == 0 || processingTime < sleepUntil) {
sleepUntil = processingTime;
}
return false;
}
} else {
return true;
}
}
public long getWaitTime () {
if (sleepUntil == 0) {
// in AvalonSpoolRepository we return 0: why do we change sleepUntil?
// sleepUntil = System.currentTimeMillis();
return 0;
}
long waitTime = sleepUntil - System.currentTimeMillis();
sleepUntil = 0;
return waitTime <= 0 ? 1 : waitTime;
}
});
| public synchronized org.apache.mailet.Mail | accept(SpoolRepository.AcceptFilter filter)
Returns an arbitrarily selected mail deposited in this Repository for
which the supplied filter's accept method returns true.
Usage: RemoteDeliverySpool calls accept(filter) with a filter that determines,
based on the number of retries, whether the mail is ready for processing.
If no message is ready, the method blocks until one is; the amount of time to block is
determined by calling the filter's getWaitTime method.
while (!Thread.currentThread().isInterrupted()) {
//Loop through until we are either out of pending messages or have a message
// that we can lock
PendingMessage next = null;
while ((next = getNextPendingMessage(filter)) != null && !Thread.currentThread().isInterrupted()) {
//Check whether this is time to expire
// boolean shouldProcess = filter.accept (next.key, next.state, next.lastUpdated, next.errorMessage);
if (/*shouldProcess && */ lock(next.key)) {
try {
Mail mail = retrieve(next.key);
// Retrieve can return null if the mail is no longer on the spool
// (i.e. another thread has gotten to it first).
// In this case we simply continue to the next key
if (mail == null) {
unlock(next.key);
continue;
}
return mail;
} catch (javax.mail.MessagingException e) {
unlock(next.key);
getLogger().error("Exception during retrieve -- skipping item " + next.key, e);
}
}
}
//Nothing to do... sleep!
long wait_time = filter.getWaitTime();
if (wait_time <= 0) {
wait_time = WAIT_LIMIT;
}
try {
wait (wait_time);
} catch (InterruptedException ex) {
throw ex;
}
}
throw new InterruptedException();
| public void | configure(org.apache.avalon.framework.configuration.Configuration conf)
super.configure(conf);
maxPendingMessages = conf.getChild("maxcache").getValueAsInteger(1000);
| private org.apache.james.mailrepository.JDBCSpoolRepository$PendingMessage | getNextPendingMessage(SpoolRepository.AcceptFilter filter)
If the queue is not empty, gets the next pending message. Otherwise
checks the last time pending messages were loaded and reloads them if
it's been more than 1 second (should be configurable).
synchronized (pendingMessages) {
if (pendingMessages.size() == 0 && pendingMessagesLoadTime < System.currentTimeMillis()) {
// pendingMessagesLoadTime = LOAD_TIME_MININUM + System.currentTimeMillis();
loadPendingMessages(filter);
pendingMessagesLoadTime = Math.max(filter.getWaitTime(), LOAD_TIME_MININUM) + System.currentTimeMillis();
}
if (pendingMessages.size() == 0) {
return null;
} else {
return (PendingMessage)pendingMessages.removeFirst();
}
}
| private void | loadPendingMessages(SpoolRepository.AcceptFilter filter)
Retrieves the pending messages that are in the database.
//Loads a vector with PendingMessage objects
synchronized (pendingMessages) {
pendingMessages.clear();
Connection conn = null;
PreparedStatement listMessages = null;
ResultSet rsListMessages = null;
try {
conn = datasource.getConnection();
listMessages =
conn.prepareStatement(sqlQueries.getSqlString("listMessagesSQL", true));
listMessages.setString(1, repositoryName);
// Too simplistic. When filtering, we may need to see
// more than just maxPendingMessages to load the
// cache, so just hope that the driver and server use
// cursors properly.
// --> listMessages.setMaxRows(maxPendingMessages);
rsListMessages = listMessages.executeQuery();
// Continue to have it loop through the list of messages until we hit
// a possible message, or we retrieve maxPendingMessages messages.
// This maxPendingMessages cap is to avoid loading thousands or
// hundreds of thousands of messages when the spool is enormous.
while (rsListMessages.next() && pendingMessages.size() < maxPendingMessages && !Thread.currentThread().isInterrupted()) {
String key = rsListMessages.getString(1);
String state = rsListMessages.getString(2);
long lastUpdated = rsListMessages.getTimestamp(3).getTime();
String errorMessage = rsListMessages.getString(4);
if (filter.accept(key, state, lastUpdated, errorMessage)) {
pendingMessages.add(new PendingMessage(key, state, lastUpdated, errorMessage));
}
}
} catch (SQLException sqle) {
//Log it and avoid reloading for a bit
getLogger().error("Error retrieving pending messages", sqle);
pendingMessagesLoadTime = LOAD_TIME_MININUM * 10 + System.currentTimeMillis();
} finally {
theJDBCUtil.closeJDBCResultSet(rsListMessages);
theJDBCUtil.closeJDBCStatement(listMessages);
theJDBCUtil.closeJDBCConnection(conn);
}
}
| public void | store(org.apache.mailet.Mail mc)
Overrides store to reset the time to load to zero.
This forces a reload of the pending messages queue once that queue
is empty; a message that gets added will sit in the spool until then,
when the list is reloaded.
pendingMessagesLoadTime = 0;
super.store(mc);
|
|
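The retry timing used by the filter in accept(long delay) can be looked at in isolation. The sketch below is a hedged restatement of that logic, assuming a hypothetical class `ErrorDelayFilterSketch` that is not part of James: it takes the current time as a parameter instead of calling System.currentTimeMillis(), drops the key and errorMessage arguments that the timing does not use, and uses a local `ERROR` constant as a stand-in for Mail.ERROR.

```java
// Hypothetical illustration of the delay filter's timing; not part of James.
class ErrorDelayFilterSketch {
    // Stand-in for Mail.ERROR (assumption made to keep the sketch self-contained).
    static final String ERROR = "error";

    private final long delay;
    // Earliest time at which a rejected error message becomes ready; 0 means "none seen".
    private long sleepUntil = 0;

    ErrorDelayFilterSketch(long delay) {
        this.delay = delay;
    }

    // Non-error messages are always accepted. Error messages are accepted only
    // once 'delay' milliseconds have passed since they were last updated.
    boolean accept(String state, long lastUpdated, long now) {
        if (!ERROR.equals(state)) {
            return true;
        }
        long processingTime = delay + lastUpdated;
        if (processingTime < now) {
            return true;
        }
        // Remember the soonest ready time so the caller can wake up for it.
        if (sleepUntil == 0 || processingTime < sleepUntil) {
            sleepUntil = processingTime;
        }
        return false;
    }

    // Returns 0 when no error message was rejected, otherwise the time left
    // until the soonest one is ready (at least 1), and resets the tracker.
    long getWaitTime(long now) {
        if (sleepUntil == 0) {
            return 0;
        }
        long waitTime = sleepUntil - now;
        sleepUntil = 0;
        return waitTime <= 0 ? 1 : waitTime;
    }

    public static void main(String[] args) {
        ErrorDelayFilterSketch filter = new ErrorDelayFilterSketch(1000L);
        System.out.println(filter.accept(ERROR, 0L, 500L));
        System.out.println(filter.getWaitTime(500L));
    }
}
```

A wait time of 0 signals "no hint" to the caller, which is why accept(filter) substitutes WAIT_LIMIT in that case, and why an already elapsed ready time is reported as 1 rather than 0.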