in maestro-engine/src/main/java/com/netflix/maestro/engine/steps/ForeachStepRuntime.java [803:871]
/**
 * Refreshes the foreach step overview carried by the given artifact using the iteration
 * overviews returned by {@code instanceDao}.
 *
 * <p>The work is done in this order:
 *
 * <ol>
 *   <li>If the overview reports a positive first restart iteration id, iterations are loaded
 *       from that id and narrowed to the ones listed in the overview's restart info; the
 *       narrowed list must have the same size as the restart info.
 *   <li>Iterations are loaded from the overview's current checkpoint. When any are returned,
 *       the checkpoint is set to the smallest non-terminal iteration id, or to the largest
 *       returned id plus one if all of them are terminal. The checkpoint may not decrease.
 *   <li>The running state of the overview is reset, then the restarted iterations and the
 *       checkpointed iterations (minus the skipped ones) are added back, and the overview
 *       detail is refreshed.
 * </ol>
 *
 * @param artifact foreach artifact whose overview is updated in place
 */
private void refreshIterationOverview(ForeachArtifact artifact) {
  ForeachStepOverview overview = artifact.getForeachOverview();

  // Restarted iterations are only queried when the overview reports a restart checkpoint.
  List<ForeachIterationOverview> restartedIterations = Collections.emptyList();
  long firstRestartId = overview.getFirstRestartIterationId();
  if (firstRestartId > 0) {
    List<ForeachIterationOverview> fromRestartCheckpoint =
        instanceDao.getForeachIterationOverviewWithCheckpoint(
            artifact.getForeachWorkflowId(), artifact.getForeachRunId(), firstRestartId, true);
    restartedIterations =
        fromRestartCheckpoint.stream()
            .filter(item -> overview.getRestartInfo().contains(item.getInstanceId()))
            .collect(Collectors.toList());
    // Every iteration recorded in the restart info must have been found by the query.
    Checks.checkTrue(
        restartedIterations.size() == overview.getRestartInfo().size(),
        "Invalid: inconsistent status for foreach restart [%s] where info size [%s] != [%s]",
        artifact.getForeachIdentity(),
        restartedIterations.size(),
        overview.getRestartInfo());
  }

  // Read the skip set before the checkpoint is moved further down.
  Set<Long> skippedIds = overview.getSkippedIterationsWithCheckpoint();
  List<ForeachIterationOverview> checkpointedIterations =
      instanceDao.getForeachIterationOverviewWithCheckpoint(
          artifact.getForeachWorkflowId(),
          artifact.getForeachRunId(),
          overview.getCheckpoint(),
          false);

  if (!checkpointedIterations.isEmpty()) {
    // results are sorted in DESC, so the first entry holds the largest iteration id.
    long maxIterationId = checkpointedIterations.getFirst().getInstanceId();
    boolean hasUnfinished = false;
    long lowestUnfinishedId = Long.MAX_VALUE;
    for (ForeachIterationOverview item : checkpointedIterations) {
      if (!item.getStatus().isTerminal()) {
        hasUnfinished = true;
        lowestUnfinishedId = Math.min(lowestUnfinishedId, item.getInstanceId());
      }
    }
    // With nothing unfinished, the checkpoint moves just past the largest id seen.
    long newCheckpoint = hasUnfinished ? lowestUnfinishedId : maxIterationId + 1;
    Checks.checkTrue(
        newCheckpoint >= overview.getCheckpoint(),
        "In artifact [%s], updated checkpoint [%s] must be no less than the previous one [%s]",
        artifact,
        newCheckpoint,
        overview.getCheckpoint());
    overview.setCheckpoint(newCheckpoint);
  }

  overview.resetRunning();

  // Restarted iterations are counted first; terminal ones are dropped from the restart info.
  for (ForeachIterationOverview item : restartedIterations) {
    overview.addOne(item.getInstanceId(), item.getStatus(), item.getRollupOverview());
    if (item.getStatus().isTerminal()) {
      overview.getRestartInfo().remove(item.getInstanceId());
    }
  }

  // Then count the checkpointed iterations, leaving out the ones in the skip set.
  for (ForeachIterationOverview item : checkpointedIterations) {
    if (skippedIds.contains(item.getInstanceId())) {
      continue;
    }
    overview.addOne(item.getInstanceId(), item.getStatus(), item.getRollupOverview());
  }

  overview.refreshDetail();
}