in maestro-engine/src/main/java/com/netflix/maestro/engine/steps/ForeachStepRuntime.java [122:207]
/**
 * Builds the {@link ForeachArtifact} for a foreach step.
 *
 * <p>The loop size is validated against {@code FOREACH_ITERATION_LIMIT} first. The artifact is
 * then populated with the foreach workflow id, identity, run ids, run policy and an empty
 * overview. When the workflow is not a fresh run, or the step retry is not retryable, the foreach
 * run id is bumped to the largest known foreach run id plus one. In that case, if the run policy
 * is {@code RESTART_FROM_INCOMPLETE} or {@code RESTART_FROM_SPECIFIC} and a previous artifact
 * with an overview exists, the new overview is seeded from the previous one, an optional pending
 * restart action is attached, the ancestor iteration count is carried forward, and the next loop
 * index is taken from the overview checkpoint.
 *
 * <p>The size check is delegated to {@code Checks.checkTrue}, which is expected to fail when the
 * total loop count exceeds the limit.
 *
 * @param workflowSummary summary of the workflow instance owning the foreach step
 * @param runtimeSummary runtime summary of the foreach step
 * @return the newly created artifact
 */
private ForeachArtifact createArtifact(
    WorkflowSummary workflowSummary, StepRuntimeSummary runtimeSummary) {
  int total = getLoopParamsTotalCount(runtimeSummary);
  // All four arguments use %s so that both identities actually appear in the message.
  Checks.checkTrue(
      total <= FOREACH_ITERATION_LIMIT,
      "Foreach iteration number %s is over the loop size limit %s for step %s%s",
      total,
      FOREACH_ITERATION_LIMIT,
      workflowSummary.getIdentity(),
      runtimeSummary.getIdentity());
  ForeachArtifact artifact = new ForeachArtifact();
  artifact.setForeachWorkflowId(generateForeachWorkflowId(workflowSummary, runtimeSummary));
  artifact.setForeachIdentity(workflowSummary.getIdentity() + runtimeSummary.getIdentity());
  // Default foreach run id; replaced below when this is not a fresh, retryable run.
  artifact.setForeachRunId(1L);
  // inline run should match its upstream run.
  artifact.setRunId(workflowSummary.getWorkflowRunId());
  // A step-level restart config takes precedence over the workflow-level run policy.
  if (runtimeSummary.getRestartConfig() != null) {
    artifact.setRunPolicy(runtimeSummary.getRestartConfig().getRestartPolicy());
  } else {
    artifact.setRunPolicy(workflowSummary.getRunPolicy());
  }
  artifact.setNextLoopIndex(0);
  artifact.setTotalLoopCount(total);
  artifact.setForeachOverview(new ForeachStepOverview());
  if (!workflowSummary.isFreshRun() || !runtimeSummary.getStepRetry().isRetryable()) {
    // Use the next foreach run id after the largest one recorded for this foreach workflow.
    artifact.setForeachRunId(
        instanceDao.getLargestForeachRunIdFromRuns(artifact.getForeachWorkflowId()) + 1L);
    if (artifact.getRunPolicy() == RunPolicy.RESTART_FROM_INCOMPLETE
        || artifact.getRunPolicy() == RunPolicy.RESTART_FROM_SPECIFIC) {
      ForeachArtifact prevArtifact =
          stepInstanceDao.getLatestForeachArtifact(
              workflowSummary.getWorkflowId(),
              workflowSummary.getWorkflowInstanceId(),
              runtimeSummary.getStepId());
      // Without a previous overview there is nothing to carry over.
      if (prevArtifact != null && prevArtifact.getForeachOverview() != null) {
        RestartConfig config =
            ObjectHelper.valueOrDefault(
                runtimeSummary.getRestartConfig(), workflowSummary.getRestartConfig());
        // this is the iteration id specified during restart from specific path;
        // 0 means the next restart node does not target this foreach workflow.
        long toRestart =
            RunRequest.getNextNode(config)
                .filter(
                    node -> Objects.equals(node.getWorkflowId(), artifact.getForeachWorkflowId()))
                .map(RestartConfig.RestartNode::getInstanceId)
                .orElse(0L);
        long prevMaxIterationId =
            artifact
                .getForeachOverview()
                .initiateAndGetByPrevMaxIterationId(prevArtifact.getForeachOverview(), toRestart);
        initializeForeachArtifactRollup(
            artifact.getForeachOverview(),
            prevArtifact.getForeachOverview(),
            artifact.getForeachWorkflowId());
        if (toRestart != 0) {
          // Set pending action for restart
          artifact.setPendingAction(
              ForeachAction.builder()
                  .instanceId(toRestart)
                  .instanceRunId(Constants.LATEST_ONE) // unknown and will use base run + 1
                  .restartConfig(config)
                  .action(Actions.StepInstanceAction.RESTART)
                  .user(SYSTEM_USER)
                  .createTime(System.currentTimeMillis())
                  .build());
        }
        // Keep the larger of the previous max iteration id and the previous ancestor count.
        if (prevArtifact.getAncestorIterationCount() != null) {
          artifact.setAncestorIterationCount(
              Math.max(prevMaxIterationId, prevArtifact.getAncestorIterationCount()));
        } else {
          artifact.setAncestorIterationCount(prevMaxIterationId);
        }
        // Resume launching from the checkpoint recorded in the seeded overview.
        artifact.setNextLoopIndex((int) artifact.getForeachOverview().getCheckpoint());
      }
    }
  }
  return artifact;
}