public T call()

in core/src/main/java/org/apache/brooklyn/util/core/task/DynamicSequentialTask.java [286:446]


        /**
         * Runs the primary job on the calling thread while a separately submitted
         * "secondary job master" task works through {@code secondaryJobsRemaining},
         * submitting one queued task at a time and waiting for it before taking the next.
         * <p>
         * Once the primary job has returned or thrown, this method marks the primary as finished,
         * wakes anything waiting on {@code jobTransitionLock}, and (unless this task is cancelled
         * or the current thread is interrupted) waits for the secondary job master to complete.
         * <p>
         * How failures in the primary or in a secondary affect the other side is governed by the
         * flags on {@code failureHandlingConfig} read below.
         *
         * @return the value returned by {@code primaryJob}; or, if {@code primaryJob} is null and
         *         the secondaries were waited for, the list of secondary results cast to {@code T};
         *         otherwise null
         * @throws Exception declared by the signature; fatal throwables are passed to
         *         {@code Exceptions.propagateIfFatal}, and other failures are handed to
         *         {@code handleException} (presumably rethrown there -- confirm against that method)
         */
        public T call() throws Exception {

            // record, under the lock, that the primary has started on this thread,
            // and mark every secondary queued so far as queued
            synchronized (jobTransitionLock) {
                primaryStarted = true;
                primaryThread = Thread.currentThread();
                for (Task<?> t: secondaryJobsAll)
                    ((TaskInternal<?>)t).markQueued();
            }
            // TODO overkill having a thread/task for this, but it works
            // optimisation would either use newTaskEndCallback property on task to submit
            // or use some kind of single threaded executor for the queued tasks
            Task<List<Object>> secondaryJobMaster = Tasks.<List<Object>>builder().dynamic(false)
                    .displayName(TASK_NAME)
                    // marking it transient helps it be GC'd sooner,
                    // but ideally we wouldn't have this,
                    // or else it would be a child
                    .tag(BrooklynTaskTags.TRANSIENT_TASK_TAG)
                    .body(new Callable<List<Object>>() {

                /**
                 * Queue runner: polls {@code secondaryJobsRemaining}, submits each task and blocks on it,
                 * collecting either its result or its error into the returned list.
                 */
                @Override
                public List<Object> call() throws Exception {
                    List<Object> result = new ArrayList<Object>();
                    try { 
                        // keep going until the queue is aborted, or the primary has finished and nothing is left to run
                        while (!secondaryQueueAborted && (!primaryFinished || !secondaryJobsRemaining.isEmpty())) {
                            if (isCancelled()) throw new CancellationException();
                            synchronized (jobTransitionLock) {
                                if (!primaryFinished && secondaryJobsRemaining.isEmpty()) {
                                    // nothing to run yet and the primary is still going: wait for a notify,
                                    // but for at most 1000ms so the loop conditions (incl. cancellation) are re-checked.
                                    // NOTE(review): a single 'if' rather than a 'while' -- a timeout or spurious wake-up
                                    // simply falls through to poll() below, which may return null, and the outer loop retries.
                                    currentSecondary = null;
                                    jobTransitionLock.wait(1000);
                                }
                            }
                            // polled outside the lock; may be null if nothing was queued in the meantime
                            @SuppressWarnings("rawtypes")
                            Task secondaryJob = secondaryJobsRemaining.poll();
                            if (secondaryJob != null) {
                                if (isCancelled()) throw new CancellationException();
                                // publish which secondary is current and submit it, then wake any waiters on the lock
                                synchronized (jobTransitionLock) {
                                    currentSecondary = secondaryJob;
                                    submitBackgroundInheritingContext(secondaryJob);
                                    jobTransitionLock.notifyAll();
                                }
                                try {
                                    // block until this secondary completes, so secondaries run one after another
                                    result.add(secondaryJob.get());
                                } catch (Exception e) {
                                    if (TaskTags.isInessential(secondaryJob)) {
                                        // inessential task: remember its error in the results and carry on
                                        result.add(Tasks.getError(secondaryJob));
                                        if (log.isDebugEnabled())
                                            log.debug("Secondary job queue for "+DynamicSequentialTask.this+" ignoring error in inessential task "+secondaryJob+": "+e);
                                    } else {
                                        // step 1 (optional): cancel everything still waiting in the queue
                                        if (failureHandlingConfig.cancelSecondariesOnSecondaryFailure) {
                                            if (log.isDebugEnabled())
                                                log.debug("Secondary job queue for "+DynamicSequentialTask.this+" cancelling "+secondaryJobsRemaining.size()+" remaining, due to error in task "+secondaryJob+": "+e);
                                            synchronized (jobTransitionLock) {
                                                for (Task<?> t: secondaryJobsRemaining)
                                                    t.cancel(true);
                                                jobTransitionLock.notifyAll();
                                            }
                                        }
                                        
                                        // step 2 (optional): abort the queue and rethrow; note this exits before the
                                        // cancel-primary check below, so that check only applies when not aborting
                                        if (failureHandlingConfig.abortSecondaryQueueOnSecondaryFailure) {
                                            if (log.isDebugEnabled())
                                                log.debug("Aborting secondary job queue for "+DynamicSequentialTask.this+" due to error in child task "+secondaryJob+" ("+e+", being rethrown)");
                                            secondaryQueueAborted = true;
                                            throw e;
                                        }

                                        // step 3 (optional): if the primary is still running, cancel the enclosing task
                                        // (mode name suggests the task is interrupted but tasks it submitted are not -- confirm)
                                        if (!primaryFinished && failureHandlingConfig.cancelPrimaryOnSecondaryFailure) {
                                            cancel(TaskCancellationMode.INTERRUPT_TASK_BUT_NOT_SUBMITTED_TASKS, false);
                                        }
                                        
                                        // queue not aborted: remember the error in the results and continue with the next job
                                        result.add(Tasks.getError(secondaryJob));
                                        if (log.isDebugEnabled())
                                            log.debug("Secondary job queue for "+DynamicSequentialTask.this+" continuing in presence of error in child task "+secondaryJob+" ("+e+", being remembered)");
                                    }
                                }
                            }
                        }
                    } finally {
                        // however the loop ended (normally, cancellation, or rethrown error),
                        // flag that the secondaries are finished and wake any waiters
                        synchronized (jobTransitionLock) {
                            currentSecondary = null;
                            finishedSecondaries = true;
                            jobTransitionLock.notifyAll();
                        }
                    }
                    return result;
                }
            }).build();
            // point the queue-runner task back at this task
            // (presumably so it is presented as a proxy for this task -- confirm against BasicTask.proxyTargetTask)
            ((BasicTask<?>)secondaryJobMaster).proxyTargetTask = DynamicSequentialTask.this;
            
            submitBackgroundInheritingContext(secondaryJobMaster);
            
            T result = null;
            // the failure to report, if any; may come from the primary job or from waiting on the secondaries
            Throwable error = null;
            // a primary failure caused only by queueing onto an already-aborted queue; reported only as a last resort
            Throwable uninterestingSelfError = null;
            boolean errorIsFromChild = false;
            try {
                if (log.isTraceEnabled()) log.trace("calling primary job for {}", this);
                // a null primaryJob means there is only queued (secondary) work to do
                if (primaryJob!=null) result = primaryJob.call();
            } catch (Throwable selfException) {
                Exceptions.propagateIfFatal(selfException);
                if (Exceptions.getFirstThrowableOfType(selfException, QueueAbortedException.class) != null) {
                    // Error was caused by the task already having failed, and this thread calling queue() to try
                    // to queue more work. The underlying cause will be much more interesting.
                    // Without this special catch, we record error = "Cannot add a task to ... whose queue has been aborted",
                    // which gets propagated instead of the more interesting child exception.
                    uninterestingSelfError = selfException;
                } else {
                    error = selfException;
                    errorIsFromChild = false;
                }
                // optionally stop the queue runner from taking further jobs (its loop checks this flag)
                if (failureHandlingConfig.abortSecondaryQueueOnPrimaryFailure) {
                    if (log.isDebugEnabled())
                        log.debug("Secondary job queue for "+DynamicSequentialTask.this+" aborting with "+secondaryJobsRemaining.size()+" remaining, due to error in primary task: "+selfException);
                    secondaryQueueAborted = true;
                }
                // optionally cancel the jobs still waiting in the queue
                if (failureHandlingConfig.cancelSecondariesOnPrimaryFailure) {
                    if (log.isDebugEnabled())
                        log.debug(DynamicSequentialTask.this+" cancelling "+secondaryJobsRemaining.size()+" remaining, due to error in primary task: "+selfException);
                    synchronized (jobTransitionLock) {
                        for (Task<?> t: secondaryJobsRemaining)
                            t.cancel(true);
                        // do this early to prevent additions; no notifyAll() is needed here because
                        // the finally block just below notifies very soon, so the notify is held off until then
                        primaryThread = null;
                        primaryFinished = true;
                    }
                }
            } finally {
                try {
                    if (log.isTraceEnabled()) log.trace("cleaning up for {}", this);
                    // always mark the primary as finished and wake the queue runner,
                    // so it can exit once the remaining jobs are done
                    synchronized (jobTransitionLock) {
                        // semaphore might be nicer here (aled notes as it is this is a little hard to read)
                        primaryThread = null;
                        primaryFinished = true;
                        jobTransitionLock.notifyAll();
                    }
                    // only wait for the secondaries if this task is not cancelled and this thread is not interrupted
                    if (!isCancelled() && !Thread.currentThread().isInterrupted()) {
                        if (log.isTraceEnabled()) log.trace("waiting for secondaries for {}", this);
                        // wait on tasks sequentially so that blocking information is more interesting
                        DynamicTasks.waitForLast();
                        List<Object> result2 = secondaryJobMaster.get();
                        try {
                            // with no primary job, the list of secondary results becomes this task's result.
                            // NOTE(review): T is a type variable, so this cast is presumably unchecked at runtime
                            // and the ClassCastException catch below may never fire here -- confirm
                            if (primaryJob==null) result = (T)result2;
                        } catch (ClassCastException e) { /* ignore class cast exception; leave the result as null */ }
                    }
                } catch (Throwable childException) {
                    Exceptions.propagateIfFatal(childException);
                    // an error already recorded from the primary wins; otherwise report the child's error
                    if (error==null) {
                        error = childException;
                        errorIsFromChild = true;
                    } else {
                        if (log.isDebugEnabled()) log.debug("Parent task "+this+" ignoring child error ("+childException+") in presence of our own error ("+error+")");
                    }
                }
            }
            // the interesting error (own or child) is handled first; the queue-aborted error is handled
            // only if there was no other error, or if the first handleException call returned normally
            // (handleException presumably rethrows -- confirm against its implementation)
            if (error!=null) {
                handleException(error, errorIsFromChild);
            }
            if (uninterestingSelfError != null) {
                handleException(uninterestingSelfError, false);
            }
            return result;
        }