in maestro-engine/src/main/java/com/netflix/maestro/engine/steps/ForeachStepRuntime.java [506:600]
/**
 * Tries to record the runtime summary's pending step action as a pending action on the foreach
 * artifact.
 *
 * <p>Only a non-workflow {@code RESTART} step action that carries a restart config is supported.
 * The second-to-last node of its restart path is taken as the foreach iteration to restart. The
 * action is saved only when all of the following hold; otherwise it is logged, ignored, and
 * {@link Optional#empty()} is returned:
 *
 * <ul>
 *   <li>the restart path has at least two nodes;
 *   <li>that node's workflow id equals the artifact's foreach workflow id;
 *   <li>the current run has already looped over that iteration;
 *   <li>the artifact has no other pending action;
 *   <li>the foreach overview reports the iteration as restartable;
 *   <li>the latest run of that iteration is in a terminal status.
 * </ul>
 *
 * <p>If loading the iteration's latest run or saving the action throws, the failure (including
 * its cause) is logged and {@link Optional#empty()} is returned. This method does not clear the
 * runtime summary's pending action.
 *
 * @param runtimeSummary step runtime summary whose pending action is inspected
 * @param artifact foreach artifact that receives the pending action on success
 * @return a timeline event describing the saved action, or empty if nothing was saved
 */
private Optional<TimelineEvent> savePendingActionIfFeasible(
    StepRuntimeSummary runtimeSummary, ForeachArtifact artifact) {
  StepAction action = runtimeSummary.getPendingAction();
  if (action == null) {
    return Optional.empty();
  }
  // Only a step-level RESTART with an explicit restart config is supported.
  if (action.isWorkflowAction()
      || action.getAction() != Actions.StepInstanceAction.RESTART
      || action.getRestartConfig() == null) {
    LOG.info(
        "Get an unsupported action [{}] at foreach step [{}] and ignore it",
        action,
        artifact.getForeachIdentity());
    return Optional.empty();
  }

  List<RestartConfig.RestartNode> path = action.getRestartConfig().getRestartPath();
  if (path == null || path.size() <= 1) {
    LOG.info(
        "Get an invalid action path [{}] at foreach step [{}] and ignore it",
        path,
        artifact.getForeachIdentity());
    return Optional.empty();
  }
  // pick up 2nd to the last restart node, which is the foreach iteration of foreach step
  RestartConfig.RestartNode restartNode = path.get(path.size() - 2);
  if (!Objects.equals(restartNode.getWorkflowId(), artifact.getForeachWorkflowId())) {
    LOG.warn(
        "Get an invalid action path node [{}] at foreach step [{}] and ignore it",
        restartNode,
        artifact.getForeachIdentity());
    return Optional.empty();
  }
  // index is 0-based and iteration id is 1-based, so iteration id N has been launched once the
  // next loop index reaches N.
  if (artifact.getNextLoopIndex() < restartNode.getInstanceId()) {
    LOG.info(
        "Get an action while the current run has not yet looped over it at foreach step [{}] and ignore it",
        artifact.getForeachIdentity());
    return Optional.empty();
  }
  // Only one pending action is kept on the artifact at a time.
  if (artifact.getPendingAction() != null) {
    LOG.info(
        "Get an action while there is already an ongoing action at foreach step [{}] and ignore it",
        artifact.getForeachIdentity());
    return Optional.empty();
  }
  if (!artifact
      .getForeachOverview()
      .isForeachIterationRestartable(restartNode.getInstanceId())) {
    LOG.info(
        "Get an action while iteration [{}]'s status is unknown or not failed at foreach step [{}] and ignore it",
        restartNode.getInstanceId(),
        artifact.getForeachIdentity());
    return Optional.empty();
  }

  try {
    WorkflowInstance instance =
        instanceDao.getLatestWorkflowInstanceRun(
            restartNode.getWorkflowId(), restartNode.getInstanceId());
    // support restarting terminated iterations not in succeeded status
    if (!instance.getStatus().isTerminal()) {
      LOG.info(
          "Get an action while iteration [{}]'s status is not-actionable [{}] at foreach step [{}] and ignore it",
          restartNode.getInstanceId(),
          instance.getStatus(),
          artifact.getForeachIdentity());
      return Optional.empty();
    }
    artifact.setPendingAction(
        ForeachAction.builder()
            .instanceId(restartNode.getInstanceId())
            .instanceRunId(instance.getWorkflowRunId())
            .restartConfig(action.getRestartConfig())
            .action(action.getAction())
            .user(action.getUser())
            .createTime(action.getCreateTime())
            .build());
    return Optional.of(
        TimelineLogEvent.info(
            "Saved a pending action for iteration [%s]", restartNode.getInstanceId()));
  } catch (Exception e) {
    // Pass the throwable as the trailing argument so SLF4J logs its stack trace instead of
    // silently dropping the cause.
    LOG.warn(
        "Failed to save an action [{}] at foreach step [{}] and will retry it",
        action,
        artifact.getForeachIdentity(),
        e);
    return Optional.empty();
  }
}