in maestro-engine/src/main/java/com/netflix/maestro/engine/steps/ForeachStepRuntime.java [310:454]
public Result execute(
    WorkflowSummary workflowSummary, Step step, StepRuntimeSummary runtimeSummary) {
  // One execution pass of the foreach step. In order, it:
  //   1. saves a pending action if feasible, returning CONTINUE when one was saved;
  //   2. validates the next loop index and refreshes the iteration overview;
  //   3. takes a pending action if feasible, returning CONTINUE when one was taken;
  //   4. applies the step failure mode if any iteration has FAILED status;
  //   5. otherwise launches more iterations, bounded by concurrency and remaining loop count;
  //   6. returns the resulting state with the updated foreach artifact and an optional
  //      timeline event.
  // Any exception escaping the pass is logged and converted into a FATAL_ERROR result.
  try {
    ForeachArtifact artifact =
        runtimeSummary.getArtifacts().get(Artifact.Type.FOREACH.key()).asForeach();
    Optional<TimelineEvent> saveActionTimeline =
        savePendingActionIfFeasible(runtimeSummary, artifact);
    if (saveActionTimeline.isPresent()) {
      return new Result(
          State.CONTINUE,
          Collections.singletonMap(artifact.getType().key(), artifact),
          Collections.singletonList(saveActionTimeline.get()));
    }
    final int total = artifact.getTotalLoopCount();
    int index = artifact.getNextLoopIndex();
    if (index < 0 || index > total) { // assert the index value
      throw new MaestroInternalError(
          "Invalid index value [%s] for %s%s",
          index, workflowSummary.getIdentity(), runtimeSummary.getIdentity());
    }
    refreshIterationOverview(artifact);
    ForeachStep foreachStep = (ForeachStep) step;
    // Take restart action and update foreach artifact including overview and rollup
    Optional<TimelineEvent> takeActionTimeline =
        takePendingActionIfFeasible(workflowSummary, foreachStep, runtimeSummary, artifact);
    if (takeActionTimeline.isPresent()) {
      return new Result(
          State.CONTINUE,
          Collections.singletonMap(artifact.getType().key(), artifact),
          Collections.singletonList(takeActionTimeline.get()));
    }
    // Running-stats count from the overview, computed with the step's strict-ordering
    // setting (or the default when the step does not set one).
    final long nonTerminalCount =
        artifact
            .getForeachOverview()
            .getRunningStatsCount(
                ObjectHelper.valueOrDefault(
                    foreachStep.getStrictOrdering(),
                    Defaults.DEFAULT_FOREACH_STRICT_ORDERING_ENABLED));
    final boolean failed =
        artifact.getForeachOverview().statusExistInIterations(WorkflowInstance.Status.FAILED);
    boolean done = false;
    TimelineEvent timelineEvent = null;
    if (failed) {
      switch (step.getFailureMode()) {
        case FAIL_IMMEDIATELY:
          // Moving the index to the end makes (total - index) zero below, so no further
          // iterations are launched from this pass.
          index = total;
          try {
            done = artifact.getForeachOverview().getRunningStatsCount(false) == 0;
            if (!done) {
              // only terminate itself and maestro task will take care of instance termination
              actionDao.terminate(
                  workflowSummary,
                  step.getId(),
                  SYSTEM_USER,
                  Actions.StepInstanceAction.KILL,
                  "Foreach step with FAIL_IMMEDIATELY terminates itself due to a failed iteration");
              return new Result(
                  State.CONTINUE,
                  Collections.singletonMap(artifact.getType().key(), artifact),
                  Collections.singletonList(
                      TimelineLogEvent.warn(
                          "Terminating the foreach step with FAIL_IMMEDIATELY because of a failed iteration. "
                              + "It will terminate all running iterations as well.")));
            }
          } catch (Exception e) {
            // Best effort: the step stays in CONTINUE so a later pass can retry, but the
            // cause is logged instead of being silently dropped.
            LOG.warn(
                "Failed to stop running foreach iterations in step runtime {}{}, will retry",
                workflowSummary.getIdentity(),
                runtimeSummary.getIdentity(),
                e);
            timelineEvent =
                TimelineLogEvent.warn(
                    "Failed to stop all running foreach iterations. Will retry it.");
          }
          break;
        case IGNORE_FAILURE:
          // will not continue the iterations but foreach step will be COMPLETED_WITH_ERROR.
          // Intentional fall-through: handled the same way as FAIL_AFTER_RUNNING here.
        case FAIL_AFTER_RUNNING:
          // A negative index blocks new launches below (launch limit becomes 0), while
          // Math.abs(index) restores the real progress when the artifact is updated.
          // NOTE(review): if index is 0 the negation is a no-op and the launch guard would
          // not apply; confirm index is always >= 1 once an iteration has failed.
          index = -index; // keep the progress
          timelineEvent =
              TimelineLogEvent.warn(
                  "Foreach step will be failed after all running steps because of a step failure.");
          done = nonTerminalCount == 0L;
          break;
        default:
          throw new MaestroInternalError(
              "Invalid failure mode: %s for foreach step %s%s",
              step.getFailureMode(), workflowSummary.getIdentity(), runtimeSummary.getIdentity());
      }
    } else {
      // No failure: done only when every iteration has been launched and none is counted
      // as running.
      done = nonTerminalCount == 0L && index == total;
    }
    State state;
    if (done) {
      state = deriveStepStateOnceDone(artifact);
    } else {
      // Launch at most the remaining loop count, capped by the concurrency helper; a negative
      // index (set above on failure) disables launching entirely.
      final long launchLimit =
          index < 0
              ? 0
              : Math.min(
                  getConcurrency(workflowSummary, foreachStep, nonTerminalCount), total - index);
      if (launchLimit > 0) {
        // The helper advances nextIndex; read it back as the new progress marker.
        AtomicInteger nextIndex = new AtomicInteger(index);
        timelineEvent =
            runForeachIterations(
                workflowSummary,
                foreachStep,
                runtimeSummary,
                artifact,
                nextIndex,
                (int) launchLimit);
        index = nextIndex.get();
      } else {
        LOG.debug(
            "In step runtime {}{}, no newly created foreach workflow instances at iteration [{}]",
            workflowSummary.getIdentity(),
            runtimeSummary.getIdentity(),
            Math.abs(index));
      }
      state = State.CONTINUE;
    }
    // update foreach artifact, might be lagging but will be consistent at the end
    artifact.setNextLoopIndex(Math.abs(index));
    return new Result(
        state,
        Collections.singletonMap(artifact.getType().key(), artifact),
        timelineEvent == null
            ? Collections.emptyList()
            : Collections.singletonList(timelineEvent));
  } catch (Exception e) {
    // Boundary handler: any failure in the pass becomes a FATAL_ERROR result with no
    // artifact update and a timeline entry carrying the error details.
    LOG.error(
        "Failed to execute foreach workflow step runtime {}{}",
        workflowSummary.getIdentity(),
        runtimeSummary.getIdentity(),
        e);
    return new Result(
        State.FATAL_ERROR,
        Collections.emptyMap(),
        Collections.singletonList(
            TimelineDetailsEvent.from(
                Details.create(
                    e, false, "Failed to execute foreach workflow step runtime with an error"))));
  }
}