private ForeachArtifact createArtifact()

in maestro-engine/src/main/java/com/netflix/maestro/engine/steps/ForeachStepRuntime.java [122:207]


  /**
   * Builds the initial {@link ForeachArtifact} for a foreach step.
   *
   * <p>The method first checks that the total loop count does not exceed
   * {@code FOREACH_ITERATION_LIMIT}. It then populates the foreach workflow id, identity, run id,
   * run policy, next loop index, total loop count and an empty {@link ForeachStepOverview}.
   *
   * <p>When the workflow run is not a fresh run, or the step retry is flagged as not retryable, the
   * foreach run id is bumped to one above the largest run id already recorded for the foreach
   * workflow. In that case, if the run policy is {@code RESTART_FROM_INCOMPLETE} or {@code
   * RESTART_FROM_SPECIFIC} and a previous artifact with an overview exists, the new overview, the
   * ancestor iteration count, the next loop index and (for a restart targeting a specific
   * iteration) a pending restart action are derived from that previous artifact.
   *
   * @param workflowSummary summary of the workflow instance that owns the foreach step
   * @param runtimeSummary runtime summary of the foreach step
   * @return the newly created foreach artifact
   */
  private ForeachArtifact createArtifact(
      WorkflowSummary workflowSummary, StepRuntimeSummary runtimeSummary) {
    int total = getLoopParamsTotalCount(runtimeSummary);
    // All four placeholders use %s so that the workflow and step identities show up in the
    // message; the previous template mixed %s with {} and left the identities unformatted.
    // NOTE(review): assumes Checks.checkTrue formats with String.format-style specifiers, as the
    // leading %s placeholders suggest - confirm against Checks.
    Checks.checkTrue(
        total <= FOREACH_ITERATION_LIMIT,
        "Foreach iteration number %s is over the loop size limit %s for step %s%s",
        total,
        FOREACH_ITERATION_LIMIT,
        workflowSummary.getIdentity(),
        runtimeSummary.getIdentity());

    ForeachArtifact artifact = new ForeachArtifact();
    artifact.setForeachWorkflowId(generateForeachWorkflowId(workflowSummary, runtimeSummary));
    artifact.setForeachIdentity(workflowSummary.getIdentity() + runtimeSummary.getIdentity());
    // Default run id; overridden below when this is not a fresh, retryable run.
    artifact.setForeachRunId(1L);

    // inline run should match its upstream run.
    artifact.setRunId(workflowSummary.getWorkflowRunId());
    // A step-level restart config, when present, takes precedence over the workflow run policy.
    if (runtimeSummary.getRestartConfig() != null) {
      artifact.setRunPolicy(runtimeSummary.getRestartConfig().getRestartPolicy());
    } else {
      artifact.setRunPolicy(workflowSummary.getRunPolicy());
    }

    artifact.setNextLoopIndex(0);
    artifact.setTotalLoopCount(total);
    artifact.setForeachOverview(new ForeachStepOverview());

    if (!workflowSummary.isFreshRun() || !runtimeSummary.getStepRetry().isRetryable()) {
      // Use the next run id after the largest one already recorded for this foreach workflow.
      artifact.setForeachRunId(
          instanceDao.getLargestForeachRunIdFromRuns(artifact.getForeachWorkflowId()) + 1L);

      if (artifact.getRunPolicy() == RunPolicy.RESTART_FROM_INCOMPLETE
          || artifact.getRunPolicy() == RunPolicy.RESTART_FROM_SPECIFIC) {
        ForeachArtifact prevArtifact =
            stepInstanceDao.getLatestForeachArtifact(
                workflowSummary.getWorkflowId(),
                workflowSummary.getWorkflowInstanceId(),
                runtimeSummary.getStepId());
        // Without a previous artifact and overview there is nothing to carry over.
        if (prevArtifact != null && prevArtifact.getForeachOverview() != null) {
          RestartConfig config =
              ObjectHelper.valueOrDefault(
                  runtimeSummary.getRestartConfig(), workflowSummary.getRestartConfig());
          // 0 means the restart path does not point at an iteration of this foreach workflow.
          long toRestart = // this is the iteration id specified during restart from specific path
              RunRequest.getNextNode(config)
                  .filter(
                      node -> Objects.equals(node.getWorkflowId(), artifact.getForeachWorkflowId()))
                  .map(RestartConfig.RestartNode::getInstanceId)
                  .orElse(0L);

          long prevMaxIterationId =
              artifact
                  .getForeachOverview()
                  .initiateAndGetByPrevMaxIterationId(prevArtifact.getForeachOverview(), toRestart);

          initializeForeachArtifactRollup(
              artifact.getForeachOverview(),
              prevArtifact.getForeachOverview(),
              artifact.getForeachWorkflowId());

          if (toRestart != 0) {
            // Set pending action for restart
            artifact.setPendingAction(
                ForeachAction.builder()
                    .instanceId(toRestart)
                    .instanceRunId(Constants.LATEST_ONE) // unknown and will use base run + 1
                    .restartConfig(config)
                    .action(Actions.StepInstanceAction.RESTART)
                    .user(SYSTEM_USER)
                    .createTime(System.currentTimeMillis())
                    .build());
          }

          // Keep the larger of the previous max iteration id and the previous ancestor count.
          if (prevArtifact.getAncestorIterationCount() != null) {
            artifact.setAncestorIterationCount(
                Math.max(prevMaxIterationId, prevArtifact.getAncestorIterationCount()));
          } else {
            artifact.setAncestorIterationCount(prevMaxIterationId);
          }
          // Continue from the checkpoint held by the newly initialized overview.
          artifact.setNextLoopIndex((int) artifact.getForeachOverview().getCheckpoint());
        }
      }
    }

    return artifact;
  }