in maestro-engine/src/main/java/com/netflix/maestro/engine/steps/ForeachStepRuntime.java [615:704]
private Optional<TimelineEvent> takePendingActionIfFeasible(
WorkflowSummary workflowSummary,
ForeachStep foreachStep,
StepRuntimeSummary runtimeSummary,
ForeachArtifact artifact) {
if (artifact.getPendingAction() == null) {
return Optional.empty();
}
ForeachAction foreachAction = artifact.getPendingAction();
try {
WorkflowInstance instance =
instanceDao.getWorkflowInstanceRun(
artifact.getForeachWorkflowId(),
foreachAction.getInstanceId(),
foreachAction.getInstanceRunId());
if ((foreachAction.getInstanceRunId() == Constants.LATEST_ONE
|| instance.getWorkflowRunId() == foreachAction.getInstanceRunId())
&& instance.getStatus().isTerminal()) {
int restartIterationId = (int) foreachAction.getInstanceId();
ForeachArtifact restartArtifact = new ForeachArtifact();
restartArtifact.setForeachWorkflowId(artifact.getForeachWorkflowId());
restartArtifact.setForeachRunId(
Math.max(artifact.getForeachRunId(), foreachAction.getInstanceRunId()) + 1);
RunRequest runRequest =
StepHelper.createInternalWorkflowRunRequest(
workflowSummary,
runtimeSummary,
Collections.singletonList(Tag.create(FOREACH_TAG_NAME)),
createForeachRunParams(
restartIterationId - 1, workflowSummary, foreachStep, runtimeSummary),
generateDedupKey(restartArtifact, restartIterationId - 1))
.toBuilder()
.restartConfig(foreachAction.getRestartConfig())
.build();
WorkflowInstance.Status oldStatus = instance.getStatus();
WorkflowRollupOverview oldOverview =
instance.getRuntimeOverview() == null
? null
: instance.getRuntimeOverview().getRollupOverview();
Optional<Details> result =
actionHandler.restartForeachInstance(
runRequest, instance, foreachStep.getId(), restartArtifact.getForeachRunId());
if (result.isPresent()) {
LOG.info(
"Failed to take the action for foreach step {}{} due to [{}] and will retry it later",
workflowSummary.getIdentity(),
runtimeSummary.getIdentity(),
result.get());
return result.map(TimelineDetailsEvent::from);
} else {
LOG.info(
"Successfully took the pending action for foreach step {}{} and will update artifact together",
workflowSummary.getIdentity(),
runtimeSummary.getIdentity());
artifact
.getForeachOverview()
.updateForRestart(
foreachAction.getInstanceId(), instance.getStatus(), oldStatus, oldOverview);
artifact.setPendingAction(null);
return Optional.of(
TimelineLogEvent.info("Successfully take an action: [%s]", foreachAction));
}
} else {
LOG.info(
"Got an out-dated pending action [{}] for foreach step {}{} and ignore it...",
foreachAction,
workflowSummary.getIdentity(),
runtimeSummary.getIdentity());
artifact.setPendingAction(null);
return Optional.empty();
}
} catch (Exception e) {
LOG.warn(
"Fatally failed to take the action for foreach step {}{} due to [{}] and will not retry it",
workflowSummary.getIdentity(),
runtimeSummary.getIdentity(),
e.getMessage());
artifact.setPendingAction(null);
return Optional.of(
TimelineDetailsEvent.from(
Details.create(
e,
false,
"Failed to take the action for foreach step with an error and won't retry")));
}
}