Parallel.java
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*
*/
package org.apache.tools.ant.taskdefs;
import java.util.ArrayList;
import java.util.List;
import java.util.Vector;
import org.apache.tools.ant.BuildException;
import org.apache.tools.ant.ExitStatusException;
import org.apache.tools.ant.Location;
import org.apache.tools.ant.Task;
import org.apache.tools.ant.TaskContainer;
import org.apache.tools.ant.property.LocalProperties;
import org.apache.tools.ant.util.StringUtils;
/**
* Executes the contained tasks in separate threads, continuing
* once all are completed.
* <p>
* New behavior allows for the ant script to specify a maximum number of
* threads that will be executed in parallel. One should be very careful about
* using the <code>waitFor</code> task when specifying <code>threadCount</code>
* as it can cause deadlocks if the number of threads is too small or if one of
* the nested tasks fails to execute completely. The task selection algorithm
* will ensure that the tasks listed before a task have started before that
* task is started, but it will not ensure a successful completion of those
* tasks or that those tasks will finish first (i.e. it's a classic race
* condition).
* </p>
* @since Ant 1.4
*
* @ant.task category="control"
*/
public class Parallel extends Task
implements TaskContainer {
private static final int NUMBER_TRIES = 100;
/** Simple container that collects the tasks nested inside it. */
public static class TaskList implements TaskContainer {
    /** The nested tasks, in the order in which they were added. */
    private final List<Task> tasks = new ArrayList<>();

    /**
     * Append a nested task to this list. The task is only stored here;
     * starting it is left to the enclosing class.
     *
     * @param nestedTask Nested task to be executed in parallel;
     *                   must not be null.
     */
    @Override
    public void addTask(Task nestedTask) {
        tasks.add(nestedTask);
    }
}
/** Collection holding the nested tasks, in the order they were added */
private Vector<Task> nestedTasks = new Vector<>();
/** Monitor used to wait for, and to be notified of, completed threads */
private final Object semaphore = new Object();
/**
* Maximum number of task threads to run at once. A value of 0 is replaced
* by the number of nested tasks when the task executes.
*/
private int numThreads = 0;
/**
* Number of threads to run per available processor. A value of 0 means
* the setting is not used and numThreads is left as configured.
*/
private int numThreadsPerProcessor = 0;
/** The timeout period in milliseconds; 0 means no timeout thread is started */
private long timeout;
/** Indicates threads are still running and new threads can be issued */
private volatile boolean stillRunning;
/** Indicates that the execution timed out */
private boolean timedOut;
/**
* Indicates whether failure of any of the nested tasks should end
* execution
*/
private boolean failOnAny;
/** The daemon task list, if any */
private TaskList daemonTasks;
/** Accumulation of exception messages from all nested tasks */
private StringBuffer exceptionMessage;
/** Number of exceptions from nested tasks */
private int numExceptions = 0;
/** The first exception encountered */
private Throwable firstException;
/** The location of the first exception */
private Location firstLocation;
/** The status of the first ExitStatusException. */
private Integer firstExitStatus;
/**
 * Register the group of tasks to be run on daemon threads. Only a single
 * group may be registered on a given instance.
 *
 * @param daemonTasks The tasks to be executed as daemon.
 * @throws BuildException if a daemon group has already been registered.
 */
public void addDaemons(TaskList daemonTasks) {
    final boolean alreadyRegistered = this.daemonTasks != null;
    if (alreadyRegistered) {
        throw new BuildException("Only one daemon group is supported");
    }
    this.daemonTasks = daemonTasks;
}
/**
* Interval to poll for completed threads when threadCount or
* threadsPerProcessor is specified. Integer in milliseconds; optional.
* <p>
* This setter has an empty body: the supplied value is not stored and
* has no effect on how the nested tasks are executed.
* </p>
*
* @param pollInterval New value of property pollInterval (ignored).
*/
public void setPollInterval(int pollInterval) {
}
/**
* Control whether a failure in a nested task halts execution. When set,
* the first nested task that throws clears the "still running" flag so
* that no further tasks are started and this task stops waiting. Note
* that threads which are already running are not stopped; they continue
* to run after this task has completed.
*
* @param failOnAny if true any nested task failure causes parallel to
* complete.
*/
public void setFailOnAny(boolean failOnAny) {
this.failOnAny = failOnAny;
}
/**
 * Append a nested task to the end of the list of tasks that will be
 * executed in parallel.
 *
 * @param nestedTask Nested task to be executed in parallel
 */
@Override
public void addTask(Task nestedTask) {
    nestedTasks.add(nestedTask);
}
/**
* Dynamically generates the number of threads to execute based on the
* number of available processors (via
* <code>java.lang.Runtime.availableProcessors()</code>).
* A non-zero value overrides any value set via threadCount; a value of 0
* (the default) leaves threadCount in effect. Optional.
*
* @param numThreadsPerProcessor Number of threads to create per available
* processor.
*/
public void setThreadsPerProcessor(int numThreadsPerProcessor) {
this.numThreadsPerProcessor = numThreadsPerProcessor;
}
/**
* Statically determine the maximum number of tasks to execute
* simultaneously. If there are fewer tasks than threads then all will be
* executed at once; if there are more then only <code>threadCount</code>
* tasks will be executed at one time. A value of 0 (the default) means
* that all nested tasks are executed at once. If
* <code>threadsPerProcessor</code> is set to a non-zero value then this
* value is ignored. Optional.
*
* @param numThreads total number of threads.
*/
public void setThreadCount(int numThreads) {
this.numThreads = numThreads;
}
/**
* Sets the timeout on this set of tasks. If the timeout is reached
* before the other threads complete, the execution of this
* task completes with an exception. A value of 0 (the default) means
* that no timeout is applied.
*
* Note that existing threads continue to run.
*
* @param timeout timeout in milliseconds.
*/
public void setTimeout(long timeout) {
this.timeout = timeout;
}
/**
 * Execute the parallel tasks.
 * <p>
 * The effective thread limit is resolved first: a non-zero
 * threadsPerProcessor setting overrides threadCount, and a resulting
 * limit of 0 means "run every nested task at once".
 * </p>
 *
 * @exception BuildException if the resolved thread limit is negative or
 * if any of the threads failed.
 */
@Override
public void execute() throws BuildException {
    updateThreadCounts();
    if (numThreads < 0) {
        // A negative limit would otherwise surface later as a
        // NegativeArraySizeException when the slot array is allocated.
        throw new BuildException("Invalid thread count " + numThreads
            + ": threadCount and threadsPerProcessor must not be negative");
    }
    if (numThreads == 0) {
        numThreads = nestedTasks.size();
    }
    spinThreads();
}
/**
 * Derive the thread limit from the processor count when a per-processor
 * thread count has been configured; otherwise leave the limit untouched.
 */
private void updateThreadCounts() {
    if (numThreadsPerProcessor == 0) {
        return;
    }
    final int processors = Runtime.getRuntime().availableProcessors();
    numThreads = processors * numThreadsPerProcessor;
}
/**
 * Fold the failures recorded by the given runnables into the summary
 * fields of this task: the exception count, the first exception, the
 * first known location, the first exit status and the combined message.
 *
 * @param runnables the runnables to inspect; may be null, in which case
 * nothing is done.
 */
private void processExceptions(TaskRunnable[] runnables) {
    if (runnables == null) {
        return;
    }
    for (int index = 0; index < runnables.length; index++) {
        final Throwable failure = runnables[index].getException();
        if (failure == null) {
            // this runnable recorded no failure
            continue;
        }
        numExceptions++;
        if (firstException == null) {
            firstException = failure;
        }
        if (firstLocation == Location.UNKNOWN_LOCATION
            && failure instanceof BuildException) {
            firstLocation = ((BuildException) failure).getLocation();
        }
        if (firstExitStatus == null
            && failure instanceof ExitStatusException) {
            final ExitStatusException exit = (ExitStatusException) failure;
            firstExitStatus = exit.getStatus();
            // may replace a location stored just above, so that the
            // reported location belongs to the reported exit status
            firstLocation = exit.getLocation();
        }
        exceptionMessage.append(StringUtils.LINE_SEP)
            .append(failure.getMessage());
    }
}
/**
 * Spin up required threads with a maximum number active at any given time.
 * <p>
 * Daemon tasks, if any, are started first on daemon threads. The nested
 * tasks are then started in list order, at most
 * <code>min(number of tasks, numThreads)</code> at a time; a finished
 * task frees its slot for the next one. The calling thread waits on the
 * semaphore until all tasks have finished, the timeout fires, a task
 * fails while failOnAny is set, or the calling thread is interrupted.
 * </p>
 * <p>
 * The timeout thread is stopped as soon as the waiting is over, so it
 * cannot flag a timeout on this instance after this method has returned.
 * If the wait is interrupted, the interrupt status of the calling thread
 * is restored before the exception is thrown.
 * </p>
 *
 * @exception BuildException if the calling thread was interrupted, if the
 * timeout elapsed, or if any of the threads failed.
 */
private void spinThreads() throws BuildException {
    stillRunning = true;
    timedOut = false;
    boolean interrupted = false;

    TaskRunnable[] runnables = nestedTasks.stream().map(TaskRunnable::new)
        .toArray(TaskRunnable[]::new);

    final int numTasks = nestedTasks.size();
    final int maxRunning = numTasks < numThreads ? numTasks : numThreads;

    // one slot per concurrently running task
    TaskRunnable[] running = new TaskRunnable[maxRunning];
    ThreadGroup group = new ThreadGroup("parallel");

    TaskRunnable[] daemons = null;
    if (!(daemonTasks == null || daemonTasks.tasks.isEmpty())) {
        daemons = new TaskRunnable[daemonTasks.tasks.size()];
    }

    synchronized (semaphore) {
        // When we leave this block we can be sure all data is really
        // stored in main memory before the new threads start, the new
        // threads will for sure load the data from main memory.
        //
        // This probably is slightly paranoid.
    }

    synchronized (semaphore) {
        // start any daemon threads
        if (daemons != null) {
            for (int i = 0; i < daemons.length; ++i) {
                daemons[i] = new TaskRunnable(daemonTasks.tasks.get(i));
                Thread daemonThread = new Thread(group, daemons[i]);
                daemonThread.setDaemon(true);
                daemonThread.start();
            }
        }

        // now run main threads in limited numbers...
        // start initial batch of threads
        int threadNumber = 0;
        for (int i = 0; i < maxRunning; ++i) {
            running[i] = runnables[threadNumber++];
            Thread thread = new Thread(group, running[i]);
            thread.start();
        }

        // a timeout of 0 means "no timeout": no timer is started
        final Thread timeoutThread =
            timeout != 0 ? startTimeoutThread() : null;

        try {
            // now find available running slots for the remaining threads
            outer: while (threadNumber < numTasks && stillRunning) {
                for (int i = 0; i < maxRunning; i++) {
                    if (running[i] == null || running[i].isFinished()) {
                        running[i] = runnables[threadNumber++];
                        Thread thread = new Thread(group, running[i]);
                        thread.start();
                        // continue on outer while loop to get another
                        // available slot
                        continue outer;
                    }
                }

                // if we got here all slots in use, so sleep until
                // something happens
                semaphore.wait();
            }

            // are all threads finished
            outer2: while (stillRunning) {
                for (int i = 0; i < maxRunning; ++i) {
                    if (running[i] != null && !running[i].isFinished()) {
                        // still running - wait for it
                        semaphore.wait(); //NOSONAR
                        continue outer2;
                    }
                }
                stillRunning = false;
            }
        } catch (InterruptedException ie) {
            interrupted = true;
        } finally {
            // The waiting is over, so the timer is no longer needed. If
            // it were left running it would later flip stillRunning and
            // timedOut on this instance. Its run() exits quietly on
            // interruption.
            if (timeoutThread != null) {
                timeoutThread.interrupt();
            }
        }

        if (!timedOut && !failOnAny) {
            // https://issues.apache.org/bugzilla/show_bug.cgi?id=49527
            // In this branch tasks can only still be unfinished if the
            // wait above was interrupted; killAll is a no-op otherwise.
            killAll(running);
        }
    }

    if (interrupted) {
        // restore the interrupt status consumed by the catch above so
        // that callers can still observe the interruption
        Thread.currentThread().interrupt();
        throw new BuildException("Parallel execution interrupted.");
    }
    if (timedOut) {
        throw new BuildException("Parallel execution timed out");
    }

    // now did any of the threads throw an exception
    exceptionMessage = new StringBuffer();
    numExceptions = 0;
    firstException = null;
    firstExitStatus = null;
    firstLocation = Location.UNKNOWN_LOCATION;
    processExceptions(daemons);
    processExceptions(runnables);

    if (numExceptions == 1) {
        // a single failure is rethrown as-is (wrapped if necessary)
        if (firstException instanceof BuildException) {
            throw (BuildException) firstException;
        }
        throw new BuildException(firstException);
    }
    if (numExceptions > 1) {
        // several failures are reported through their combined messages
        if (firstExitStatus == null) {
            throw new BuildException(exceptionMessage.toString(),
                                     firstLocation);
        }
        throw new ExitStatusException(exceptionMessage.toString(),
                                      firstExitStatus, firstLocation);
    }
}

/**
 * Start the timer thread that, once <code>timeout</code> milliseconds
 * have elapsed, clears the "still running" flag, sets the "timed out"
 * flag and wakes up any thread waiting on the semaphore.
 *
 * @return the started timer thread; interrupt it to cancel the timeout.
 */
private Thread startTimeoutThread() {
    Thread timeoutThread = new Thread() {
        @Override
        public synchronized void run() {
            try {
                final long start = System.currentTimeMillis();
                final long end = start + timeout;
                long now = System.currentTimeMillis();
                // loop so that an early wake-up does not end the wait
                // before the deadline
                while (now < end) {
                    wait(end - now);
                    now = System.currentTimeMillis();
                }
                synchronized (semaphore) {
                    stillRunning = false;
                    timedOut = true;
                    semaphore.notifyAll();
                }
            } catch (InterruptedException ignored) {
                // interrupted before the deadline: exit without
                // flagging a timeout
            }
        }
    };
    timeoutThread.start();
    return timeoutThread;
}
/**
 * Doesn't do anything if all threads were already gone,
 * else it tries to interrupt the threads up to 100 times.
 *
 * @param running The list of tasks that may currently be running.
 */
private void killAll(TaskRunnable[] running) {
    int attempts = 0;
    boolean anyAlive;
    do {
        anyAlive = false;
        for (TaskRunnable candidate : running) {
            if (candidate == null || candidate.isFinished()) {
                // empty slot or already done - nothing to interrupt
                continue;
            }
            candidate.interrupt();
            Thread.yield();
            anyAlive = true;
        }
        if (anyAlive) {
            attempts++;
            Thread.yield();
        }
    } while (anyAlive && attempts < NUMBER_TRIES);
}
/**
 * Runnable that executes a single task, records any failure it throws and
 * signals the enclosing task's semaphore when it is done.
 */
private class TaskRunnable implements Runnable {
    /** Whatever the task threw, or null if nothing was thrown (yet). */
    private Throwable exception;
    /** The task to execute. */
    private final Task task;
    /** Set (under the semaphore) once run() has completed. */
    private boolean finished;
    /** The thread executing the task; null until run() has assigned it. */
    private volatile Thread thread;

    /**
     * Construct a new TaskRunnable.
     *
     * @param task the Task to be executed in a separate thread
     */
    TaskRunnable(Task task) {
        this.task = task;
    }

    /**
     * Executes the task within a thread and takes care of
     * exceptions raised within the task. Whatever happens, the runnable
     * is marked as finished and all threads waiting on the semaphore
     * are notified.
     */
    @Override
    public void run() {
        try {
            LocalProperties.get(getProject()).copy();
            thread = Thread.currentThread();
            task.perform();
        } catch (Throwable t) {
            exception = t;
            if (failOnAny) {
                // tell the coordinating thread to stop issuing and
                // waiting for tasks
                stillRunning = false;
            }
        } finally {
            synchronized (semaphore) {
                finished = true;
                semaphore.notifyAll();
            }
        }
    }

    /**
     * get any exception that got thrown during execution;
     * @return an exception or null for no exception/not yet finished
     */
    public Throwable getException() {
        return exception;
    }

    /**
     * Provides the indicator that the task has been finished.
     * @return Returns true when the task is finished.
     */
    boolean isFinished() {
        return finished;
    }

    /**
     * Interrupt the thread executing the task. Does nothing if run() has
     * not yet recorded its thread, which is the case for a runnable that
     * has not been started or has only just been started.
     */
    void interrupt() {
        // read the volatile field once so the check and the call see
        // the same value
        final Thread worker = thread;
        if (worker != null) {
            worker.interrupt();
        }
    }
}
}