in src/main/org/apache/tools/ant/taskdefs/Parallel.java [256:401]
private void spinThreads() throws BuildException {
stillRunning = true;
timedOut = false;
boolean interrupted = false;
TaskRunnable[] runnables = nestedTasks.stream().map(TaskRunnable::new)
.toArray(TaskRunnable[]::new);
final int numTasks = nestedTasks.size();
final int maxRunning = numTasks < numThreads ? numTasks : numThreads;
TaskRunnable[] running = new TaskRunnable[maxRunning];
ThreadGroup group = new ThreadGroup("parallel");
TaskRunnable[] daemons = null;
if (daemonTasks != null && !daemonTasks.tasks.isEmpty()) {
daemons = new TaskRunnable[daemonTasks.tasks.size()];
}
synchronized (semaphore) {
// When we leave this block we can be sure all data is really
// stored in main memory before the new threads start, the new
// threads will for sure load the data from main memory.
//
// This probably is slightly paranoid.
}
synchronized (semaphore) {
// start any daemon threads
if (daemons != null) {
for (int i = 0; i < daemons.length; ++i) {
daemons[i] = new TaskRunnable(daemonTasks.tasks.get(i));
Thread daemonThread = new Thread(group, daemons[i]);
daemonThread.setDaemon(true);
daemonThread.start();
}
}
// now run main threads in limited numbers...
// start initial batch of threads
int threadNumber = 0;
for (int i = 0; i < maxRunning; ++i) {
running[i] = runnables[threadNumber++];
Thread thread = new Thread(group, running[i]);
thread.start();
}
if (timeout != 0) {
// start the timeout thread
Thread timeoutThread = new Thread() {
@Override
public synchronized void run() {
try {
final long start = System.currentTimeMillis();
final long end = start + timeout;
long now = System.currentTimeMillis();
while (now < end) {
wait(end - now);
now = System.currentTimeMillis();
}
synchronized (semaphore) {
stillRunning = false;
timedOut = true;
semaphore.notifyAll();
}
} catch (InterruptedException e) {
// ignore
}
}
};
timeoutThread.start();
}
try {
// now find available running slots for the remaining threads
outer: while (threadNumber < numTasks && stillRunning) {
for (int i = 0; i < maxRunning; i++) {
if (running[i] == null || running[i].isFinished()) {
running[i] = runnables[threadNumber++];
Thread thread = new Thread(group, running[i]);
thread.start();
// continue on outer while loop to get another
// available slot
continue outer;
}
}
// if we got here all slots in use, so sleep until
// something happens
semaphore.wait();
}
// are all threads finished
outer2: while (stillRunning) {
for (int i = 0; i < maxRunning; ++i) {
if (running[i] != null && !running[i].isFinished()) {
// System.out.println("Thread " + i + " is still
// alive ");
// still running - wait for it
semaphore.wait(); //NOSONAR
continue outer2;
}
}
stillRunning = false;
}
} catch (InterruptedException ie) {
interrupted = true;
}
if (!timedOut && !failOnAny) {
// https://issues.apache.org/bugzilla/show_bug.cgi?id=49527
killAll(running);
}
}
if (interrupted) {
throw new BuildException("Parallel execution interrupted.");
}
if (timedOut) {
throw new BuildException("Parallel execution timed out");
}
// now did any of the threads throw an exception
exceptionMessage = new StringBuffer();
numExceptions = 0;
firstException = null;
firstExitStatus = null;
firstLocation = Location.UNKNOWN_LOCATION;
processExceptions(daemons);
processExceptions(runnables);
if (numExceptions == 1) {
if (firstException instanceof BuildException) {
throw (BuildException) firstException;
}
throw new BuildException(firstException);
}
if (numExceptions > 1) {
if (firstExitStatus == null) {
throw new BuildException(exceptionMessage.toString(),
firstLocation);
}
throw new ExitStatusException(exceptionMessage.toString(),
firstExitStatus, firstLocation);
}
}