public Result execute()

in maestro-engine/src/main/java/com/netflix/maestro/engine/steps/ForeachStepRuntime.java [310:454]


  public Result execute(
      WorkflowSummary workflowSummary, Step step, StepRuntimeSummary runtimeSummary) {
    try {
      ForeachArtifact artifact =
          runtimeSummary.getArtifacts().get(Artifact.Type.FOREACH.key()).asForeach();

      Optional<TimelineEvent> saveActionTimeline =
          savePendingActionIfFeasible(runtimeSummary, artifact);
      if (saveActionTimeline.isPresent()) {
        return new Result(
            State.CONTINUE,
            Collections.singletonMap(artifact.getType().key(), artifact),
            Collections.singletonList(saveActionTimeline.get()));
      }

      final int total = artifact.getTotalLoopCount();
      int index = artifact.getNextLoopIndex();
      if (index < 0 || index > total) { // assert the index value
        throw new MaestroInternalError(
            "Invalid index value [%s] for %s%s",
            index, workflowSummary.getIdentity(), runtimeSummary.getIdentity());
      }
      refreshIterationOverview(artifact);

      ForeachStep foreachStep = (ForeachStep) step;
      // Take restart action and update foreach artifact including overview and rollup
      Optional<TimelineEvent> takeActionTimeline =
          takePendingActionIfFeasible(workflowSummary, foreachStep, runtimeSummary, artifact);
      if (takeActionTimeline.isPresent()) {
        return new Result(
            State.CONTINUE,
            Collections.singletonMap(artifact.getType().key(), artifact),
            Collections.singletonList(takeActionTimeline.get()));
      }

      final long nonTerminalCount =
          artifact
              .getForeachOverview()
              .getRunningStatsCount(
                  ObjectHelper.valueOrDefault(
                      foreachStep.getStrictOrdering(),
                      Defaults.DEFAULT_FOREACH_STRICT_ORDERING_ENABLED));
      final boolean failed =
          artifact.getForeachOverview().statusExistInIterations(WorkflowInstance.Status.FAILED);
      boolean done = false;
      TimelineEvent timelineEvent = null;
      if (failed) {
        switch (step.getFailureMode()) {
          case FAIL_IMMEDIATELY:
            index = total;
            try {
              done = artifact.getForeachOverview().getRunningStatsCount(false) == 0;
              if (!done) {
                // only terminate itself and maestro task will take care of instance termination
                actionDao.terminate(
                    workflowSummary,
                    step.getId(),
                    SYSTEM_USER,
                    Actions.StepInstanceAction.KILL,
                    "Foreach step with FAIL_IMMEDIATELY terminates itself due to a failed iteration");
                return new Result(
                    State.CONTINUE,
                    Collections.singletonMap(artifact.getType().key(), artifact),
                    Collections.singletonList(
                        TimelineLogEvent.warn(
                            "Terminating the foreach step with FAIL_IMMEDIATELY because of a failed iteration. "
                                + "It will terminate all running iterations as well.")));
              }
            } catch (Exception e) {
              timelineEvent =
                  TimelineLogEvent.warn(
                      "Failed to stop all running foreach iterations. Will retry it.");
            }
            break;
          case IGNORE_FAILURE:
            // will not continue the iterations but foreach step will be COMPLETED_WITH_ERROR.
          case FAIL_AFTER_RUNNING:
            index = -index; // keep the progress
            timelineEvent =
                TimelineLogEvent.warn(
                    "Foreach step will be failed after all running steps because of a step failure.");
            done = nonTerminalCount == 0L;
            break;
          default:
            throw new MaestroInternalError(
                "Invalid failure mode: %s for foreach step %s%s",
                step.getFailureMode(), workflowSummary.getIdentity(), runtimeSummary.getIdentity());
        }
      } else {
        done = nonTerminalCount == 0L && index == total;
      }

      State state;
      if (done) {
        state = deriveStepStateOnceDone(artifact);
      } else {
        final long launchLimit =
            index < 0
                ? 0
                : Math.min(
                    getConcurrency(workflowSummary, foreachStep, nonTerminalCount), total - index);
        if (launchLimit > 0) {
          AtomicInteger nextIndex = new AtomicInteger(index);
          timelineEvent =
              runForeachIterations(
                  workflowSummary,
                  foreachStep,
                  runtimeSummary,
                  artifact,
                  nextIndex,
                  (int) launchLimit);
          index = nextIndex.get();
        } else {
          LOG.debug(
              "In step runtime {}{}, no newly created foreach workflow instances at iteration [{}]",
              workflowSummary.getIdentity(),
              runtimeSummary.getIdentity(),
              Math.abs(index));
        }
        state = State.CONTINUE;
      }
      // update foreach artifact, might be lagging but will be consistent at the end
      artifact.setNextLoopIndex(Math.abs(index));

      return new Result(
          state,
          Collections.singletonMap(artifact.getType().key(), artifact),
          timelineEvent == null
              ? Collections.emptyList()
              : Collections.singletonList(timelineEvent));
    } catch (Exception e) {
      LOG.error(
          "Failed to execute foreach workflow step runtime {}{}",
          workflowSummary.getIdentity(),
          runtimeSummary.getIdentity(),
          e);
      return new Result(
          State.FATAL_ERROR,
          Collections.emptyMap(),
          Collections.singletonList(
              TimelineDetailsEvent.from(
                  Details.create(
                      e, false, "Failed to execute foreach workflow step runtime with an error"))));
    }
  }