private void refreshIterationOverview()

in maestro-engine/src/main/java/com/netflix/maestro/engine/steps/ForeachStepRuntime.java [803:871]


  /**
   * Rebuilds the foreach step overview stored in the given artifact from the iteration overviews
   * returned by {@code instanceDao}.
   *
   * <p>The sequence is:
   *
   * <ol>
   *   <li>When the overview reports a positive first-restart iteration id, query iterations using
   *       that id as the checkpoint, keep only those whose instance id is in the restart info, and
   *       verify that the number kept equals the restart info size.
   *   <li>Read the skipped-iteration set from the overview.
   *   <li>Query iterations using the overview's current checkpoint.
   *   <li>If that query returned anything, move the checkpoint to the smallest instance id whose
   *       status is not terminal, or to one past the first returned instance id when every
   *       returned iteration is terminal. The checkpoint is never allowed to move backwards.
   *   <li>Reset the running stats, add the restarted iterations (dropping terminal ones from the
   *       restart info), add the regular iterations that are not in the skipped set, and finally
   *       refresh the overview detail.
   * </ol>
   *
   * @param artifact foreach artifact whose overview is updated in place
   */
  private void refreshIterationOverview(ForeachArtifact artifact) {
    ForeachStepOverview overview = artifact.getForeachOverview();

    // Restarted iterations are only looked up when a restart checkpoint has been recorded.
    long restartCheckpoint = overview.getFirstRestartIterationId();
    List<ForeachIterationOverview> restarted = Collections.emptyList();
    if (restartCheckpoint > 0) {
      // NOTE(review): the meaning of the trailing boolean flag (true here, false for the regular
      // query below) is defined by the DAO and is not visible from this method.
      List<ForeachIterationOverview> fromRestartCheckpoint =
          instanceDao.getForeachIterationOverviewWithCheckpoint(
              artifact.getForeachWorkflowId(),
              artifact.getForeachRunId(),
              restartCheckpoint,
              true);
      restarted =
          fromRestartCheckpoint.stream()
              .filter(it -> overview.getRestartInfo().contains(it.getInstanceId()))
              .collect(Collectors.toList());
      // NOTE(review): the last format argument is the restart info collection itself, not its
      // size, even though the message reads like a size comparison -- confirm this is intended.
      Checks.checkTrue(
          restarted.size() == overview.getRestartInfo().size(),
          "Invalid: inconsistent status for foreach restart [%s] where info size [%s] != [%s]",
          artifact.getForeachIdentity(),
          restarted.size(),
          overview.getRestartInfo());
    }

    // Read the skipped set BEFORE the checkpoint and the restart info are modified below;
    // presumably it is derived from them, so keep this ordering.
    Set<Long> skipped = overview.getSkippedIterationsWithCheckpoint();

    List<ForeachIterationOverview> latest =
        instanceDao.getForeachIterationOverviewWithCheckpoint(
            artifact.getForeachWorkflowId(),
            artifact.getForeachRunId(),
            overview.getCheckpoint(),
            false);

    if (!latest.isEmpty()) {
      // results are sorted in DESC, so the first element carries the largest instance id;
      // one past it is the checkpoint to use when nothing is left non-terminal.
      long largestId = latest.getFirst().getInstanceId();
      long nextCheckpoint = largestId + 1;
      boolean foundNonTerminal = false;
      for (ForeachIterationOverview it : latest) {
        if (it.getStatus().isTerminal()) {
          continue;
        }
        long id = it.getInstanceId();
        if (!foundNonTerminal || id < nextCheckpoint) {
          nextCheckpoint = id;
          foundNonTerminal = true;
        }
      }
      Checks.checkTrue(
          nextCheckpoint >= overview.getCheckpoint(),
          "In artifact [%s], updated checkpoint [%s] must be no less than the previous one [%s]",
          artifact,
          nextCheckpoint,
          overview.getCheckpoint());
      overview.setCheckpoint(nextCheckpoint);
    }

    overview.resetRunning();

    // Restarted iterations are always counted; once terminal they leave the restart info.
    restarted.forEach(
        it -> {
          overview.addOne(it.getInstanceId(), it.getStatus(), it.getRollupOverview());
          if (it.getStatus().isTerminal()) {
            overview.getRestartInfo().remove(it.getInstanceId());
          }
        });

    // Regular iterations are counted unless they are in the skipped set captured above.
    for (ForeachIterationOverview it : latest) {
      if (!skipped.contains(it.getInstanceId())) {
        overview.addOne(it.getInstanceId(), it.getStatus(), it.getRollupOverview());
      }
    }

    overview.refreshDetail();
  }