in brooklyn-server/core/src/main/java/org/apache/brooklyn/util/core/task/DynamicSequentialTask.java [265:423]
/**
 * Runs this task's primary job on the calling thread while an internal "DST manager" task
 * works through the queued secondary jobs one at a time.
 * <p>
 * Sequence, as implemented below:
 * <ol>
 * <li>under {@code jobTransitionLock}, record that the primary has started, remember the
 *     current thread, and mark every task already in {@code secondaryJobsAll} as queued;
 * <li>build and submit the secondary job master, which polls {@code secondaryJobsRemaining},
 *     submits each secondary and waits for it before taking the next;
 * <li>invoke {@code primaryJob.call()} (if a primary job is set), recording any failure and
 *     applying the {@code failureHandlingConfig} flags for primary failures;
 * <li>always flag the primary as finished and notify waiters; then, unless this task is
 *     cancelled or the thread is interrupted, wait for the secondaries to complete;
 * <li>pass any recorded error to {@code handleException}.
 * </ol>
 *
 * @return the value returned by {@code primaryJob.call()}; or, when there is no primary job and
 *         the secondaries were waited for, the list of secondary results (unchecked cast to
 *         {@code T}); otherwise {@code null}
 * @throws Exception whatever {@code handleException} throws for the recorded error
 *         (handleException is defined elsewhere; presumably it rethrows -- confirm), or a
 *         throwable rethrown by {@code Exceptions.propagateIfFatal}
 */
