in core/src/main/java/org/apache/brooklyn/core/workflow/WorkflowExecutionContext.java [1468:1584]
private void runCurrentStepInstanceApproved(WorkflowStepDefinition step) {
stepsRun++;
Task<?> t;
if (continuationInstructions!=null) {
t = step.newTaskContinuing(currentStepInstance, continuationInstructions);
continuationInstructions = null;
} else {
t = step.newTask(currentStepInstance);
}
updateOldNextStepOnThisStepStarting();
// about to run -- checkpoint noting current and previous steps, and updating replayable from info
OldStepRecord currentStepRecord = oldStepInfo.compute(currentStepIndex, (index, old) -> {
if (old == null) old = new OldStepRecord();
old.countStarted++;
old.workflowScratchUpdates = null;
old.previous = MutableSet.<Integer>of(previousStepIndex == null ? STEP_INDEX_FOR_START : previousStepIndex).putAll(old.previous);
old.previousTaskId = previousStepTaskId;
old.nextTaskId = null;
return old;
});
WorkflowReplayUtils.updateReplayableFromStep(WorkflowExecutionContext.this, step);
oldStepInfo.compute(previousStepIndex==null ? STEP_INDEX_FOR_START : previousStepIndex, (index, old) -> {
if (old==null) old = new OldStepRecord();
if (previousStepIndex==null && workflowScratchVariables!=null && !workflowScratchVariables.isEmpty()) {
// if workflow scratch vars were initialized prior to run, we nee to save those
old.workflowScratch = MutableMap.copyOf(workflowScratchVariables);
}
old.next = MutableSet.<Integer>of(currentStepIndex).putAll(old.next);
old.nextTaskId = t.getId();
return old;
});
errorHandlerContext = null;
errorHandlerTaskId = null;
currentStepInstance.next = null; // clear, eg if was set from a previous run; will be reset from step definition
// but don't clear output, in case a step is returning to itself and wants to reference previous_step.output
persist();
BiConsumer<Object,Object> onFinish = (stepOutputDefinition,overrideNext) -> {
currentStepInstance.next = WorkflowReplayUtils.getNext(overrideNext, currentStepInstance, step);
if (stepOutputDefinition!=null) {
Object outputResolved = resolve(WorkflowExpressionResolution.WorkflowExpressionStage.STEP_FINISHING_POST_OUTPUT, stepOutputDefinition, Object.class);
currentStepInstance.setOutput(outputResolved);
}
if (currentStepInstance.output != null) {
Pair<Object, Set<Integer>> prev = getStepOutputAndBacktrackedSteps(null);
if (prev != null && Objects.equals(prev.getLeft(), currentStepInstance.output) && lastErrorHandlerOutput==null) {
// optimization, clear the value here if we can simply take it from the previous step
currentStepInstance.output = null;
}
}
if (workflowScratchVariablesUpdatedThisStep!=null && !workflowScratchVariablesUpdatedThisStep.isEmpty()) {
currentStepRecord.workflowScratchUpdates = workflowScratchVariablesUpdatedThisStep;
}
if (currentStepRecord.workflowScratch != null) {
// if we are repeating, check if we need to keep what we were repeating
Pair<Map<String, Object>, Set<Integer>> prev = getStepWorkflowScratchAndBacktrackedSteps(null);
if (prev!=null && !prev.getRight().contains(currentStepIndex) && Objects.equals(prev.getLeft(), currentStepRecord.workflowScratch)){
currentStepRecord.workflowScratch = null;
}
}
workflowScratchVariablesUpdatedThisStep = null;
};
// now run the step
try {
Duration duration = step.getTimeout();
Object newOutput;
if (duration!=null) {
boolean isEnded = DynamicTasks.queue(t).blockUntilEnded(duration);
if (isEnded) {
newOutput = t.getUnchecked();
} else {
t.cancel(true);
throw new TimeoutException("Timeout after "+duration+": "+t.getDisplayName());
}
} else {
newOutput = DynamicTasks.queue(t).getUnchecked();
}
currentStepInstance.setOutput(newOutput);
// allow output to be customized / overridden
onFinish.accept(step.output, null);
} catch (Exception e) {
try {
handleErrorAtStep(step, t, onFinish, e);
} catch (Exception e2) {
currentStepInstance.setError(e2);
throw e2;
}
} finally {
// do this whether or not error
oldStepInfo.compute(currentStepIndex, (index, old) -> {
if (old == null) {
log.warn("Lost old step info for " + this + ", step " + index);
old = new OldStepRecord();
}
if (currentStepInstance.getError()==null) old.countCompleted++;
// okay if this gets picked up by accident because we will check the stepIndex it records against the currentStepIndex,
// and ignore it if different
old.context = currentStepInstance;
return old;
});
}
previousStepTaskId = currentStepInstance.taskId;
previousStepIndex = currentStepIndex;
moveToNextStep("Completed step "+ workflowStepReference(currentStepIndex), false);
}