Fields Summary |
---|
private Vector | nestedTasksCollection holding the nested tasks |
private final Object | semaphoreSemaphore to notify of completed threads |
private int | numThreadsTotal number of threads to run |
private int | numThreadsPerProcessorTotal number of threads per processor to run. |
private long | timeoutThe timeout period in milliseconds |
private volatile boolean | stillRunningIndicates threads are still running and new threads can be issued |
private boolean | timedOutIndicates that the execution timedout |
private boolean | failOnAnyIndicates whether failure of any of the nested tasks should end
execution |
private TaskList | daemonTasksThe dameon task list if any |
private StringBuffer | exceptionMessageAccumulation of exceptions messages from all nested tasks |
private int | numExceptionsNumber of exceptions from nested tasks |
private Throwable | firstExceptionThe first exception encountered |
private org.apache.tools.ant.Location | firstLocationThe location of the first exception |
Methods Summary |
---|
public void | addDaemons(org.apache.tools.ant.taskdefs.Parallel$TaskList daemonTasks)Add a group of daemon threads
if (this.daemonTasks != null) {
throw new BuildException("Only one daemon group is supported");
}
this.daemonTasks = daemonTasks;
|
public void | addTask(org.apache.tools.ant.Task nestedTask)Add a nested task to execute in parallel.
nestedTasks.addElement(nestedTask);
|
public void | execute()Execute the parallel tasks
updateThreadCounts();
if (numThreads == 0) {
numThreads = nestedTasks.size();
}
spinThreads();
|
private int | getNumProcessors()Determine the number of processors. Only effective on later VMs
try {
Class[] paramTypes = {};
Method availableProcessors =
Runtime.class.getMethod("availableProcessors", paramTypes);
Object[] args = {};
Integer ret = (Integer) availableProcessors.invoke(Runtime.getRuntime(), args);
return ret.intValue();
} catch (Exception e) {
// return a bogus number
return 0;
}
|
private void | processExceptions(org.apache.tools.ant.taskdefs.Parallel$TaskRunnable[] runnables)
if (runnables == null) {
return;
}
for (int i = 0; i < runnables.length; ++i) {
Throwable t = runnables[i].getException();
if (t != null) {
numExceptions++;
if (firstException == null) {
firstException = t;
}
if (t instanceof BuildException
&& firstLocation == Location.UNKNOWN_LOCATION) {
firstLocation = ((BuildException) t).getLocation();
}
exceptionMessage.append(StringUtils.LINE_SEP);
exceptionMessage.append(t.getMessage());
}
}
|
public void | setFailOnAny(boolean failOnAny)Control whether a failure in a nested task halts execution. Note that
the task will complete but existing threads will continue to run - they
are not stopped
this.failOnAny = failOnAny;
|
public void | setPollInterval(int pollInterval)Interval to poll for completed threads when threadCount or
threadsPerProcessor is specified. Integer in milliseconds.; optional
|
public void | setThreadCount(int numThreads)Statically determine the maximum number of tasks to execute
simultaneously. If there are less tasks than threads then all will be
executed at once, if there are more then only threadCount
tasks will be executed at one time. If threadsPerProcessor
is set and the JVM is at least a 1.4 VM then this value is
ignored.; optional
this.numThreads = numThreads;
|
public void | setThreadsPerProcessor(int numThreadsPerProcessor)Dynamically generates the number of threads to execute based on the
number of available processors (via
java.lang.Runtime.availableProcessors() ). Requires a J2SE
1.4 VM, and it will overwrite the value set in threadCount.
If used in a 1.1, 1.2, or 1.3 VM then the task will defer to
threadCount .; optional
this.numThreadsPerProcessor = numThreadsPerProcessor;
|
public void | setTimeout(long timeout)Sets the timeout on this set of tasks. If the timeout is reached
before the other threads complete, the execution of this
task completes with an exception.
Note that existing threads continue to run.
this.timeout = timeout;
|
private void | spinThreads()Spin up required threads with a maximum number active at any given time.
final int numTasks = nestedTasks.size();
TaskRunnable[] runnables = new TaskRunnable[numTasks];
stillRunning = true;
timedOut = false;
int threadNumber = 0;
for (Enumeration e = nestedTasks.elements(); e.hasMoreElements();
threadNumber++) {
Task nestedTask = (Task) e.nextElement();
runnables[threadNumber]
= new TaskRunnable(nestedTask);
}
final int maxRunning = numTasks < numThreads ? numTasks : numThreads;
TaskRunnable[] running = new TaskRunnable[maxRunning];
threadNumber = 0;
ThreadGroup group = new ThreadGroup("parallel");
TaskRunnable[] daemons = null;
if (daemonTasks != null && daemonTasks.tasks.size() != 0) {
daemons = new TaskRunnable[daemonTasks.tasks.size()];
}
synchronized (semaphore) {
// When we leave this block we can be sure all data is really
// stored in main memory before the new threads start, the new
// threads will for sure load the data from main memory.
//
// This probably is slightly paranoid.
}
synchronized (semaphore) {
// start any daemon threads
if (daemons != null) {
for (int i = 0; i < daemons.length; ++i) {
daemons[i] = new TaskRunnable((Task) daemonTasks.tasks.get(i));
Thread daemonThread = new Thread(group, daemons[i]);
daemonThread.setDaemon(true);
daemonThread.start();
}
}
// now run main threads in limited numbers...
// start initial batch of threads
for (int i = 0; i < maxRunning; ++i) {
running[i] = runnables[threadNumber++];
Thread thread = new Thread(group, running[i]);
thread.start();
}
if (timeout != 0) {
// start the timeout thread
Thread timeoutThread = new Thread() {
public synchronized void run() {
try {
wait(timeout);
synchronized (semaphore) {
stillRunning = false;
timedOut = true;
semaphore.notifyAll();
}
} catch (InterruptedException e) {
// ignore
}
}
};
timeoutThread.start();
}
// now find available running slots for the remaining threads
outer:
while (threadNumber < numTasks && stillRunning) {
for (int i = 0; i < maxRunning; i++) {
if (running[i] == null || running[i].isFinished()) {
running[i] = runnables[threadNumber++];
Thread thread = new Thread(group, running[i]);
thread.start();
// continue on outer while loop to get another
// available slot
continue outer;
}
}
// if we got here all slots in use, so sleep until
// something happens
try {
semaphore.wait();
} catch (InterruptedException ie) {
// doesn't java know interruptions are rude?
// just pretend it didn't happen and go about out business.
// sheesh!
}
}
// are all threads finished
outer2:
while (stillRunning) {
for (int i = 0; i < maxRunning; ++i) {
if (running[i] != null && !running[i].isFinished()) {
//System.out.println("Thread " + i + " is still alive ");
// still running - wait for it
try {
semaphore.wait();
} catch (InterruptedException ie) {
// who would interrupt me at a time like this?
}
continue outer2;
}
}
stillRunning = false;
}
}
if (timedOut) {
throw new BuildException("Parallel execution timed out");
}
// now did any of the threads throw an exception
exceptionMessage = new StringBuffer();
numExceptions = 0;
firstException = null;
firstLocation = Location.UNKNOWN_LOCATION;
processExceptions(daemons);
processExceptions(runnables);
if (numExceptions == 1) {
if (firstException instanceof BuildException) {
throw (BuildException) firstException;
} else {
throw new BuildException(firstException);
}
} else if (numExceptions > 1) {
throw new BuildException(exceptionMessage.toString(),
firstLocation);
}
|
private void | updateThreadCounts()Determine the number of threads based on the number of processors
if (numThreadsPerProcessor != 0) {
int numProcessors = getNumProcessors();
if (numProcessors != 0) {
numThreads = numProcessors * numThreadsPerProcessor;
}
}
|