public T call() throws Exception {
// Publish the "primary started" state and mark pre-registered secondaries as queued, atomically
// with respect to other users of jobTransitionLock.
synchronized (jobTransitionLock) {
primaryStarted = true;
primaryThread = Thread.currentThread();
for (Task<?> t: secondaryJobsAll)
((TaskInternal<?>)t).markQueued();
}
// TODO overkill having a thread/task for this, but it works
// optimisation would either use newTaskEndCallback property on task to submit
// or use some kind of single threaded executor for the queued tasks
Task<List<Object>> secondaryJobMaster = Tasks.<List<Object>>builder().dynamic(false)
.displayName("DST manager (internal)")
// TODO marking it transient helps it be GC'd sooner,
// but ideally we wouldn't have this,
// or else it would be a child
.tag(BrooklynTaskTags.TRANSIENT_TASK_TAG)
.body(new Callable<List<Object>>() {
/**
 * Body of the secondary job master: runs queued secondaries sequentially and collects,
 * per secondary, either its result or (where the failure is tolerated) its error.
 */
@Override
public List<Object> call() throws Exception {
List<Object> result = new ArrayList<Object>();
try {
// Keep going until the queue is aborted, or the primary has finished and nothing is left to run.
while (!secondaryQueueAborted && (!primaryFinished || !secondaryJobsRemaining.isEmpty())) {
synchronized (jobTransitionLock) {
if (!primaryFinished && secondaryJobsRemaining.isEmpty()) {
// Nothing to run yet and the primary is still going: clear the current marker and wait
// to be notified; the 1000ms timeout means the loop condition is re-checked at least that often.
currentSecondary = null;
jobTransitionLock.wait(1000);
}
}
// Polled outside the lock; null simply means nothing was queued, and the loop re-evaluates.
@SuppressWarnings("rawtypes")
Task secondaryJob = secondaryJobsRemaining.poll();
if (secondaryJob != null) {
// Record the current secondary and submit it under the lock, then wake anyone waiting on a transition.
synchronized (jobTransitionLock) {
currentSecondary = secondaryJob;
submitBackgroundInheritingContext(secondaryJob);
jobTransitionLock.notifyAll();
}
try {
// Blocks until this secondary completes, so secondaries run strictly one after another.
result.add(secondaryJob.get());
} catch (Exception e) {
if (TaskTags.isInessential(secondaryJob)) {
// Inessential tasks may fail without affecting the queue: remember the error and carry on.
result.add(Tasks.getError(secondaryJob));
if (log.isDebugEnabled())
log.debug("Secondary job queue for "+DynamicSequentialTask.this+" ignoring error in inessential task "+secondaryJob+": "+e);
} else {
// The three policies below are checked independently, in this order.
if (failureHandlingConfig.cancelSecondariesOnSecondaryFailure) {
if (log.isDebugEnabled())
log.debug("Secondary job queue for "+DynamicSequentialTask.this+" cancelling "+secondaryJobsRemaining.size()+" remaining, due to error in task "+secondaryJob+": "+e);
// Remaining secondaries are cancelled in place; they are not removed from the queue here.
// NOTE(review): if the queue is not also aborted, the loop will still poll these tasks -- confirm this is intended.
synchronized (jobTransitionLock) {
for (Task<?> t: secondaryJobsRemaining)
t.cancel(true);
jobTransitionLock.notifyAll();
}
}
if (failureHandlingConfig.abortSecondaryQueueOnSecondaryFailure) {
if (log.isDebugEnabled())
log.debug("Aborting secondary job queue for "+DynamicSequentialTask.this+" due to error in child task "+secondaryJob+" ("+e+", being rethrown)");
// Set the abort flag, then rethrow so this manager task itself fails with the child's exception;
// the finally block below still runs.
secondaryQueueAborted = true;
throw e;
}
if (!primaryFinished && failureHandlingConfig.cancelPrimaryOnSecondaryFailure) {
// Only reached when the queue was not aborted above; cancel(...) here is the enclosing
// task's method -- presumably it interrupts the still-running primary; confirm.
cancel(TaskCancellationMode.INTERRUPT_TASK_BUT_NOT_SUBMITTED_TASKS, false);
}
// Not aborting: store the error in the slot where the result would have gone and continue.
result.add(Tasks.getError(secondaryJob));
if (log.isDebugEnabled())
log.debug("Secondary job queue for "+DynamicSequentialTask.this+" continuing in presence of error in child task "+secondaryJob+" ("+e+", being remembered)");
}
}
}
}
} finally {
// However the loop ended (normally, aborted, or by exception), flag the secondaries as finished
// and wake any thread waiting on the lock.
synchronized (jobTransitionLock) {
currentSecondary = null;
finishedSecondaries = true;
jobTransitionLock.notifyAll();
}
}
return result;
}
}).build();
// NOTE(review): proxyTargetTask semantics are defined in BasicTask (not visible here); presumably this
// makes the internal manager task act on behalf of this task -- confirm.
((BasicTask<?>)secondaryJobMaster).proxyTargetTask = DynamicSequentialTask.this;
submitBackgroundInheritingContext(secondaryJobMaster);
T result = null;
// error: the failure that will be reported via handleException; errorIsFromChild records its origin.
// uninterestingSelfError: a primary failure caused by QueueAbortedException, reported only as a fallback.
Throwable error = null;
Throwable uninterestingSelfError = null;
boolean errorIsFromChild = false;
try {
if (log.isTraceEnabled()) log.trace("calling primary job for {}", this);
// The primary job runs synchronously on this thread.
if (primaryJob!=null) result = primaryJob.call();
} catch (Throwable selfException) {
Exceptions.propagateIfFatal(selfException);
if (Exceptions.getFirstThrowableOfType(selfException, QueueAbortedException.class) != null) {
// Error was caused by the task already having failed, and this thread calling queue() to try
// to queue more work. The underlying cause will be much more interesting.
// Without this special catch, we record error = "Cannot add a task to ... whose queue has been aborted",
// which gets propagated instead of the more interesting child exception.
uninterestingSelfError = selfException;
} else {
error = selfException;
errorIsFromChild = false;
}
// Apply the primary-failure policies; both are evaluated regardless of which branch above was taken.
if (failureHandlingConfig.abortSecondaryQueueOnPrimaryFailure) {
if (log.isDebugEnabled())
log.debug("Secondary job queue for "+DynamicSequentialTask.this+" aborting with "+secondaryJobsRemaining.size()+" remaining, due to error in primary task: "+selfException);
secondaryQueueAborted = true;
}
if (failureHandlingConfig.cancelSecondariesOnPrimaryFailure) {
if (log.isDebugEnabled())
log.debug(DynamicSequentialTask.this+" cancelling "+secondaryJobsRemaining.size()+" remaining, due to error in primary task: "+selfException);
synchronized (jobTransitionLock) {
for (Task<?> t: secondaryJobsRemaining)
t.cancel(true);
// do this early to prevent additions; notifyAll is deliberately not called here,
// because the finally block immediately below sets the same state again and notifies
primaryThread = null;
primaryFinished = true;
}
}
} finally {
try {
if (log.isTraceEnabled()) log.trace("cleaning up for {}", this);
synchronized (jobTransitionLock) {
// semaphore might be nicer here (aled notes as it is this is a little hard to read)
// Mark the primary as finished and wake the secondary job master, whose loop condition reads primaryFinished.
primaryThread = null;
primaryFinished = true;
jobTransitionLock.notifyAll();
}
// If this task was cancelled or this thread interrupted, do not block waiting for the secondaries.
if (!isCancelled() && !Thread.currentThread().isInterrupted()) {
if (log.isTraceEnabled()) log.trace("waiting for secondaries for {}", this);
// wait on tasks sequentially so that blocking information is more interesting
DynamicTasks.waitForLast();
List<Object> result2 = secondaryJobMaster.get();
try {
// With no primary job, the list of secondary results becomes this task's result.
// NOTE(review): this is an unchecked cast to a type variable, so the catch below may never fire
// here (any ClassCastException would surface at the caller instead) -- confirm.
if (primaryJob==null) result = (T)result2;
} catch (ClassCastException e) { /* ignore class cast exception; leave the result as null */ }
}
} catch (Throwable childException) {
Exceptions.propagateIfFatal(childException);
// A failure from the primary takes precedence; a child failure is only recorded when there is none.
if (error==null) {
error = childException;
errorIsFromChild = true;
} else {
if (log.isDebugEnabled()) log.debug("Parent task "+this+" ignoring child error ("+childException+") in presence of our own error ("+error+")");
}
}
}
if (error!=null) {
handleException(error, errorIsFromChild);
}
// Reached when no other error was recorded, or when handleException above returned without throwing:
// fall back to reporting the QueueAbortedException-derived primary error.
if (uninterestingSelfError != null) {
handleException(uninterestingSelfError, false);
}
return result;
}